diff --git a/substrate/core/client/db/src/lib.rs b/substrate/core/client/db/src/lib.rs
index 8c2a5a76be84664610954b6898d4327c7cc97508..e5837e7f3f394f37bf9ea92fa9f36ea9cc848c75 100644
--- a/substrate/core/client/db/src/lib.rs
+++ b/substrate/core/client/db/src/lib.rs
@@ -363,7 +363,7 @@ pub struct DbChangesTrieStorage<Block: BlockT> {
 	_phantom: ::std::marker::PhantomData<Block>,
 }
 
-impl<Block: BlockT> state_machine::ChangesTrieStorage<Blake2Hasher> for DbChangesTrieStorage<Block> {
+impl<Block: BlockT> state_machine::ChangesTrieRootsStorage<Blake2Hasher> for DbChangesTrieStorage<Block> {
 	fn root(&self, block: u64) -> Result<Option<H256>, String> {
 		Ok(read_db::<Block>(&*self.db, columns::HASH_LOOKUP, columns::HEADER, BlockId::Number(As::sa(block)))
 			.map_err(|err| format!("{}", err))
@@ -378,7 +378,9 @@ impl<Block: BlockT> state_machine::ChangesTrieStorage<Blake2Hasher> for DbChange
 				.and_then(DigestItem::as_changes_trie_root)
 				.map(|root| H256::from_slice(root.as_ref()))))
 	}
+}
 
+impl<Block: BlockT> state_machine::ChangesTrieStorage<Blake2Hasher> for DbChangesTrieStorage<Block> {
 	fn get(&self, key: &H256) -> Result<Option<DBValue>, String> {
 		self.db.get(columns::CHANGES_TRIE, &key[..])
 			.map_err(|err| format!("{}", err))
@@ -750,7 +752,7 @@ mod tests {
 	use client::backend::BlockImportOperation as Op;
 	use client::blockchain::HeaderBackend as BlockchainHeaderBackend;
 	use runtime_primitives::testing::{Header, Block as RawBlock};
-	use state_machine::{TrieMut, TrieDBMut, ChangesTrieStorage};
+	use state_machine::{TrieMut, TrieDBMut, ChangesTrieRootsStorage, ChangesTrieStorage};
 	use test_client;
 
 	type Block = RawBlock<u64>;
diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs
index d92f2487a647c9e836a248fbe02f23495c93769a..985439461c211bd8ef59eb05dbcb9ce47a170311 100644
--- a/substrate/core/client/src/client.rs
+++ b/substrate/core/client/src/client.rs
@@ -23,13 +23,14 @@ use primitives::AuthorityId;
 use runtime_primitives::{bft::Justification, generic::{BlockId, SignedBlock, Block as RuntimeBlock}};
 use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Zero, As, NumberFor, CurrentHeight, BlockNumberToHash};
 use runtime_primitives::BuildStorage;
-use primitives::{Blake2Hasher, H256};
+use primitives::{Blake2Hasher, H256, ChangesTrieConfiguration};
 use primitives::storage::{StorageKey, StorageData};
 use primitives::storage::well_known_keys;
 use codec::{Encode, Decode};
 use state_machine::{
 	Backend as StateBackend, CodeExecutor,
-	ExecutionStrategy, ExecutionManager, prove_read
+	ExecutionStrategy, ExecutionManager, prove_read,
+	key_changes, key_changes_proof,
 };
 
 use backend::{self, BlockImportOperation};
@@ -56,6 +57,7 @@ pub struct Client<B, E, Block> where Block: BlockT {
 	importing_block: RwLock<Option<Block::Hash>>, // holds the block hash currently being imported. TODO: replace this with block queue
 	block_execution_strategy: ExecutionStrategy,
 	api_execution_strategy: ExecutionStrategy,
+	changes_trie_config: Option<ChangesTrieConfiguration>,
 }
 
 /// A source of blockchain events.
@@ -239,6 +241,13 @@ impl<B, E, Block> Client<B, E, Block> where
 			)?;
 			backend.commit_operation(op)?;
 		}
+
+		// changes trie configuration should never change => we can read it in advance
+		let changes_trie_config = backend.state_at(BlockId::Number(Zero::zero()))?
+			.storage(well_known_keys::CHANGES_TRIE_CONFIG)
+			.map_err(|e| error::Error::from_state(Box::new(e)))?
+			.and_then(|c| Decode::decode(&mut &*c));
+
 		Ok(Client {
 			backend,
 			executor,
@@ -249,6 +258,7 @@ impl<B, E, Block> Client<B, E, Block> where
 			importing_block: Default::default(),
 			block_execution_strategy,
 			api_execution_strategy,
+			changes_trie_config,
 		})
 	}
 
@@ -335,6 +345,65 @@ impl<B, E, Block> Client<B, E, Block> where
 		Ok((header, proof))
 	}
 
+	/// Get pairs of (block, extrinsic) where key has been changed at given blocks range.
+	/// Works only for runtimes that are supporting changes tries.
+	pub fn key_changes(
+		&self,
+		first: Block::Hash,
+		last: Block::Hash,
+		key: &[u8]
+	) -> error::Result<Vec<(NumberFor<Block>, u32)>> {
+		let config = self.changes_trie_config.as_ref();
+		let storage = self.backend.changes_trie_storage();
+		let (config, storage) = match (config, storage) {
+			(Some(config), Some(storage)) => (config, storage),
+			_ => return Err(error::ErrorKind::ChangesTriesNotSupported.into()),
+		};
+
+		key_changes::<_, Blake2Hasher>(
+			config,
+			storage,
+			self.require_block_number_from_id(&BlockId::Hash(first))?.as_(),
+			self.require_block_number_from_id(&BlockId::Hash(last))?.as_(),
+			self.backend.blockchain().info()?.best_number.as_(),
+			key)
+		.map_err(|err| error::ErrorKind::ChangesTrieAccessFailed(err).into())
+		.map(|r| r.into_iter().map(|(b, e)| (As::sa(b), e)).collect())
+	}
+
+	/// Get proof for computation of (block, extrinsic) pairs where key has been changed at given blocks range.
+	/// `max` is the hash of the last block known to the requester - we can't use changes tries from descendants
+	/// of this block.
+	/// Works only for runtimes that are supporting changes tries.
+	pub fn key_changes_proof(
+		&self,
+		first: Block::Hash,
+		last: Block::Hash,
+		max: Block::Hash,
+		key: &[u8]
+	) -> error::Result<(NumberFor<Block>, Vec<Vec<u8>>)> {
+		let config = self.changes_trie_config.as_ref();
+		let storage = self.backend.changes_trie_storage();
+		let (config, storage) = match (config, storage) {
+			(Some(config), Some(storage)) => (config, storage),
+			_ => return Err(error::ErrorKind::ChangesTriesNotSupported.into()),
+		};
+
+		let max_number = ::std::cmp::min(
+			self.backend.blockchain().info()?.best_number,
+			self.require_block_number_from_id(&BlockId::Hash(max))?,
+		);
+		key_changes_proof::<_, Blake2Hasher>(
+			config,
+			storage,
+			self.require_block_number_from_id(&BlockId::Hash(first))?.as_(),
+			self.require_block_number_from_id(&BlockId::Hash(last))?.as_(),
+			max_number.as_(),
+			key)
+		.map_err(|err| error::ErrorKind::ChangesTrieAccessFailed(err).into())
+		.map(|proof| (max_number, proof))
+	}
+
 	/// Create a new block, built on the head of the chain.
 	pub fn new_block(&self) -> error::Result<block_builder::BlockBuilder<B, E, Block, Blake2Hasher>>
 	where E: Clone
@@ -710,13 +779,19 @@ impl<B, E, Block> Client<B, E, Block> where
 	}
 
 	/// Convert an arbitrary block ID into a block hash.
-	pub fn block_number_from_id(&self, id: &BlockId<Block>) -> error::Result<Option<<<Block as BlockT>::Header as HeaderT>::Number>> {
+	pub fn block_number_from_id(&self, id: &BlockId<Block>) -> error::Result<Option<NumberFor<Block>>> {
 		match *id {
 			BlockId::Hash(_) => Ok(self.header(id)?.map(|h| h.number().clone())),
 			BlockId::Number(n) => Ok(Some(n)),
 		}
 	}
 
+	/// Convert an arbitrary block ID into a block hash, returning error if the block is unknown.
+	fn require_block_number_from_id(&self, id: &BlockId<Block>) -> error::Result<NumberFor<Block>> {
+		self.block_number_from_id(id)
+			.and_then(|n| n.ok_or_else(|| error::ErrorKind::UnknownBlock(format!("{}", id)).into()))
+	}
+
 	/// Get block header by id.
 	pub fn header(&self, id: &BlockId<Block>) -> error::Result<Option<<Block as BlockT>::Header>> {
 		self.backend.blockchain().header(*id)
@@ -969,14 +1044,93 @@ impl<B, E, Block> BlockBody<Block> for Client<B, E, Block>
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
+	use std::collections::HashMap;
 	use super::*;
 	use keyring::Keyring;
+	use primitives::twox_128;
+	use runtime_primitives::traits::{Digest as DigestT, DigestItem as DigestItemT};
+	use runtime_primitives::generic::DigestItem;
 	use test_client::{self, TestClient};
 	use test_client::client::BlockOrigin;
 	use test_client::client::backend::Backend as TestBackend;
 	use test_client::BlockBuilderExt;
-	use test_client::runtime::Transfer;
+	use test_client::runtime::{self, Block, Transfer};
+
+	/// Returns tuple, consisting of:
+	/// 1) test client pre-filled with blocks changing balances;
+	/// 2) roots of changes tries for these blocks
+	/// 3) test cases in form (begin, end, key, vec![(block, extrinsic)]) that are required to pass
+	pub fn prepare_client_with_key_changes() -> (
+		test_client::client::Client<test_client::Backend, test_client::Executor, Block>,
+		Vec<H256>,
+		Vec<(u64, u64, Vec<u8>, Vec<(u64, u32)>)>,
+	) {
+		// prepare block structure
+		let blocks_transfers = vec![
+			vec![(Keyring::Alice, Keyring::Dave), (Keyring::Bob, Keyring::Dave)],
+			vec![(Keyring::Charlie, Keyring::Eve)],
+			vec![],
+			vec![(Keyring::Alice, Keyring::Dave)],
+		];
+
+		// prepare client ang import blocks
+		let mut local_roots = Vec::new();
+		let remote_client = test_client::new_with_changes_trie();
+		let mut nonces: HashMap<_, u64> = Default::default();
+		for (i, block_transfers) in blocks_transfers.into_iter().enumerate() {
+			let mut builder = remote_client.new_block().unwrap();
+			for (from, to) in block_transfers {
+				builder.push_transfer(Transfer {
+					from: from.to_raw_public().into(),
+					to: to.to_raw_public().into(),
+					amount: 1,
+					nonce: *nonces.entry(from).and_modify(|n| { *n = *n + 1 }).or_default(),
+				}).unwrap();
+			}
+			remote_client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap();
+
+			let header = remote_client.header(&BlockId::Number(i as u64 + 1)).unwrap().unwrap();
+			let trie_root = header.digest().logs().iter()
+				.find(|l| l.as_changes_trie_root().is_some())
+				.and_then(DigestItem::as_changes_trie_root)
+				.map(|root| H256::from_slice(root.as_ref()))
+				.unwrap();
+			local_roots.push(trie_root);
+		}
+
+		// prepare test cases
+		let alice = twox_128(&runtime::system::balance_of_key(Keyring::Alice.to_raw_public().into())).to_vec();
+		let bob = twox_128(&runtime::system::balance_of_key(Keyring::Bob.to_raw_public().into())).to_vec();
+		let charlie = twox_128(&runtime::system::balance_of_key(Keyring::Charlie.to_raw_public().into())).to_vec();
+		let dave = twox_128(&runtime::system::balance_of_key(Keyring::Dave.to_raw_public().into())).to_vec();
+		let eve = twox_128(&runtime::system::balance_of_key(Keyring::Eve.to_raw_public().into())).to_vec();
+		let ferdie = twox_128(&runtime::system::balance_of_key(Keyring::Ferdie.to_raw_public().into())).to_vec();
+		let test_cases = vec![
+			(1, 4, alice.clone(), vec![(4, 0), (1, 0)]),
+			(1, 3, alice.clone(), vec![(1, 0)]),
+			(2, 4, alice.clone(), vec![(4, 0)]),
+			(2, 3, alice.clone(), vec![]),
+
+			(1, 4, bob.clone(), vec![(1, 1)]),
+			(1, 1, bob.clone(), vec![(1, 1)]),
+			(2, 4, bob.clone(), vec![]),
+
+			(1, 4, charlie.clone(), vec![(2, 0)]),
+
+			(1, 4, dave.clone(), vec![(4, 0), (1, 1), (1, 0)]),
+			(1, 1, dave.clone(), vec![(1, 1), (1, 0)]),
+			(3, 4, dave.clone(), vec![(4, 0)]),
+
+			(1, 4, eve.clone(), vec![(2, 0)]),
+			(1, 1, eve.clone(), vec![]),
+			(3, 4, eve.clone(), vec![]),
+
+			(1, 4, ferdie.clone(), vec![]),
+		];
+
+		(remote_client, local_roots, test_cases)
+	}
 
 	#[test]
 	fn client_initialises_from_genesis_ok() {
@@ -1328,4 +1482,20 @@ mod tests {
 
 		assert_eq!(None, client.best_containing(d2.hash().clone(), Some(0)).unwrap());
 	}
+
+	#[test]
+	fn key_changes_works() {
+		let (client, _, test_cases) = prepare_client_with_key_changes();
+
+		for (index, (begin, end, key, expected_result)) in test_cases.into_iter().enumerate() {
+			let begin = client.block_hash(begin).unwrap().unwrap();
+			let end = client.block_hash(end).unwrap().unwrap();
+			let actual_result = client.key_changes(begin, end, &key).unwrap();
+			match actual_result == expected_result {
+				true => (),
+				false => panic!(format!("Failed test {}: actual = {:?}, expected = {:?}",
+					index, actual_result, expected_result)),
+			}
+		}
+	}
 }
diff --git a/substrate/core/client/src/error.rs b/substrate/core/client/src/error.rs
index bc73393371b1ea33e124d86819376fe59ca45e3e..88c7fe5ce66cbad0a5562dfd80078a8ceab349df 100644
--- a/substrate/core/client/src/error.rs
+++ b/substrate/core/client/src/error.rs
@@ -100,6 +100,18 @@ error_chain! {
 			display("Error decoding call result of {}", method)
 		}
 
+		/// Changes tries are not supported.
+		ChangesTriesNotSupported {
+			description("changes tries are not supported"),
+			display("Changes tries are not supported by the runtime"),
+		}
+
+		/// Key changes query has failed.
+		ChangesTrieAccessFailed(e: String) {
+			description("invalid changes proof"),
+			display("Failed to check changes proof: {}", e),
+		}
+
 		/// Last finalized block not parent of current.
 		NonSequentialFinalization(s: String) {
 			description("Did not finalize blocks in sequential order."),
diff --git a/substrate/core/client/src/light/fetcher.rs b/substrate/core/client/src/light/fetcher.rs
index 94a52208908bbf8782ac5923621158f00cfa9bde..3c997ad8a29af9c86d515b89537a6a2c7d60645b 100644
--- a/substrate/core/client/src/light/fetcher.rs
+++ b/substrate/core/client/src/light/fetcher.rs
@@ -16,13 +16,15 @@
 
 //! Light client data fetcher. Fetches requested data from remote full nodes.
 
+use std::marker::PhantomData;
 use futures::IntoFuture;
 
 use hash_db::Hasher;
 use heapsize::HeapSizeOf;
-use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
-use state_machine::{CodeExecutor, read_proof_check};
-use std::marker::PhantomData;
+use primitives::ChangesTrieConfiguration;
+use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberFor};
+use state_machine::{CodeExecutor, ChangesTrieRootsStorage, read_proof_check,
+	key_changes_proof_check};
 
 use call_executor::CallResult;
 use cht;
@@ -34,7 +36,7 @@ use light::call_executor::check_execution_proof;
 pub struct RemoteCallRequest<Header: HeaderT> {
 	/// Call at state of given block.
 	pub block: Header::Hash,
-	/// Head of block at which call is perormed.
+	/// Header of block at which call is performed.
 	pub header: Header,
 	/// Method to call.
 	pub method: String,
@@ -60,7 +62,7 @@ pub struct RemoteHeaderRequest<Header: HeaderT> {
 pub struct RemoteReadRequest<Header: HeaderT> {
 	/// Read at state of given block.
 	pub block: Header::Hash,
-	/// Head of block at which read is perormed.
+	/// Header of block at which read is performed.
 	pub header: Header,
 	/// Storage key to read.
 	pub key: Vec<u8>,
@@ -68,6 +70,28 @@ pub struct RemoteReadRequest<Header: HeaderT> {
 	pub retry_count: Option<usize>,
 }
 
+/// Remote key changes read request.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct RemoteChangesRequest<Header: HeaderT> {
+	/// Changes trie configuration.
+	pub changes_trie_config: ChangesTrieConfiguration,
+	/// Query changes from range of blocks, starting (and including) with this hash...
+	pub first_block: (Header::Number, Header::Hash),
+	/// ...ending (and including) with this hash. Should come after first_block and
+	/// be the part of the same fork.
+	pub last_block: (Header::Number, Header::Hash),
+	/// Only use digests from blocks up to this hash. Should be last_block OR come
+	/// after this block and be the part of the same fork.
+	pub max_block: (Header::Number, Header::Hash),
+	// TODO: get rid of this + preserve change_trie_roots when replacing headers with CHT!!!
+	/// Changes trie roots for the range of blocks [first_block..max_block].
+	pub tries_roots: Vec<Header::Hash>,
+	/// Storage key to read.
+	pub key: Vec<u8>,
+	/// Number of times to retry request. None means that default RETRY_COUNT is used.
+	pub retry_count: Option<usize>,
+}
+
 /// Light client data fetcher. Implementations of this trait must check if remote data
 /// is correct (see FetchedDataChecker) and return already checked data.
 pub trait Fetcher<Block: BlockT>: Send + Sync {
@@ -77,6 +101,8 @@ pub trait Fetcher<Block: BlockT>: Send + Sync {
 	type RemoteReadResult: IntoFuture<Item=Option<Vec<u8>>, Error=ClientError>;
 	/// Remote call result future.
 	type RemoteCallResult: IntoFuture<Item=CallResult, Error=ClientError>;
+	/// Remote changes result future.
+	type RemoteChangesResult: IntoFuture<Item=Vec<(NumberFor<Block>, u32)>, Error=ClientError>;
 
 	/// Fetch remote header.
 	fn remote_header(&self, request: RemoteHeaderRequest<Block::Header>) -> Self::RemoteHeaderResult;
@@ -84,6 +110,9 @@ pub trait Fetcher<Block: BlockT>: Send + Sync {
 	fn remote_read(&self, request: RemoteReadRequest<Block::Header>) -> Self::RemoteReadResult;
 	/// Fetch remote call result.
 	fn remote_call(&self, request: RemoteCallRequest<Block::Header>) -> Self::RemoteCallResult;
+	/// Fetch remote changes ((block number, extrinsic index)) where given key has been changed
+	/// at a given blocks range.
+	fn remote_changes(&self, request: RemoteChangesRequest<Block::Header>) -> Self::RemoteChangesResult;
 }
 
 /// Light client remote data checker.
@@ -110,6 +139,13 @@ pub trait FetchChecker<Block: BlockT>: Send + Sync {
 		request: &RemoteCallRequest<Block::Header>,
 		remote_proof: Vec<Vec<u8>>
 	) -> ClientResult<CallResult>;
+	/// Check remote changes query proof.
+	fn check_changes_proof(
+		&self,
+		request: &RemoteChangesRequest<Block::Header>,
+		remote_max: NumberFor<Block>,
+		remote_proof: Vec<Vec<u8>>
+	) -> ClientResult<Vec<(NumberFor<Block>, u32)>>;
 }
 
 /// Remote data checker.
@@ -168,6 +204,60 @@ impl<E, Block, H> FetchChecker<Block> for LightDataChecker<E, H>
 	) -> ClientResult<CallResult> {
 		check_execution_proof::<_, _, H>(&self.executor, request, remote_proof)
 	}
+
+	fn check_changes_proof(
+		&self,
+		request: &RemoteChangesRequest<Block::Header>,
+		remote_max: NumberFor<Block>,
+		remote_proof: Vec<Vec<u8>>
+	) -> ClientResult<Vec<(NumberFor<Block>, u32)>> {
+		// since we need roots of all changes tries for the range begin..max
+		// => remote node can't use max block greater that one that we have passed
+		if remote_max > request.max_block.0 || remote_max < request.last_block.0 {
+			return Err(ClientErrorKind::ChangesTrieAccessFailed(format!(
+				"Invalid max_block used by the remote node: {}. Local: {}..{}..{}",
+				remote_max, request.first_block.0, request.last_block.0, request.max_block.0,
+			)).into());
+		}
+
+		let first_number = request.first_block.0.as_();
+		key_changes_proof_check::<_, H>(
+			&request.changes_trie_config,
+			&RootsStorage {
+				first: first_number,
+				roots: &request.tries_roots,
+			},
+			remote_proof,
+			first_number,
+			request.last_block.0.as_(),
+			remote_max.as_(),
+			&request.key)
+		.map(|pairs| pairs.into_iter().map(|(b, x)| (As::sa(b), x)).collect())
+		.map_err(|err| ClientErrorKind::ChangesTrieAccessFailed(err).into())
+	}
+}
+
+/// A view of HashMap<Number, Hash> as a changes trie roots storage.
+struct RootsStorage<'a, Hash: 'a> {
+	first: u64,
+	roots: &'a [Hash],
+}
+
+impl<'a, H, Hash> ChangesTrieRootsStorage<H> for RootsStorage<'a, Hash>
+	where
+		H: Hasher,
+		Hash: 'a + Send + Sync + Clone + AsRef<[u8]>,
+{
+	fn root(&self, block: u64) -> Result<Option<H::Out>, String> {
+		Ok(block.checked_sub(self.first)
+			.and_then(|index| self.roots.get(index as usize))
+			.cloned()
+			.map(|root| {
+				let mut hasher_root: H::Out = Default::default();
+				hasher_root.as_mut().copy_from_slice(root.as_ref());
+				hasher_root
+			}))
+	}
 }
 
 #[cfg(test)]
@@ -175,9 +265,11 @@ pub mod tests {
 	use futures::future::{ok, err, FutureResult};
 	use parking_lot::Mutex;
 	use call_executor::CallResult;
+	use client::tests::prepare_client_with_key_changes;
 	use executor::{self, NativeExecutionDispatch};
 	use error::Error as ClientError;
-	use test_client::{self, TestClient, runtime::{Hash, Block, Header}};
+	use test_client::{self, TestClient};
+	use test_client::runtime::{self, Hash, Block, Header};
 	use test_client::client::BlockOrigin;
 	use in_mem::{Blockchain as InMemoryBlockchain};
 	use light::fetcher::{Fetcher, FetchChecker, LightDataChecker,
@@ -194,6 +286,7 @@ pub mod tests {
 		type RemoteHeaderResult = FutureResult<Header, ClientError>;
 		type RemoteReadResult = FutureResult<Option<Vec<u8>>, ClientError>;
 		type RemoteCallResult = FutureResult<CallResult, ClientError>;
+		type RemoteChangesResult = FutureResult<Vec<(NumberFor<Block>, u32)>, ClientError>;
 
 		fn remote_header(&self, _request: RemoteHeaderRequest<Header>) -> Self::RemoteHeaderResult {
 			err("Not implemented on test node".into())
@@ -206,6 +299,10 @@ pub mod tests {
 		fn remote_call(&self, _request: RemoteCallRequest<Header>) -> Self::RemoteCallResult {
 			ok((*self.lock()).clone())
 		}
+
+		fn remote_changes(&self, _request: RemoteChangesRequest<Header>) -> Self::RemoteChangesResult {
+			err("Not implemented on test node".into())
+		}
 	}
 
 	fn prepare_for_read_proof_check() -> (
@@ -231,7 +328,7 @@ pub mod tests {
 			None,
 			None,
 			::backend::NewBlockState::Final,
-		);
+		).unwrap();
 		let local_executor = test_client::LocalExecutor::new();
 		let local_checker = LightDataChecker::new(local_executor);
 		(local_checker, remote_block_header, remote_read_proof, authorities_len)
@@ -307,4 +404,80 @@ pub mod tests {
 			retry_count: None,
 		}, Some(remote_block_header.clone()), remote_header_proof).is_err());
 	}
+
+	#[test]
+	fn changes_proof_is_generated_and_checked() {
+		let (remote_client, local_roots, test_cases) = prepare_client_with_key_changes();
+		let local_checker = LightDataChecker::<_, Blake2Hasher>::new(
+			test_client::LocalExecutor::new());
+		let local_checker = &local_checker as &FetchChecker<Block>;
+		let max = remote_client.info().unwrap().chain.best_number;
+		let max_hash = remote_client.info().unwrap().chain.best_hash;
+
+		for (index, (begin, end, key, expected_result)) in test_cases.into_iter().enumerate() {
+			let begin_hash = remote_client.block_hash(begin).unwrap().unwrap();
+			let end_hash = remote_client.block_hash(end).unwrap().unwrap();
+
+			// 'fetch' changes proof from remote node
+			let (remote_max, remote_proof) = remote_client.key_changes_proof(
+				begin_hash, end_hash, max_hash, &key
+			).unwrap();
+
+			// check proof on local client
+			let local_roots_range = local_roots.clone()[(begin - 1) as usize..].to_vec();
+			let request = RemoteChangesRequest::<Header> {
+				changes_trie_config: runtime::changes_trie_config(),
+				first_block: (begin, begin_hash),
+				last_block: (end, end_hash),
+				max_block: (max, max_hash),
+				tries_roots: local_roots_range,
+				key: key,
+				retry_count: None,
+			};
+			let local_result = local_checker.check_changes_proof(
+				&request, remote_max, remote_proof).unwrap();
+
+			// ..and ensure that result is the same as on remote node
+			match local_result == expected_result {
+				true => (),
+				false => panic!(format!("Failed test {}: local = {:?}, expected = {:?}",
+					index, local_result, expected_result)),
+			}
+		}
+	}
+
+	#[test]
+	fn check_changes_proof_fails_if_proof_is_wrong() {
+		let (remote_client, local_roots, test_cases) = prepare_client_with_key_changes();
+		let local_checker = LightDataChecker::<_, Blake2Hasher>::new(
+			test_client::LocalExecutor::new());
+		let local_checker = &local_checker as &FetchChecker<Block>;
+		let max = remote_client.info().unwrap().chain.best_number;
+		let max_hash = remote_client.info().unwrap().chain.best_hash;
+
+		let (begin, end, key, _) = test_cases[0].clone();
+		let begin_hash = remote_client.block_hash(begin).unwrap().unwrap();
+		let end_hash = remote_client.block_hash(end).unwrap().unwrap();
+
+		// 'fetch' changes proof from remote node
+		let (remote_max, mut remote_proof) = remote_client.key_changes_proof(
+			begin_hash, end_hash, max_hash, &key).unwrap();
+		let local_roots_range = local_roots.clone()[(begin - 1) as usize..].to_vec();
+		let request = RemoteChangesRequest::<Header> {
+			changes_trie_config: runtime::changes_trie_config(),
+			first_block: (begin, begin_hash),
+			last_block: (end, end_hash),
+			max_block: (max, max_hash),
+			tries_roots: local_roots_range.clone(),
+			key: key,
+			retry_count: None,
+		};
+
+		// check proof on local client using max from the future
+		assert!(local_checker.check_changes_proof(&request, remote_max + 1, remote_proof.clone()).is_err());
+
+		// check proof on local client using broken proof
+		remote_proof = local_roots_range.into_iter().map(|v| v.to_vec()).collect();
+		assert!(local_checker.check_changes_proof(&request, remote_max, remote_proof).is_err());
+	}
 }
diff --git a/substrate/core/network/src/chain.rs b/substrate/core/network/src/chain.rs
index b50207cd87402c7e6dd88d75c28924a6e5461f20..fb3a16d3679d9034de9879e304dcc83ba3c9d2b4 100644
--- a/substrate/core/network/src/chain.rs
+++ b/substrate/core/network/src/chain.rs
@@ -18,7 +18,7 @@
 
 use client::{self, Client as SubstrateClient, ImportResult, ClientInfo, BlockStatus, BlockOrigin, CallExecutor};
 use client::error::Error;
-use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
+use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor};
 use runtime_primitives::generic::BlockId;
 use runtime_primitives::bft::Justification;
 use primitives::{Blake2Hasher};
@@ -61,6 +61,15 @@ pub trait Client<Block: BlockT>: Send + Sync {
 
 	/// Get method execution proof.
 	fn execution_proof(&self, block: &Block::Hash, method: &str, data: &[u8]) -> Result<(Vec<u8>, Vec<Vec<u8>>), Error>;
+
+	/// Get key changes proof.
+	fn key_changes_proof(
+		&self,
+		first: Block::Hash,
+		last: Block::Hash,
+		max: Block::Hash,
+		key: &[u8]
+	) -> Result<(NumberFor<Block>, Vec<Vec<u8>>), Error>;
 }
 
 impl<B, E, Block> Client<Block> for SubstrateClient<B, E, Block> where
@@ -116,4 +125,14 @@ impl<B, E, Block> Client<Block> for SubstrateClient<B, E, Block> where
 	fn execution_proof(&self, block: &Block::Hash, method: &str, data: &[u8]) -> Result<(Vec<u8>, Vec<Vec<u8>>), Error> {
 		(self as &SubstrateClient<B, E, Block>).execution_proof(&BlockId::Hash(block.clone()), method, data)
 	}
+
+	fn key_changes_proof(
+		&self,
+		first: Block::Hash,
+		last: Block::Hash,
+		max: Block::Hash,
+		key: &[u8]
+	) -> Result<(NumberFor<Block>, Vec<Vec<u8>>), Error> {
+		(self as &SubstrateClient<B, E, Block>).key_changes_proof(first, last, max, key)
+	}
 }
diff --git a/substrate/core/network/src/message.rs b/substrate/core/network/src/message.rs
index 3bb080e13354d302be49233c92146e53a7a9bb1e..58060e93b0a4fbb34e85e0d0c8bee2680d386847 100644
--- a/substrate/core/network/src/message.rs
+++ b/substrate/core/network/src/message.rs
@@ -20,8 +20,9 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
 use codec::{Encode, Decode, Input, Output};
 pub use self::generic::{
 	BlockAnnounce, RemoteCallRequest, RemoteReadRequest,
-	RemoteHeaderRequest, RemoteHeaderResponse, ConsensusVote,
-	SignedConsensusVote, FromBlock
+	RemoteHeaderRequest, RemoteHeaderResponse,
+	RemoteChangesRequest, RemoteChangesResponse,
+	ConsensusVote, SignedConsensusVote, FromBlock
 };
 
 /// A unique ID of a request.
@@ -274,6 +275,10 @@ pub mod generic {
 		RemoteHeaderRequest(RemoteHeaderRequest<Number>),
 		/// Remote header response.
 		RemoteHeaderResponse(RemoteHeaderResponse<Header>),
+		/// Remote changes request.
+		RemoteChangesRequest(RemoteChangesRequest<Hash>),
+		/// Remote changes reponse.
+		RemoteChangesResponse(RemoteChangesResponse<Number>),
 		/// Chain-specific message
 		#[codec(index = "255")]
 		ChainSpecific(Vec<u8>),
@@ -372,4 +377,31 @@ pub mod generic {
 		/// Header proof.
 		pub proof: Vec<Vec<u8>>,
 	}
+
+	#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
+	/// Remote changes request.
+	pub struct RemoteChangesRequest<H> {
+		/// Unique request id.
+		pub id: RequestId,
+		/// Hash of the first block of the range (including first) where changes are requested.
+		pub first: H,
+		/// Hash of the last block of the range (including last) where changes are requested.
+		pub last: H,
+		/// Hash of the last block that we can use when querying changes.
+		pub max: H,
+		/// Storage key which changes are requested.
+		pub key: Vec<u8>,
+	}
+
+	#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
+	/// Remote changes response.
+	pub struct RemoteChangesResponse<N> {
+		/// Id of a request this response was made for.
+		pub id: RequestId,
+		/// Proof has been generated using block with this number as a max block. Should be
+		/// less than or equal to the RemoteChangesRequest::max block number.
+		pub max: N,
+		/// Changes proof.
+		pub proof: Vec<Vec<u8>>,
+	}
 }
diff --git a/substrate/core/network/src/on_demand.rs b/substrate/core/network/src/on_demand.rs
index 2b3b020814fab12517a981ec173e5155d07275ed..7fc709e4037971ae1e842983f8d197ef6ce83f87 100644
--- a/substrate/core/network/src/on_demand.rs
+++ b/substrate/core/network/src/on_demand.rs
@@ -24,14 +24,14 @@ use futures::sync::oneshot::{channel, Receiver, Sender};
 use linked_hash_map::LinkedHashMap;
 use linked_hash_map::Entry;
 use parking_lot::Mutex;
-use client;
+use client::{self, error::{Error as ClientError, ErrorKind as ClientErrorKind}};
 use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
-	RemoteCallRequest, RemoteReadRequest};
+	RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest};
 use io::SyncIo;
 use message;
 use network_libp2p::{Severity, NodeIndex};
 use service;
-use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
+use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor};
 
 /// Remote request timeout.
 const REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
@@ -62,6 +62,14 @@ pub trait OnDemandService<Block: BlockT>: Send + Sync {
 
 	/// When call response is received from remote node.
 	fn on_remote_call_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteCallResponse);
+
+	/// When changes response is received from remote node.
+	fn on_remote_changes_response(
+		&self,
+		io: &mut SyncIo,
+		peer: NodeIndex,
+		response: message::RemoteChangesResponse<NumberFor<Block>>
+	);
 }
 
 /// On-demand requests service. Dispatches requests to appropriate peers.
@@ -72,7 +80,7 @@ pub struct OnDemand<B: BlockT, E: service::ExecuteInContext<B>> {
 
 /// On-demand remote call response.
 pub struct RemoteResponse<T> {
-	receiver: Receiver<Result<T, client::error::Error>>,
+	receiver: Receiver<Result<T, ClientError>>,
 }
 
 #[derive(Default)]
@@ -92,24 +100,25 @@ struct Request<Block: BlockT> {
 }
 
 enum RequestData<Block: BlockT> {
-	RemoteHeader(RemoteHeaderRequest<Block::Header>, Sender<Result<Block::Header, client::error::Error>>),
-	RemoteRead(RemoteReadRequest<Block::Header>, Sender<Result<Option<Vec<u8>>, client::error::Error>>),
-	RemoteCall(RemoteCallRequest<Block::Header>, Sender<Result<client::CallResult, client::error::Error>>),
+	RemoteHeader(RemoteHeaderRequest<Block::Header>, Sender<Result<Block::Header, ClientError>>),
+	RemoteRead(RemoteReadRequest<Block::Header>, Sender<Result<Option<Vec<u8>>, ClientError>>),
+	RemoteCall(RemoteCallRequest<Block::Header>, Sender<Result<client::CallResult, ClientError>>),
+	RemoteChanges(RemoteChangesRequest<Block::Header>, Sender<Result<Vec<(NumberFor<Block>, u32)>, ClientError>>),
 }
 
 enum Accept<Block: BlockT> {
 	Ok,
-	CheckFailed(client::error::Error, RequestData<Block>),
+	CheckFailed(ClientError, RequestData<Block>),
 	Unexpected(RequestData<Block>),
 }
 
 impl<T> Future for RemoteResponse<T> {
 	type Item = T;
-	type Error = client::error::Error;
+	type Error = ClientError;
 
 	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
 		self.receiver.poll()
-			.map_err(|_| client::error::ErrorKind::RemoteFetchCancelled.into())
+			.map_err(|_| ClientErrorKind::RemoteFetchCancelled.into())
 			.and_then(|r| match r {
 				Async::Ready(Ok(ready)) => Ok(Async::Ready(ready)),
 				Async::Ready(Err(error)) => Err(error),
@@ -172,7 +181,7 @@ impl<B: BlockT, E> OnDemand<B, E> where
 					(retry_count - 1, Some(retry_request_data))
 				} else {
 					trace!(target: "sync", "Failed to get remote {} response for given number of retries", rtype);
-					retry_request_data.fail(client::error::ErrorKind::RemoteFetchFailed.into());
+					retry_request_data.fail(ClientErrorKind::RemoteFetchFailed.into());
 					(0, None)
 				}
 			},
@@ -262,6 +271,22 @@ impl<B, E> OnDemandService<B> for OnDemand<B, E> where
 			data @ _ => Accept::Unexpected(data),
 		})
 	}
+
+	fn on_remote_changes_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteChangesResponse<NumberFor<B>>) {
+		self.accept_response("changes", io, peer, response.id, |request| match request.data {
+			RequestData::RemoteChanges(request, sender) => match self.checker.check_changes_proof(
+				&request, response.max, response.proof
+			) {
+				Ok(response) => {
+					// we do not bother if receiver has been dropped already
+					let _ = sender.send(Ok(response));
+					Accept::Ok
+				},
+				Err(error) => Accept::CheckFailed(error, RequestData::RemoteChanges(request, sender)),
+			},
+			data @ _ => Accept::Unexpected(data),
+		})
+	}
 }
 
 impl<B, E> Fetcher<B> for OnDemand<B, E> where
@@ -272,6 +297,7 @@ impl<B, E> Fetcher<B> for OnDemand<B, E> where
 	type RemoteHeaderResult = RemoteResponse<B::Header>;
 	type RemoteReadResult = RemoteResponse<Option<Vec<u8>>>;
 	type RemoteCallResult = RemoteResponse<client::CallResult>;
+	type RemoteChangesResult = RemoteResponse<Vec<(NumberFor<B>, u32)>>;
 
 	fn remote_header(&self, request: RemoteHeaderRequest<B::Header>) -> Self::RemoteHeaderResult {
 		let (sender, receiver) = channel();
@@ -290,6 +316,12 @@ impl<B, E> Fetcher<B> for OnDemand<B, E> where
 		self.schedule_request(request.retry_count.clone(), RequestData::RemoteCall(request, sender),
 			RemoteResponse { receiver })
 	}
+
+	fn remote_changes(&self, request: RemoteChangesRequest<B::Header>) -> Self::RemoteChangesResult {
+		let (sender, receiver) = channel();
+		self.schedule_request(request.retry_count.clone(), RequestData::RemoteChanges(request, sender),
+			RemoteResponse { receiver })
+	}
 }
 
 impl<B, E> OnDemandCore<B, E> where
@@ -377,35 +409,44 @@ impl<B, E> OnDemandCore<B, E> where
 impl<Block: BlockT> Request<Block> {
 	pub fn message(&self) -> message::Message<Block> {
 		match self.data {
-			RequestData::RemoteHeader(ref data, _) => message::generic::Message::RemoteHeaderRequest(
-				message::RemoteHeaderRequest {
+			RequestData::RemoteHeader(ref data, _) =>
+				message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest {
 					id: self.id,
 					block: data.block,
 				}),
-			RequestData::RemoteRead(ref data, _) => message::generic::Message::RemoteReadRequest(
-				message::RemoteReadRequest {
+			RequestData::RemoteRead(ref data, _) =>
+				message::generic::Message::RemoteReadRequest(message::RemoteReadRequest {
 					id: self.id,
 					block: data.block,
 					key: data.key.clone(),
 				}),
-			RequestData::RemoteCall(ref data, _) => message::generic::Message::RemoteCallRequest(
-				message::RemoteCallRequest {
+			RequestData::RemoteCall(ref data, _) =>
+				message::generic::Message::RemoteCallRequest(message::RemoteCallRequest {
 					id: self.id,
 					block: data.block,
 					method: data.method.clone(),
 					data: data.call_data.clone(),
 				}),
+			RequestData::RemoteChanges(ref data, _) =>
+				message::generic::Message::RemoteChangesRequest(message::RemoteChangesRequest {
+					id: self.id,
+					first: data.first_block.1.clone(),
+					last: data.last_block.1.clone(),
+					max: data.max_block.1.clone(),
+					key: data.key.clone(),
+				}),
 		}
 	}
 }
 
 impl<Block: BlockT> RequestData<Block> {
-	pub fn fail(self, error: client::error::Error) {
+	pub fn fail(self, error: ClientError) {
 		// don't care if anyone is listening
 		match self {
 			RequestData::RemoteHeader(_, sender) => { let _ = sender.send(Err(error)); },
 			RequestData::RemoteCall(_, sender) => { let _ = sender.send(Err(error)); },
 			RequestData::RemoteRead(_, sender) => { let _ = sender.send(Err(error)); },
+			RequestData::RemoteChanges(_, sender) => { let _ = sender.send(Err(error)); },
 		}
 	}
 }
@@ -417,15 +458,15 @@ pub mod tests {
 	use std::time::Instant;
 	use futures::Future;
 	use parking_lot::RwLock;
-	use client;
+	use client::{self, error::{ErrorKind as ClientErrorKind, Result as ClientResult}};
 	use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
-		RemoteCallRequest, RemoteReadRequest};
+		RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest};
 	use message;
 	use network_libp2p::NodeIndex;
 	use service::{Roles, ExecuteInContext};
 	use test::TestIo;
 	use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService};
-	use test_client::runtime::{Block, Header};
+	use test_client::runtime::{changes_trie_config, Block, Header};
 
 	pub struct DummyExecutor;
 	struct DummyFetchChecker { ok: bool }
@@ -440,27 +481,34 @@ pub mod tests {
 			_request: &RemoteHeaderRequest<Header>,
 			header: Option<Header>,
 			_remote_proof: Vec<Vec<u8>>
-		) -> client::error::Result<Header> {
+		) -> ClientResult<Header> {
 			match self.ok {
 				true if header.is_some() => Ok(header.unwrap()),
-				_ => Err(client::error::ErrorKind::Backend("Test error".into()).into()),
+				_ => Err(ClientErrorKind::Backend("Test error".into()).into()),
 			}
 		}
 
-		fn check_read_proof(&self, _request: &RemoteReadRequest<Header>, _remote_proof: Vec<Vec<u8>>) -> client::error::Result<Option<Vec<u8>>> {
+		fn check_read_proof(&self, _: &RemoteReadRequest<Header>, _: Vec<Vec<u8>>) -> ClientResult<Option<Vec<u8>>> {
 			match self.ok {
 				true => Ok(Some(vec![42])),
-				false => Err(client::error::ErrorKind::Backend("Test error".into()).into()),
+				false => Err(ClientErrorKind::Backend("Test error".into()).into()),
 			}
 		}
 
-		fn check_execution_proof(&self, _request: &RemoteCallRequest<Header>, _remote_proof: Vec<Vec<u8>>) -> client::error::Result<client::CallResult> {
+		fn check_execution_proof(&self, _: &RemoteCallRequest<Header>, _: Vec<Vec<u8>>) -> ClientResult<client::CallResult> {
 			match self.ok {
 				true => Ok(client::CallResult {
 					return_data: vec![42],
 					changes: Default::default(),
 				}),
-				false => Err(client::error::ErrorKind::Backend("Test error".into()).into()),
+				false => Err(ClientErrorKind::Backend("Test error".into()).into()),
+			}
+		}
+
+		fn check_changes_proof(&self, _: &RemoteChangesRequest<Header>, _: u64, _: Vec<Vec<u8>>) -> ClientResult<Vec<(u64, u32)>> {
+			match self.ok {
+				true => Ok(vec![(100, 2)]),
+				false => Err(ClientErrorKind::Backend("Test error".into()).into()),
 			}
 		}
 	}
@@ -733,4 +781,33 @@ pub mod tests {
 		});
 		thread.join().unwrap();
 	}
+
+	#[test]
+	fn receives_remote_changes_response() {
+		let (_x, on_demand) = dummy(true);
+		let queue = RwLock::new(VecDeque::new());
+		let mut network = TestIo::new(&queue, None);
+		on_demand.on_connect(0, Roles::FULL);
+
+		let response = on_demand.remote_changes(RemoteChangesRequest {
+			changes_trie_config: changes_trie_config(),
+			first_block: (1, Default::default()),
+			last_block: (100, Default::default()),
+			max_block: (100, Default::default()),
+			tries_roots: vec![],
+			key: vec![],
+			retry_count: None,
+		});
+		let thread = ::std::thread::spawn(move || {
+			let result = response.wait().unwrap();
+			assert_eq!(result, vec![(100, 2)]);
+		});
+
+		on_demand.on_remote_changes_response(&mut network, 0, message::RemoteChangesResponse {
+			id: 0,
+			max: 1000,
+			proof: vec![vec![2]],
+		});
+		thread.join().unwrap();
+	}
 }
diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs
index 8866a167a28557bd0321ba23db60c0f73ca9e287..6c0e887b66107e4cfe093c025a14cc8cf30948e3 100644
--- a/substrate/core/network/src/protocol.rs
+++ b/substrate/core/network/src/protocol.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use std::time;
 use parking_lot::RwLock;
 use rustc_hex::ToHex;
-use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, NumberFor, As};
+use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, NumberFor, As, Zero};
 use runtime_primitives::generic::BlockId;
 use network_libp2p::{NodeIndex, Severity};
 use codec::{Encode, Decode};
@@ -273,6 +273,8 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
 			GenericMessage::RemoteReadResponse(response) => self.on_remote_read_response(io, who, response),
 			GenericMessage::RemoteHeaderRequest(request) => self.on_remote_header_request(io, who, request),
 			GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(io, who, response),
+			GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(io, who, request),
+			GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(io, who, response),
 			other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, other),
 		}
 	}
@@ -648,6 +650,29 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
 		self.on_demand.as_ref().map(|s| s.on_remote_header_response(io, who, response));
 	}
 
+	fn on_remote_changes_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteChangesRequest<B::Hash>) {
+		trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{})",
+			request.id, who, request.key.to_hex(), request.first, request.last);
+		let (max, proof) = match self.context_data.chain.key_changes_proof(request.first, request.last, request.max, &request.key) {
+			Ok((max, proof)) => (max, proof),
+			Err(error) => {
+				trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{}) failed with: {}",
+					request.id, who, request.key.to_hex(), request.first, request.last, error);
+				(Zero::zero(), Default::default())
+			},
+		};
+ 		self.send_message(io, who, GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse {
+			id: request.id, max, proof,
+		}));
+	}
+
+ 	fn on_remote_changes_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteChangesResponse<NumberFor<B>>) {
+		trace!(target: "sync", "Remote changes proof response {} from {} (max={})",
+			response.id, who, response.max);
+		self.on_demand.as_ref().map(|s| s.on_remote_changes_response(io, who, response));
+	}
+
+
 	/// Execute a closure with access to a network context and specialization.
 	pub fn with_spec<F, U>(&self, io: &mut SyncIo, f: F) -> U
 		where F: FnOnce(&mut S, &mut Context<B>) -> U
diff --git a/substrate/core/state-machine/src/changes_trie/changes_iterator.rs b/substrate/core/state-machine/src/changes_trie/changes_iterator.rs
index 886cd6abb14cf8e8039f7ef0bac24cc31b0a0afa..2f4ef371488c9b54bdcc93a358acdc76b31255f1 100644
--- a/substrate/core/state-machine/src/changes_trie/changes_iterator.rs
+++ b/substrate/core/state-machine/src/changes_trie/changes_iterator.rs
@@ -23,7 +23,7 @@ use codec::{Decode, Encode};
 use hash_db::{HashDB, Hasher};
 use heapsize::HeapSizeOf;
 use substrate_trie::{Recorder, MemoryDB};
-use changes_trie::{Configuration, Storage};
+use changes_trie::{Configuration, RootsStorage, Storage};
 use changes_trie::input::{DigestIndex, ExtrinsicIndex, DigestIndexValue, ExtrinsicIndexValue};
 use changes_trie::storage::{TrieBackendAdapter, InMemoryStorage};
 use proving_backend::ProvingBackendEssence;
@@ -44,6 +44,8 @@ pub fn key_changes<S: Storage<H>, H: Hasher>(
 			key,
 			roots_storage: storage,
 			storage,
+			begin,
+			end,
 			surface: surface_iterator(config, max, begin, end)?,
 
 			extrinsics: Default::default(),
@@ -69,6 +71,8 @@ pub fn key_changes_proof<S: Storage<H>, H: Hasher>(
 			key,
 			roots_storage: storage.clone(),
 			storage,
+			begin,
+			end,
 			surface: surface_iterator(config, max, begin, end)?,
 
 			extrinsics: Default::default(),
@@ -89,9 +93,9 @@ pub fn key_changes_proof<S: Storage<H>, H: Hasher>(
 
 /// Check key changes proog and return changes of the key at given blocks range.
 /// `max` is the number of best known block.
-pub fn key_changes_proof_check<S: Storage<H>, H: Hasher>(
+pub fn key_changes_proof_check<S: RootsStorage<H>, H: Hasher>(
 	config: &Configuration,
-	roots_storage: &S, // TODO: use RootsStorage is only used to gather root
+	roots_storage: &S,
 	proof: Vec<Vec<u8>>,
 	begin: u64,
 	end: u64,
@@ -109,6 +113,8 @@ pub fn key_changes_proof_check<S: Storage<H>, H: Hasher>(
 			key,
 			roots_storage,
 			storage: &proof_db,
+			begin,
+			end,
 			surface: surface_iterator(config, max, begin, end)?,
 
 			extrinsics: Default::default(),
@@ -168,10 +174,12 @@ impl<'a> Iterator for SurfaceIterator<'a> {
 
 /// Drilldown iterator - receives 'digest points' from surface iterator and explores
 /// every point until extrinsic is found.
-pub struct DrilldownIteratorEssence<'a, RS: 'a + Storage<H>, S: 'a + Storage<H>, H: Hasher> {
+pub struct DrilldownIteratorEssence<'a, RS: 'a + RootsStorage<H>, S: 'a + Storage<H>, H: Hasher> {
 	key: &'a [u8],
 	roots_storage: &'a RS,
 	storage: &'a S,
+	begin: u64,
+	end: u64,
 	surface: SurfaceIterator<'a>,
 
 	extrinsics: VecDeque<(u64, u32)>,
@@ -180,7 +188,7 @@ pub struct DrilldownIteratorEssence<'a, RS: 'a + Storage<H>, S: 'a + Storage<H>,
 	_hasher: ::std::marker::PhantomData<H>,
 }
 
-impl<'a, RS: 'a + Storage<H>, S: Storage<H>, H: Hasher> DrilldownIteratorEssence<'a, RS, S, H> {
+impl<'a, RS: 'a + RootsStorage<H>, S: Storage<H>, H: Hasher> DrilldownIteratorEssence<'a, RS, S, H> {
 	pub fn next<F>(&mut self, trie_reader: F) -> Option<Result<(u64, u32), String>>
 		where
 			F: FnMut(&S, H::Out, &[u8]) -> Result<Option<Vec<u8>>, String>,
@@ -202,7 +210,17 @@ impl<'a, RS: 'a + Storage<H>, S: Storage<H>, H: Hasher> DrilldownIteratorEssence
 			}
 
 			if let Some((block, level)) = self.blocks.pop_front() {
-				if let Some(trie_root) = self.roots_storage.root(block)? {
+				// not having a changes trie root is an error because:
+				// we never query roots for future blocks
+				// AND trie roots for old blocks are known (both on full + light node)
+				let trie_root = self.roots_storage.root(block)?
+					.ok_or_else(|| format!("Changes trie root for block {} is not found", block))?;
+
+				// only return extrinsics for blocks before self.max
+				// most of blocks will be filtered out beore pushing to `self.blocks`
+				// here we just throwing away changes at digest blocks we're processing
+				debug_assert!(block >= self.begin, "We shall not touch digests earlier than a range' begin");
+				if block <= self.end {
 					let extrinsics_key = ExtrinsicIndex { block, key: self.key.to_vec() }.encode();
 					let extrinsics = trie_reader(&self.storage, trie_root, &extrinsics_key);
 					if let Some(extrinsics) = extrinsics? {
@@ -211,14 +229,22 @@ impl<'a, RS: 'a + Storage<H>, S: Storage<H>, H: Hasher> DrilldownIteratorEssence
 							self.extrinsics.extend(extrinsics.into_iter().rev().map(|e| (block, e)));
 						}
 					}
+				}
 
-					let blocks_key = DigestIndex { block, key: self.key.to_vec() }.encode();
-					let blocks = trie_reader(&self.storage, trie_root, &blocks_key);
-					if let Some(blocks) = blocks? {
-						let blocks: Option<DigestIndexValue> = Decode::decode(&mut &blocks[..]);
-						if let Some(blocks) = blocks {
-							self.blocks.extend(blocks.into_iter().rev().map(|b| (b, level - 1)));
-						}
+				let blocks_key = DigestIndex { block, key: self.key.to_vec() }.encode();
+				let blocks = trie_reader(&self.storage, trie_root, &blocks_key);
+				if let Some(blocks) = blocks? {
+					let blocks: Option<DigestIndexValue> = Decode::decode(&mut &blocks[..]);
+					if let Some(blocks) = blocks {
+						// filter level0 blocks here because we tend to use digest blocks,
+						// AND digest block changes could also include changes for out-of-range blocks
+						let begin = self.begin;
+						let end = self.end;
+						self.blocks.extend(blocks.into_iter()
+							.rev()
+							.filter(|b| level > 1 || (*b >= begin && *b <= end))
+							.map(|b| (b, level - 1))
+						);
 					}
 				}
 
@@ -235,11 +261,11 @@ impl<'a, RS: 'a + Storage<H>, S: Storage<H>, H: Hasher> DrilldownIteratorEssence
 }
 
 /// Exploring drilldown operator.
-struct DrilldownIterator<'a, RS: 'a + Storage<H>, S: 'a + Storage<H>, H: Hasher> {
+struct DrilldownIterator<'a, RS: 'a + RootsStorage<H>, S: 'a + Storage<H>, H: Hasher> {
 	essence: DrilldownIteratorEssence<'a, RS, S, H>,
 }
 
-impl<'a, RS: 'a + Storage<H>, S: Storage<H>, H: Hasher> Iterator
+impl<'a, RS: 'a + RootsStorage<H>, S: Storage<H>, H: Hasher> Iterator
 	for DrilldownIterator<'a, RS, S, H>
 	where H::Out: HeapSizeOf
 {
@@ -252,12 +278,12 @@ impl<'a, RS: 'a + Storage<H>, S: Storage<H>, H: Hasher> Iterator
 }
 
 /// Proving drilldown iterator.
-struct ProvingDrilldownIterator<'a, RS: 'a + Storage<H>, S: 'a + Storage<H>, H: Hasher> {
+struct ProvingDrilldownIterator<'a, RS: 'a + RootsStorage<H>, S: 'a + Storage<H>, H: Hasher> {
 	essence: DrilldownIteratorEssence<'a, RS, S, H>,
 	proof_recorder: RefCell<Recorder<H::Out>>,
 }
 
-impl<'a, RS: 'a + Storage<H>, S: Storage<H>, H: Hasher> ProvingDrilldownIterator<'a, RS, S, H> {
+impl<'a, RS: 'a + RootsStorage<H>, S: Storage<H>, H: Hasher> ProvingDrilldownIterator<'a, RS, S, H> {
 	/// Consume the iterator, extracting the gathered proof in lexicographical order
 	/// by value.
 	pub fn extract_proof(self) -> Vec<Vec<u8>> {
@@ -268,7 +294,7 @@ impl<'a, RS: 'a + Storage<H>, S: Storage<H>, H: Hasher> ProvingDrilldownIterator
 	}
 }
 
-impl<'a, RS: 'a + Storage<H>, S: Storage<H>, H: Hasher> Iterator for ProvingDrilldownIterator<'a, RS, S, H> where H::Out: HeapSizeOf {
+impl<'a, RS: 'a + RootsStorage<H>, S: Storage<H>, H: Hasher> Iterator for ProvingDrilldownIterator<'a, RS, S, H> where H::Out: HeapSizeOf {
 	type Item = Result<(u64, u32), String>;
 
 	fn next(&mut self) -> Option<Self::Item> {
@@ -401,9 +427,24 @@ mod tests {
 	fn drilldown_iterator_works() {
 		let (config, storage) = prepare_for_drilldown();
 		let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
-			&config, &storage, 0, 100, 1000, &[42]);
-
+			&config, &storage, 0, 16, 16, &[42]);
 		assert_eq!(drilldown_result, Ok(vec![(8, 2), (8, 1), (6, 3), (3, 0)]));
+
+		let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
+			&config, &storage, 0, 2, 4, &[42]);
+		assert_eq!(drilldown_result, Ok(vec![]));
+
+		let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
+			&config, &storage, 0, 3, 4, &[42]);
+		assert_eq!(drilldown_result, Ok(vec![(3, 0)]));
+
+		let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
+			&config, &storage, 7, 8, 8, &[42]);
+		assert_eq!(drilldown_result, Ok(vec![(8, 2), (8, 1)]));
+
+		let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
+			&config, &storage, 5, 7, 8, &[42]);
+		assert_eq!(drilldown_result, Ok(vec![(6, 3)]));
 	}
 
 	#[test]
@@ -433,7 +474,7 @@ mod tests {
 		let (remote_config, remote_storage) = prepare_for_drilldown();
 		let remote_proof = key_changes_proof::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
 			&remote_config, &remote_storage,
-			0, 100, 1000, &[42]).unwrap();
+			0, 16, 16, &[42]).unwrap();
 
 		// happens on local light node:
 
@@ -442,7 +483,7 @@ mod tests {
 		local_storage.clear_storage();
 		let local_result = key_changes_proof_check::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>(
 			&local_config, &local_storage, remote_proof,
-			0, 100, 1000, &[42]);
+			0, 16, 16, &[42]);
 
 		// check that drilldown result is the same as if it was happening at the full node
 		assert_eq!(local_result, Ok(vec![(8, 2), (8, 1), (6, 3), (3, 0)]));
diff --git a/substrate/core/state-machine/src/changes_trie/mod.rs b/substrate/core/state-machine/src/changes_trie/mod.rs
index f7cfb506e0f6cc856c41133b9928a363c3e941be..05bdfe4ce73a2cc21465801c7f5fa6f896e5fab0 100644
--- a/substrate/core/state-machine/src/changes_trie/mod.rs
+++ b/substrate/core/state-machine/src/changes_trie/mod.rs
@@ -54,10 +54,13 @@ use trie::{DBValue, trie_root};
 pub const NO_EXTRINSIC_INDEX: u32 = 0xffffffff;
 
 /// Changes trie storage. Provides access to trie roots and trie nodes.
-pub trait Storage<H: Hasher>: Send + Sync {
+pub trait RootsStorage<H: Hasher>: Send + Sync {
 	/// Get changes trie root for given block.
 	fn root(&self, block: u64) -> Result<Option<H::Out>, String>;
+}
 
+/// Changes trie storage. Provides access to trie roots and trie nodes.
+pub trait Storage<H: Hasher>: RootsStorage<H> {
 	/// Get a trie node.
 	fn get(&self, key: &H::Out) -> Result<Option<DBValue>, String>;
 }
diff --git a/substrate/core/state-machine/src/changes_trie/storage.rs b/substrate/core/state-machine/src/changes_trie/storage.rs
index 38133a5287c4ce317c39a80a105dc1aa1db82e36..8cefb654055db0801832100bcc03e49a8b1a9d3b 100644
--- a/substrate/core/state-machine/src/changes_trie/storage.rs
+++ b/substrate/core/state-machine/src/changes_trie/storage.rs
@@ -22,7 +22,7 @@ use trie::DBValue;
 use heapsize::HeapSizeOf;
 use trie::MemoryDB;
 use parking_lot::RwLock;
-use changes_trie::Storage;
+use changes_trie::{RootsStorage, Storage};
 use trie_backend_essence::TrieBackendStorage;
 
 #[cfg(test)]
@@ -94,11 +94,13 @@ impl<H: Hasher> InMemoryStorage<H> where H::Out: HeapSizeOf {
 	}
 }
 
-impl<H: Hasher> Storage<H> for InMemoryStorage<H> where H::Out: HeapSizeOf {
+impl<H: Hasher> RootsStorage<H> for InMemoryStorage<H> where H::Out: HeapSizeOf {
 	fn root(&self, block: u64) -> Result<Option<H::Out>, String> {
 		Ok(self.data.read().roots.get(&block).cloned())
 	}
+}
 
+impl<H: Hasher> Storage<H> for InMemoryStorage<H> where H::Out: HeapSizeOf {
 	fn get(&self, key: &H::Out) -> Result<Option<DBValue>, String> {
 		MemoryDB::<H>::get(&self.data.read().mdb, key)
 	}
diff --git a/substrate/core/state-machine/src/lib.rs b/substrate/core/state-machine/src/lib.rs
index b158966826b0a8c69841273f6ef5d202a0bc2cda..f5fa6e8b3720492ac896f1da7444683c49cc1076 100644
--- a/substrate/core/state-machine/src/lib.rs
+++ b/substrate/core/state-machine/src/lib.rs
@@ -56,6 +56,7 @@ pub use testing::TestExternalities;
 pub use ext::Ext;
 pub use backend::Backend;
 pub use changes_trie::{Storage as ChangesTrieStorage,
+	RootsStorage as ChangesTrieRootsStorage,
 	InMemoryStorage as InMemoryChangesTrieStorage,
 	key_changes, key_changes_proof, key_changes_proof_check};
 pub use overlayed_changes::OverlayedChanges;
@@ -267,12 +268,15 @@ where
 	// proof-of-execution on light clients. And the proof is recorded by the backend which
 	// is created after OverlayedChanges
 
-	let changes_trie_config = try_read_overlay_value(
-		overlay,
-		backend,
-		well_known_keys::CHANGES_TRIE_CONFIG
-	)?;
-	set_changes_trie_config(overlay, changes_trie_config)?;
+	let init_overlay = |overlay: &mut OverlayedChanges, final_check: bool| {
+		let changes_trie_config = try_read_overlay_value(
+			overlay,
+			backend,
+			well_known_keys::CHANGES_TRIE_CONFIG
+		)?;
+		set_changes_trie_config(overlay, changes_trie_config, final_check)
+	};
+	init_overlay(overlay, false)?;
 
 	let result = {
 		let mut orig_prospective = overlay.prospective.clone();
@@ -334,6 +338,11 @@ where
 		result.map(move |out| (out, storage_delta, changes_delta))
 	};
 
+	// ensure that changes trie config has not been changed
+	if result.is_ok() {
+		init_overlay(overlay, true)?;
+	}
+
 	result.map_err(|e| Box::new(e) as _)
 }
 
@@ -429,18 +438,22 @@ where
 
 /// Sets overlayed changes' changes trie configuration. Returns error if configuration
 /// differs from previous OR config decode has failed.
-pub(crate) fn set_changes_trie_config(overlay: &mut OverlayedChanges, config: Option<Vec<u8>>) -> Result<(), Box<Error>> {
+pub(crate) fn set_changes_trie_config(overlay: &mut OverlayedChanges, config: Option<Vec<u8>>, final_check: bool) -> Result<(), Box<Error>> {
 	let config = match config {
 		Some(v) => Some(changes_trie::Configuration::decode(&mut &v[..])
 			.ok_or_else(|| Box::new("Failed to decode changes trie configuration".to_owned()) as Box<Error>)?),
 		None => None,
 	};
+
+	if final_check && overlay.changes_trie_config.is_some() != config.is_some() {
+		return Err(Box::new("Changes trie configuration change is not supported".to_owned()));
+	}
+
 	if let Some(config) = config {
 		if !overlay.set_changes_trie_config(config) {
 			return Err(Box::new("Changes trie configuration change is not supported".to_owned()));
 		}
 	}
-
 	Ok(())
 }
 
@@ -462,13 +475,18 @@ where
 #[cfg(test)]
 mod tests {
 	use std::collections::HashMap;
+	use codec::Encode;
 	use super::*;
 	use super::backend::InMemory;
 	use super::ext::Ext;
-	use super::changes_trie::InMemoryStorage as InMemoryChangesTrieStorage;
+	use super::changes_trie::{
+		InMemoryStorage as InMemoryChangesTrieStorage,
+		Configuration as ChangesTrieConfig,
+	};
 	use primitives::{Blake2Hasher};
 
 	struct DummyCodeExecutor {
+		change_changes_trie_config: bool,
 		native_available: bool,
 		native_succeeds: bool,
 		fallback_succeeds: bool,
@@ -486,6 +504,13 @@ mod tests {
 			_data: &[u8],
 			use_native: bool
 		) -> (Result<Vec<u8>, Self::Error>, bool) {
+			if self.change_changes_trie_config {
+				ext.place_storage(well_known_keys::CHANGES_TRIE_CONFIG.to_vec(), Some(ChangesTrieConfig {
+					digest_interval: 777,
+					digest_levels: 333,
+				}.encode()));
+			}
+
 			let using_native = use_native && self.native_available;
 			match (using_native, self.native_succeeds, self.fallback_succeeds) {
 				(true, true, _) | (false, _, true) =>
@@ -510,6 +535,7 @@ mod tests {
 			Some(&InMemoryChangesTrieStorage::new()),
 			&mut Default::default(),
 			&DummyCodeExecutor {
+				change_changes_trie_config: false,
 				native_available: true,
 				native_succeeds: true,
 				fallback_succeeds: true,
@@ -528,6 +554,7 @@ mod tests {
 			Some(&InMemoryChangesTrieStorage::new()),
 			&mut Default::default(),
 			&DummyCodeExecutor {
+				change_changes_trie_config: false,
 				native_available: true,
 				native_succeeds: true,
 				fallback_succeeds: false,
@@ -546,6 +573,7 @@ mod tests {
 	#[test]
 	fn prove_execution_and_proof_check_works() {
 		let executor = DummyCodeExecutor {
+			change_changes_trie_config: false,
 			native_available: true,
 			native_succeeds: true,
 			fallback_succeeds: true,
@@ -621,4 +649,22 @@ mod tests {
 		assert_eq!(local_result1, Some(vec![24]));
 		assert_eq!(local_result2, false);
 	}
+
+	#[test]
+	fn cannot_change_changes_trie_config() {
+		assert!(execute(
+			&trie_backend::tests::test_trie(),
+			Some(&InMemoryChangesTrieStorage::new()),
+			&mut Default::default(),
+			&DummyCodeExecutor {
+				change_changes_trie_config: true,
+				native_available: false,
+				native_succeeds: true,
+				fallback_succeeds: true,
+			},
+			"test",
+			&[],
+			ExecutionStrategy::NativeWhenPossible
+		).is_err());
+	}
 }
diff --git a/substrate/core/state-machine/src/overlayed_changes.rs b/substrate/core/state-machine/src/overlayed_changes.rs
index 7b12fba6ebf934433664aaac55dab9ab3aa94227..06b547bb9a916e84cb23f838b95494f0c184f438 100644
--- a/substrate/core/state-machine/src/overlayed_changes.rs
+++ b/substrate/core/state-machine/src/overlayed_changes.rs
@@ -52,7 +52,6 @@ impl OverlayedChanges {
 	///
 	/// Returns false if configuration has been set already and we now trying
 	/// to install different configuration. This isn't supported now.
-	#[must_use = "Result must be checked"]
 	pub(crate) fn set_changes_trie_config(&mut self, config: ChangesTrieConfig) -> bool {
 		if let Some(ref old_config) = self.changes_trie_config {
 			// we do not support changes trie configuration' change now
diff --git a/substrate/core/state-machine/src/testing.rs b/substrate/core/state-machine/src/testing.rs
index 082d68501cdc9001ac24ee16fde25f1cc45d1a0a..69f380483acb0830638ad7e6ba84fc9ea21736b0 100644
--- a/substrate/core/state-machine/src/testing.rs
+++ b/substrate/core/state-machine/src/testing.rs
@@ -39,8 +39,9 @@ impl<H: Hasher> TestExternalities<H> where H::Out: HeapSizeOf {
 		let mut overlay = OverlayedChanges::default();
 		super::set_changes_trie_config(
 			&mut overlay,
-			inner.get(&CHANGES_TRIE_CONFIG.to_vec()).cloned())
-			.expect("changes trie configuration is correct in test env; qed");
+			inner.get(&CHANGES_TRIE_CONFIG.to_vec()).cloned(),
+			false,
+		).expect("changes trie configuration is correct in test env; qed");
 
 		TestExternalities {
 			inner,
diff --git a/substrate/core/test-client/src/client_ext.rs b/substrate/core/test-client/src/client_ext.rs
index 53102605f1b2b9f12646953c8ea889e8ecf32e34..9fd97723e28abe3fa1ef4a57ef2e14e4c3a237f3 100644
--- a/substrate/core/test-client/src/client_ext.rs
+++ b/substrate/core/test-client/src/client_ext.rs
@@ -38,9 +38,9 @@ pub trait TestClient {
 }
 
 impl<B, E> TestClient for Client<B, E, runtime::Block>
-    where
-        B: client::backend::Backend<runtime::Block, Blake2Hasher>,
-        E: client::CallExecutor<runtime::Block, Blake2Hasher>
+	where
+		B: client::backend::Backend<runtime::Block, Blake2Hasher>,
+		E: client::CallExecutor<runtime::Block, Blake2Hasher>
 {
 	fn justify_and_import(&self, origin: client::BlockOrigin, block: runtime::Block) -> client::error::Result<()> {
 		let authorities: [ed25519::Pair; 3] = [
diff --git a/substrate/core/test-client/src/lib.rs b/substrate/core/test-client/src/lib.rs
index d41ea26d877a94578a479416cd20390abfd28b31..e78d317212e2962e0749a8dba3e0468960029cac 100644
--- a/substrate/core/test-client/src/lib.rs
+++ b/substrate/core/test-client/src/lib.rs
@@ -68,29 +68,37 @@ pub type Executor = client::LocalCallExecutor<
 
 /// Creates new client instance used for tests.
 pub fn new() -> client::Client<Backend, Executor, runtime::Block> {
-	new_with_backend(Arc::new(Backend::new()))
+	new_with_backend(Arc::new(Backend::new()), false)
+}
+
+/// Creates new test client instance that suports changes trie creation.
+pub fn new_with_changes_trie() -> client::Client<Backend, Executor, runtime::Block> {
+	new_with_backend(Arc::new(Backend::new()), true)
 }
 
 /// Creates new client instance used for tests with an explicitely provided backend.
 /// This is useful for testing backend implementations.
-pub fn new_with_backend<B>(backend: Arc<B>) -> client::Client<B, client::LocalCallExecutor<B, executor::NativeExecutor<LocalExecutor>>, runtime::Block>
+pub fn new_with_backend<B>(
+	backend: Arc<B>,
+	support_changes_trie: bool
+) -> client::Client<B, client::LocalCallExecutor<B, executor::NativeExecutor<LocalExecutor>>, runtime::Block>
 	where
 		B: backend::LocalBackend<runtime::Block, Blake2Hasher>,
 {
 	let executor = NativeExecutor::new();
-	client::new_with_backend(backend, executor, genesis_storage()).unwrap()
+	client::new_with_backend(backend, executor, genesis_storage(support_changes_trie)).unwrap()
 }
 
-fn genesis_config() -> GenesisConfig {
-	GenesisConfig::new_simple(vec![
+fn genesis_config(support_changes_trie: bool) -> GenesisConfig {
+	GenesisConfig::new(support_changes_trie, vec![
 		Keyring::Alice.to_raw_public().into(),
 		Keyring::Bob.to_raw_public().into(),
 		Keyring::Charlie.to_raw_public().into(),
 	], 1000)
 }
 
-fn genesis_storage() -> StorageMap {
-	let mut storage = genesis_config().genesis_map();
+fn genesis_storage(support_changes_trie: bool) -> StorageMap {
+	let mut storage = genesis_config(support_changes_trie).genesis_map();
 	let block: runtime::Block = client::genesis::construct_genesis_block(&storage);
 	storage.extend(additional_storage_with_genesis(&block));
 	storage
diff --git a/substrate/core/test-client/src/trait_tests.rs b/substrate/core/test-client/src/trait_tests.rs
index fec0a73b8e438a75b8cb59fdd5e78d98a1eaf39b..86d08cd8a51b5be4edc0dde3ca7a9ab404a52f9c 100644
--- a/substrate/core/test-client/src/trait_tests.rs
+++ b/substrate/core/test-client/src/trait_tests.rs
@@ -41,7 +41,7 @@ pub fn test_leaves_for_backend<B>(backend: Arc<B>) where
 	//			  B2 -> C3
 	//		A1 -> D2
 
-	let client = ::new_with_backend(backend.clone());
+	let client = ::new_with_backend(backend.clone(), false);
 
 	let genesis_hash = client.info().unwrap().chain.genesis_hash;
 
@@ -153,7 +153,7 @@ pub fn test_blockchain_query_by_number_gets_canonical<B>(backend: Arc<B>) where
 	//		A1 -> B2 -> B3 -> B4
 	//			  B2 -> C3
 	//		A1 -> D2
-	let client = ::new_with_backend(backend);
+	let client = ::new_with_backend(backend, false);
 
 	// G -> A1
 	let a1 = client.new_block().unwrap().bake().unwrap();
diff --git a/substrate/core/test-runtime/src/genesismap.rs b/substrate/core/test-runtime/src/genesismap.rs
index ca9d08de631530a57e85473c3dd111c265c2f7f4..607ca38a2fe9148e84427442bc19e3e362481390 100644
--- a/substrate/core/test-runtime/src/genesismap.rs
+++ b/substrate/core/test-runtime/src/genesismap.rs
@@ -18,20 +18,29 @@
 
 use std::collections::HashMap;
 use runtime_io::twox_128;
-use codec::{KeyedVec, Joiner};
-use primitives::AuthorityId;
+use codec::{Encode, KeyedVec, Joiner};
+use primitives::{AuthorityId, ChangesTrieConfiguration};
 use primitives::storage::well_known_keys;
 use runtime_primitives::traits::Block;
 
 /// Configuration of a general Substrate test genesis block.
 pub struct GenesisConfig {
+	pub changes_trie_config: Option<ChangesTrieConfiguration>,
 	pub authorities: Vec<AuthorityId>,
 	pub balances: Vec<(AuthorityId, u64)>,
 }
 
 impl GenesisConfig {
 	pub fn new_simple(authorities: Vec<AuthorityId>, balance: u64) -> Self {
+		Self::new(false, authorities, balance)
+	}
+
+	pub fn new(support_changes_trie: bool, authorities: Vec<AuthorityId>, balance: u64) -> Self {
 		GenesisConfig {
+			changes_trie_config: match support_changes_trie {
+				true => Some(super::changes_trie_config()),
+				false => None,
+			},
 			authorities: authorities.clone(),
 			balances: authorities.into_iter().map(|a| (a, balance)).collect(),
 		}
@@ -39,7 +48,7 @@ impl GenesisConfig {
 
 	pub fn genesis_map(&self) -> HashMap<Vec<u8>, Vec<u8>> {
 		let wasm_runtime = include_bytes!("../wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm").to_vec();
-		self.balances.iter()
+		let mut map: HashMap<Vec<u8>, Vec<u8>> = self.balances.iter()
 			.map(|&(account, balance)| (account.to_keyed_vec(b"balance:"), vec![].and(&balance)))
 			.map(|(k, v)| (twox_128(&k[..])[..].to_vec(), v.to_vec()))
 			.chain(vec![
@@ -51,7 +60,11 @@ impl GenesisConfig {
 				.enumerate()
 				.map(|(i, account)| ((i as u32).to_keyed_vec(well_known_keys::AUTHORITY_PREFIX), vec![].and(account)))
 			)
-			.collect()
+			.collect();
+		if let Some(ref changes_trie_config) = self.changes_trie_config {
+			map.insert(well_known_keys::CHANGES_TRIE_CONFIG.to_vec(), changes_trie_config.encode());
+		}
+		map
 	}
 }
 
diff --git a/substrate/core/test-runtime/src/lib.rs b/substrate/core/test-runtime/src/lib.rs
index 16b0b27b07b18faf1ec5d16e815d89f960c9552a..1404e6fbc8f2e9670b766916ec27a9ea8935788f 100644
--- a/substrate/core/test-runtime/src/lib.rs
+++ b/substrate/core/test-runtime/src/lib.rs
@@ -141,6 +141,14 @@ pub fn run_tests(mut input: &[u8]) -> Vec<u8> {
 	[stxs.len() as u8].encode()
 }
 
+/// Changes trie configuration (optionally) used in tests.
+pub fn changes_trie_config() -> primitives::ChangesTrieConfiguration {
+	primitives::ChangesTrieConfiguration {
+		digest_interval: 4,
+		digest_levels: 2,
+	}
+}
+
 pub mod api {
 	use system;
 	impl_stubs!(
diff --git a/substrate/core/test-runtime/src/system.rs b/substrate/core/test-runtime/src/system.rs
index 652eb4786e4de7c96125b13245f277a627506369..50ccb104d8aab54c3e35cdb66af61e11f893e6c5 100644
--- a/substrate/core/test-runtime/src/system.rs
+++ b/substrate/core/test-runtime/src/system.rs
@@ -18,7 +18,7 @@
 //! and depositing logs.
 
 use rstd::prelude::*;
-use runtime_io::{storage_root, enumerated_trie_root};
+use runtime_io::{storage_root, enumerated_trie_root, storage_changes_root};
 use runtime_support::storage::{self, StorageValue, StorageMap};
 use runtime_primitives::traits::{Hash as HashT, BlakeTwo256, Digest as DigestT};
 use runtime_primitives::generic;
@@ -32,15 +32,18 @@ const NONCE_OF: &[u8] = b"nonce:";
 const BALANCE_OF: &[u8] = b"balance:";
 
 storage_items! {
-	ExtrinsicIndex: b"sys:xti" => required u32;
 	ExtrinsicData: b"sys:xtd" => required map [ u32 => Vec<u8> ];
 	// The current block number being processed. Set by `execute_block`.
 	Number: b"sys:num" => required BlockNumber;
 	ParentHash: b"sys:pha" => required Hash;
 }
 
+pub fn balance_of_key(who: AccountId) -> Vec<u8> {
+	who.to_keyed_vec(BALANCE_OF)
+}
+
 pub fn balance_of(who: AccountId) -> u64 {
-	storage::get_or(&who.to_keyed_vec(BALANCE_OF), 0)
+	storage::get_or(&balance_of_key(who), 0)
 }
 
 pub fn nonce_of(who: AccountId) -> u64 {
@@ -62,7 +65,7 @@ pub fn initialise_block(header: Header) {
 	// populate environment.
 	<Number>::put(&header.number);
 	<ParentHash>::put(&header.parent_hash);
-	<ExtrinsicIndex>::put(0);
+	storage::unhashed::put(well_known_keys::EXTRINSIC_INDEX, &0u32);
 }
 
 /// Actually execute all transitioning for `block`.
@@ -77,26 +80,38 @@ pub fn execute_block(block: Block) {
 	assert!(txs_root == header.extrinsics_root, "Transaction trie root must be valid.");
 
 	// execute transactions
-	block.extrinsics.iter().for_each(|e| { execute_transaction_backend(e).map_err(|_| ()).expect("Extrinsic error"); });
+	block.extrinsics.iter().enumerate().for_each(|(i, e)| {
+		storage::unhashed::put(well_known_keys::EXTRINSIC_INDEX, &(i as u32));
+		execute_transaction_backend(e).map_err(|_| ()).expect("Extrinsic error");
+		storage::unhashed::kill(well_known_keys::EXTRINSIC_INDEX);
+	});
 
 	// check storage root.
 	let storage_root = storage_root().into();
 	info_expect_equal_hash(&storage_root, &header.state_root);
 	assert!(storage_root == header.state_root, "Storage root must match that calculated.");
+
+	// check digest
+	let mut digest = Digest::default();
+	if let Some(storage_changes_root) = storage_changes_root(header.number) {
+		digest.push(generic::DigestItem::ChangesTrieRoot::<Hash, u64>(storage_changes_root.into()));
+	}
+	assert!(digest == header.digest, "Header digest items must match that calculated.");
 }
 
 /// Execute a transaction outside of the block execution function.
 /// This doesn't attempt to validate anything regarding the block.
 pub fn execute_transaction(utx: Extrinsic) -> ApplyResult {
-	let extrinsic_index = ExtrinsicIndex::get();
+	let extrinsic_index: u32 = storage::unhashed::get(well_known_keys::EXTRINSIC_INDEX).unwrap();
+	let result = execute_transaction_backend(&utx);
 	ExtrinsicData::insert(extrinsic_index, utx.encode());
-	ExtrinsicIndex::put(extrinsic_index + 1);
-	execute_transaction_backend(&utx)
+	storage::unhashed::put(well_known_keys::EXTRINSIC_INDEX, &(extrinsic_index + 1));
+	result
 }
 
 /// Finalise the block.
 pub fn finalise_block() -> Header {
-	let extrinsic_index = ExtrinsicIndex::take();
+	let extrinsic_index: u32 = storage::unhashed::take(well_known_keys::EXTRINSIC_INDEX).unwrap();
 	let txs: Vec<_> = (0..extrinsic_index).map(ExtrinsicData::take).collect();
 	let txs = txs.iter().map(Vec::as_slice).collect::<Vec<_>>();
 	let extrinsics_root = enumerated_trie_root::<Blake2Hasher>(&txs).into();