From da1fb3f27385aa193924756002cedc716504336c Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky <svyatonik@gmail.com> Date: Thu, 17 Jan 2019 12:08:50 +0300 Subject: [PATCH] Use changes tries in query_storage RPC (#1082) * use changes tries in query_storage RPC * let + match + return + call -> match --- substrate/core/client/db/src/lib.rs | 19 +- substrate/core/client/src/backend.rs | 9 +- substrate/core/client/src/client.rs | 81 ++++-- substrate/core/client/src/in_mem.rs | 32 ++- substrate/core/client/src/light/backend.rs | 5 +- substrate/core/client/src/light/fetcher.rs | 12 +- substrate/core/network/src/chain.rs | 6 +- substrate/core/network/src/protocol.rs | 6 +- substrate/core/rpc/src/state/mod.rs | 249 +++++++++++++----- substrate/core/rpc/src/state/tests.rs | 139 ++++++---- .../src/changes_trie/changes_iterator.rs | 39 +-- .../state-machine/src/changes_trie/mod.rs | 2 +- .../state-machine/src/changes_trie/prune.rs | 35 +++ substrate/core/state-machine/src/lib.rs | 3 +- 14 files changed, 445 insertions(+), 192 deletions(-) diff --git a/substrate/core/client/db/src/lib.rs b/substrate/core/client/db/src/lib.rs index f663cc0834d..944540b98b0 100644 --- a/substrate/core/client/db/src/lib.rs +++ b/substrate/core/client/db/src/lib.rs @@ -430,7 +430,7 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> { } /// Prune obsolete changes tries. - pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block_hash: Block::Hash, block_num: NumberFor<Block>) { + pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block_hash: Block::Hash, block_num: NumberFor<Block>) { // never prune on archive nodes let min_blocks_to_keep = match self.min_blocks_to_keep { Some(min_blocks_to_keep) => min_blocks_to_keep, @@ -456,6 +456,23 @@ impl<Block: BlockT> DbChangesTrieStorage<Block> { } } +impl<Block: BlockT> client::backend::PrunableStateChangesTrieStorage<Blake2Hasher> for DbChangesTrieStorage<Block> { + fn oldest_changes_trie_block( + &self, + config: &ChangesTrieConfiguration, + best_finalized_block: u64 + ) -> u64 { + match self.min_blocks_to_keep { + Some(min_blocks_to_keep) => state_machine::oldest_non_pruned_changes_trie( + config, + min_blocks_to_keep, + best_finalized_block, + ), + None => 1, + } + } +} + impl<Block: BlockT> state_machine::ChangesTrieRootsStorage<Blake2Hasher> for DbChangesTrieStorage<Block> { fn root(&self, anchor: &state_machine::ChangesTrieAnchorBlockId<H256>, block: u64) -> Result<Option<H256>, String> { // check API requirement diff --git a/substrate/core/client/src/backend.rs b/substrate/core/client/src/backend.rs index 6e65f289ad8..b4040f59a9f 100644 --- a/substrate/core/client/src/backend.rs +++ b/substrate/core/client/src/backend.rs @@ -17,6 +17,7 @@ //! Substrate Client data backend use crate::error; +use primitives::ChangesTrieConfiguration; use runtime_primitives::{generic::BlockId, Justification, StorageMap, ChildrenStorageMap}; use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, NumberFor}; use state_machine::backend::Backend as StateBackend; @@ -113,7 +114,7 @@ pub trait Backend<Block, H>: AuxStore + Send + Sync where /// Associated state backend type. type State: StateBackend<H>; /// Changes trie storage. - type ChangesTrieStorage: StateChangesTrieStorage<H>; + type ChangesTrieStorage: PrunableStateChangesTrieStorage<H>; /// Begin a new block insertion transaction with given parent block id. /// When constructing the genesis, this is called with all-zero hash. @@ -154,6 +155,12 @@ pub trait Backend<Block, H>: AuxStore + Send + Sync where } } +/// Changes trie storage that supports pruning. +pub trait PrunableStateChangesTrieStorage<H: Hasher>: StateChangesTrieStorage<H> { + /// Get number block of oldest, non-pruned changes trie. + fn oldest_changes_trie_block(&self, config: &ChangesTrieConfiguration, best_finalized: u64) -> u64; +} + /// Mark for all Backend implementations, that are making use of state data, stored locally. pub trait LocalBackend<Block, H>: Backend<Block, H> where diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs index f365c61c1fa..010912e75f9 100644 --- a/substrate/core/client/src/client.rs +++ b/substrate/core/client/src/client.rs @@ -42,7 +42,7 @@ use state_machine::{ key_changes, key_changes_proof, OverlayedChanges }; -use crate::backend::{self, BlockImportOperation}; +use crate::backend::{self, BlockImportOperation, PrunableStateChangesTrieStorage}; use crate::blockchain::{self, Info as ChainInfo, Backend as ChainBackend, HeaderBackend as ChainHeaderBackend}; use crate::call_executor::{CallExecutor, LocalCallExecutor}; use executor::{RuntimeVersion, RuntimeInfo}; @@ -355,35 +355,54 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where Ok((header, proof)) } + /// Get longest range within [first; last] that is possible to use in `key_changes` + /// and `key_changes_proof` calls. + /// Range could be shortened from the beginning if some changes tries have been pruned. + /// Returns Ok(None) if changes trues are not supported. + pub fn max_key_changes_range( + &self, + first: NumberFor<Block>, + last: BlockId<Block>, + ) -> error::Result<Option<(NumberFor<Block>, BlockId<Block>)>> { + let (config, storage) = match self.require_changes_trie().ok() { + Some((config, storage)) => (config, storage), + None => return Ok(None), + }; + let first = first.as_(); + let last_num = self.backend.blockchain().expect_block_number_from_id(&last)?.as_(); + if first > last_num { + return Err(error::ErrorKind::ChangesTrieAccessFailed("Invalid changes trie range".into()).into()); + } + let finalized_number = self.backend.blockchain().info()?.finalized_number; + let oldest = storage.oldest_changes_trie_block(&config, finalized_number.as_()); + let first = As::sa(::std::cmp::max(first, oldest)); + Ok(Some((first, last))) + } + /// Get pairs of (block, extrinsic) where key has been changed at given blocks range. /// Works only for runtimes that are supporting changes tries. pub fn key_changes( &self, - first: Block::Hash, - last: Block::Hash, - key: &[u8] + first: NumberFor<Block>, + last: BlockId<Block>, + key: &StorageKey ) -> error::Result<Vec<(NumberFor<Block>, u32)>> { - let config = self.changes_trie_config()?; - let storage = self.backend.changes_trie_storage(); - let (config, storage) = match (config, storage) { - (Some(config), Some(storage)) => (config, storage), - _ => return Err(error::ErrorKind::ChangesTriesNotSupported.into()), - }; + let (config, storage) = self.require_changes_trie()?; + let last_number = self.backend.blockchain().expect_block_number_from_id(&last)?.as_(); + let last_hash = self.backend.blockchain().expect_block_hash_from_id(&last)?; - let first_number = self.backend.blockchain().expect_block_number_from_id(&BlockId::Hash(first))?.as_(); - let last_number = self.backend.blockchain().expect_block_number_from_id(&BlockId::Hash(last))?.as_(); key_changes::<_, Blake2Hasher>( &config, - storage, - first_number, + &*storage, + first.as_(), &ChangesTrieAnchorBlockId { - hash: convert_hash(&last), + hash: convert_hash(&last_hash), number: last_number, }, self.backend.blockchain().info()?.best_number.as_(), - key) + &key.0) + .and_then(|r| r.map(|r| r.map(|(block, tx)| (As::sa(block), tx))).collect::<Result<_, _>>()) .map_err(|err| error::ErrorKind::ChangesTrieAccessFailed(err).into()) - .map(|r| r.into_iter().map(|(b, e)| (As::sa(b), e)).collect()) } /// Get proof for computation of (block, extrinsic) pairs where key has been changed at given blocks range. @@ -398,7 +417,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where last: Block::Hash, min: Block::Hash, max: Block::Hash, - key: &[u8] + key: &StorageKey ) -> error::Result<ChangesProof<Block::Header>> { self.key_changes_proof_with_cht_size( first, @@ -417,7 +436,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where last: Block::Hash, min: Block::Hash, max: Block::Hash, - key: &[u8], + key: &StorageKey, cht_size: u64, ) -> error::Result<ChangesProof<Block::Header>> { struct AccessedRootsRecorder<'a, Block: BlockT> { @@ -447,14 +466,9 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where } } - let config = self.changes_trie_config()?; - let storage = self.backend.changes_trie_storage(); - let (config, storage) = match (config, storage) { - (Some(config), Some(storage)) => (config, storage), - _ => return Err(error::ErrorKind::ChangesTriesNotSupported.into()), - }; - + let (config, storage) = self.require_changes_trie()?; let min_number = self.backend.blockchain().expect_block_number_from_id(&BlockId::Hash(min))?; + let recording_storage = AccessedRootsRecorder::<Block> { storage, min: min_number.as_(), @@ -478,7 +492,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where number: last_number, }, max_number.as_(), - key + &key.0 ) .map_err(|err| error::Error::from(error::ErrorKind::ChangesTrieAccessFailed(err)))?; @@ -528,6 +542,16 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where Ok(proof) } + /// Returns changes trie configuration and storage or an error if it is not supported. + fn require_changes_trie(&self) -> error::Result<(ChangesTrieConfiguration, &B::ChangesTrieStorage)> { + let config = self.changes_trie_config()?; + let storage = self.backend.changes_trie_storage(); + match (config, storage) { + (Some(config), Some(storage)) => Ok((config, storage)), + _ => Err(error::ErrorKind::ChangesTriesNotSupported.into()), + } + } + /// Create a new block, built on the head of the chain. pub fn new_block<InherentData>( &self @@ -1713,9 +1737,8 @@ pub(crate) mod tests { let (client, _, test_cases) = prepare_client_with_key_changes(); for (index, (begin, end, key, expected_result)) in test_cases.into_iter().enumerate() { - let begin = client.block_hash(begin).unwrap().unwrap(); let end = client.block_hash(end).unwrap().unwrap(); - let actual_result = client.key_changes(begin, end, &key).unwrap(); + let actual_result = client.key_changes(begin, BlockId::Hash(end), &StorageKey(key)).unwrap(); match actual_result == expected_result { true => (), false => panic!(format!("Failed test {}: actual = {:?}, expected = {:?}", diff --git a/substrate/core/client/src/in_mem.rs b/substrate/core/client/src/in_mem.rs index 114849ca6c1..d5065d76413 100644 --- a/substrate/core/client/src/in_mem.rs +++ b/substrate/core/client/src/in_mem.rs @@ -22,14 +22,14 @@ use parking_lot::RwLock; use crate::error; use crate::backend::{self, NewBlockState}; use crate::light; -use primitives::storage::well_known_keys; +use primitives::{ChangesTrieConfiguration, storage::well_known_keys}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Zero, NumberFor, As, Digest, DigestItem, AuthorityIdFor}; use runtime_primitives::{Justification, StorageMap, ChildrenStorageMap}; use crate::blockchain::{self, BlockStatus, HeaderBackend}; use state_machine::backend::{Backend as StateBackend, InMemory, Consolidate}; -use state_machine::InMemoryChangesTrieStorage; +use state_machine::{self, InMemoryChangesTrieStorage, ChangesTrieAnchorBlockId}; use hash_db::Hasher; use heapsize::HeapSizeOf; use crate::leaves::LeafSet; @@ -505,7 +505,7 @@ where H::Out: HeapSizeOf + Ord, { states: RwLock<HashMap<Block::Hash, InMemory<H>>>, - changes_trie_storage: InMemoryChangesTrieStorage<H>, + changes_trie_storage: ChangesTrieStorage<H>, blockchain: Blockchain<Block>, } @@ -519,7 +519,7 @@ where pub fn new() -> Backend<Block, H> { Backend { states: RwLock::new(HashMap::new()), - changes_trie_storage: InMemoryChangesTrieStorage::new(), + changes_trie_storage: ChangesTrieStorage(InMemoryChangesTrieStorage::new()), blockchain: Blockchain::new(), } } @@ -555,7 +555,7 @@ where type BlockImportOperation = BlockImportOperation<Block, H>; type Blockchain = Blockchain<Block>; type State = InMemory<H>; - type ChangesTrieStorage = InMemoryChangesTrieStorage<H>; + type ChangesTrieStorage = ChangesTrieStorage<H>; fn begin_operation(&self, block: BlockId<Block>) -> error::Result<Self::BlockImportOperation> { let state = match block { @@ -587,7 +587,7 @@ where if let Some(changes_trie_root) = changes_trie_root { if let Some(changes_trie_update) = operation.changes_trie_update { let changes_trie_root: H::Out = changes_trie_root.into(); - self.changes_trie_storage.insert(header.number().as_(), changes_trie_root, changes_trie_update); + self.changes_trie_storage.0.insert(header.number().as_(), changes_trie_root, changes_trie_update); } } @@ -652,6 +652,26 @@ impl<Block: BlockT> blockchain::Cache<Block> for Cache<Block> { } } +/// Prunable in-memory changes trie storage. +pub struct ChangesTrieStorage<H: Hasher>(InMemoryChangesTrieStorage<H>) where H::Out: HeapSizeOf; +impl<H: Hasher> backend::PrunableStateChangesTrieStorage<H> for ChangesTrieStorage<H> where H::Out: HeapSizeOf { + fn oldest_changes_trie_block(&self, _config: &ChangesTrieConfiguration, _best_finalized: u64) -> u64 { + 0 + } +} + +impl<H: Hasher> state_machine::ChangesTrieRootsStorage<H> for ChangesTrieStorage<H> where H::Out: HeapSizeOf { + fn root(&self, anchor: &ChangesTrieAnchorBlockId<H::Out>, block: u64) -> Result<Option<H::Out>, String> { + self.0.root(anchor, block) + } +} + +impl<H: Hasher> state_machine::ChangesTrieStorage<H> for ChangesTrieStorage<H> where H::Out: HeapSizeOf { + fn get(&self, key: &H::Out) -> Result<Option<state_machine::DBValue>, String> { + self.0.get(key) + } +} + /// Insert authorities entry into in-memory blockchain cache. Extracted as a separate function to use it in tests. pub fn cache_authorities_at<Block: BlockT>( blockchain: &Blockchain<Block>, diff --git a/substrate/core/client/src/light/backend.rs b/substrate/core/client/src/light/backend.rs index 7485b8cdf35..c144ffa5203 100644 --- a/substrate/core/client/src/light/backend.rs +++ b/substrate/core/client/src/light/backend.rs @@ -22,9 +22,8 @@ use futures::{Future, IntoFuture}; use parking_lot::RwLock; use runtime_primitives::{generic::BlockId, Justification, StorageMap, ChildrenStorageMap}; -use state_machine::{Backend as StateBackend, InMemoryChangesTrieStorage, TrieBackend}; +use state_machine::{Backend as StateBackend, TrieBackend}; use runtime_primitives::traits::{Block as BlockT, NumberFor, AuthorityIdFor}; - use crate::in_mem; use crate::backend::{AuxStore, Backend as ClientBackend, BlockImportOperation, RemoteBackend, NewBlockState}; use crate::blockchain::HeaderBackend as BlockchainHeaderBackend; @@ -95,7 +94,7 @@ impl<S, F, Block, H> ClientBackend<Block, H> for Backend<S, F> where type BlockImportOperation = ImportOperation<Block, S, F>; type Blockchain = Blockchain<S, F>; type State = OnDemandState<Block, S, F>; - type ChangesTrieStorage = InMemoryChangesTrieStorage<H>; + type ChangesTrieStorage = in_mem::ChangesTrieStorage<H>; fn begin_operation(&self, _block: BlockId<Block>) -> ClientResult<Self::BlockImportOperation> { Ok(ImportOperation { diff --git a/substrate/core/client/src/light/fetcher.rs b/substrate/core/client/src/light/fetcher.rs index 3e98a063ac3..49c7a5a5f20 100644 --- a/substrate/core/client/src/light/fetcher.rs +++ b/substrate/core/client/src/light/fetcher.rs @@ -403,7 +403,7 @@ pub mod tests { RemoteCallRequest, RemoteHeaderRequest}; use crate::light::blockchain::tests::{DummyStorage, DummyBlockchain}; use primitives::{twox_128, Blake2Hasher}; - use primitives::storage::well_known_keys; + use primitives::storage::{StorageKey, well_known_keys}; use runtime_primitives::generic::BlockId; use state_machine::Backend; use super::*; @@ -546,6 +546,7 @@ pub mod tests { let end_hash = remote_client.block_hash(end).unwrap().unwrap(); // 'fetch' changes proof from remote node + let key = StorageKey(key); let remote_proof = remote_client.key_changes_proof( begin_hash, end_hash, begin_hash, max_hash, &key ).unwrap(); @@ -558,7 +559,7 @@ pub mod tests { last_block: (end, end_hash), max_block: (max, max_hash), tries_roots: (begin, begin_hash, local_roots_range), - key: key, + key: key.0, retry_count: None, }; let local_result = local_checker.check_changes_proof(&request, ChangesProof { @@ -583,6 +584,7 @@ pub mod tests { // (1, 4, dave.clone(), vec![(4, 0), (1, 1), (1, 0)]), let (remote_client, remote_roots, _) = prepare_client_with_key_changes(); let dave = twox_128(&runtime::system::balance_of_key(Keyring::Dave.to_raw_public().into())).to_vec(); + let dave = StorageKey(dave); // 'fetch' changes proof from remote node: // we're fetching changes for range b1..b4 @@ -611,7 +613,7 @@ pub mod tests { last_block: (4, b4), max_block: (4, b4), tries_roots: (3, b3, vec![remote_roots[2].clone(), remote_roots[3].clone()]), - key: dave, + key: dave.0, retry_count: None, }; let local_result = local_checker.check_changes_proof_with_cht_size(&request, ChangesProof { @@ -640,6 +642,7 @@ pub mod tests { let end_hash = remote_client.block_hash(end).unwrap().unwrap(); // 'fetch' changes proof from remote node + let key = StorageKey(key); let remote_proof = remote_client.key_changes_proof( begin_hash, end_hash, begin_hash, max_hash, &key).unwrap(); @@ -650,7 +653,7 @@ pub mod tests { last_block: (end, end_hash), max_block: (max, max_hash), tries_roots: (begin, begin_hash, local_roots_range.clone()), - key: key, + key: key.0, retry_count: None, }; @@ -693,6 +696,7 @@ pub mod tests { let local_cht_root = cht::compute_root::<Header, Blake2Hasher, _>( 4, 0, remote_roots.iter().cloned().map(|ct| Ok(Some(ct)))).unwrap(); let dave = twox_128(&runtime::system::balance_of_key(Keyring::Dave.to_raw_public().into())).to_vec(); + let dave = StorageKey(dave); // 'fetch' changes proof from remote node: // we're fetching changes for range b1..b4 diff --git a/substrate/core/network/src/chain.rs b/substrate/core/network/src/chain.rs index 457f8852b2c..40edbfbadb9 100644 --- a/substrate/core/network/src/chain.rs +++ b/substrate/core/network/src/chain.rs @@ -24,7 +24,7 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, AuthorityId use runtime_primitives::generic::{BlockId}; use consensus::{ImportBlock, ImportResult}; use runtime_primitives::Justification; -use primitives::{H256, Blake2Hasher}; +use primitives::{H256, Blake2Hasher, storage::StorageKey}; /// Local client abstraction for the network. pub trait Client<Block: BlockT>: Send + Sync { @@ -66,7 +66,7 @@ pub trait Client<Block: BlockT>: Send + Sync { last: Block::Hash, min: Block::Hash, max: Block::Hash, - key: &[u8] + key: &StorageKey ) -> Result<ChangesProof<Block::Header>, Error>; } @@ -125,7 +125,7 @@ impl<B, E, Block, RA> Client<Block> for SubstrateClient<B, E, Block, RA> where last: Block::Hash, min: Block::Hash, max: Block::Hash, - key: &[u8] + key: &StorageKey ) -> Result<ChangesProof<Block::Header>, Error> { (self as &SubstrateClient<B, E, Block, RA>).key_changes_proof(first, last, min, max, key) } diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 41612c788d7..31891450e1e 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -22,6 +22,7 @@ use parking_lot::RwLock; use rustc_hex::ToHex; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, As, Zero}; use runtime_primitives::generic::BlockId; +use primitives::storage::StorageKey; use network_libp2p::{NodeIndex, Severity}; use codec::{Encode, Decode}; use consensus::import_queue::ImportQueue; @@ -712,11 +713,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { fn on_remote_changes_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteChangesRequest<B::Hash>) { trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{})", request.id, who, request.key.to_hex(), request.first, request.last); - let proof = match self.context_data.chain.key_changes_proof(request.first, request.last, request.min, request.max, &request.key) { + let key = StorageKey(request.key); + let proof = match self.context_data.chain.key_changes_proof(request.first, request.last, request.min, request.max, &key) { Ok(proof) => proof, Err(error) => { trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{}) failed with: {}", - request.id, who, request.key.to_hex(), request.first, request.last, error); + request.id, who, key.0.to_hex(), request.first, request.last, error); ChangesProof::<B::Header> { max_block: Zero::zero(), proof: vec![], diff --git a/substrate/core/rpc/src/state/mod.rs b/substrate/core/rpc/src/state/mod.rs index 72175ca5df9..49dc28137a8 100644 --- a/substrate/core/rpc/src/state/mod.rs +++ b/substrate/core/rpc/src/state/mod.rs @@ -17,7 +17,8 @@ //! Substrate state API. use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, + ops::Range, sync::Arc, }; @@ -31,7 +32,7 @@ use primitives::storage::{self, StorageKey, StorageData, StorageChangeSet}; use rpc::Result as RpcResult; use rpc::futures::{stream, Future, Sink, Stream}; use runtime_primitives::generic::BlockId; -use runtime_primitives::traits::{Block as BlockT, Header, ProvideRuntimeApi}; +use runtime_primitives::traits::{Block as BlockT, Header, ProvideRuntimeApi, As, NumberFor}; use runtime_version::RuntimeVersion; use subscriptions::Subscriptions; @@ -112,7 +113,25 @@ pub struct State<B, E, Block: BlockT, RA> { subscriptions: Subscriptions, } -impl<B, E, Block: BlockT, RA> State<B, E, Block, RA> { +/// Ranges to query in state_queryStorage. +struct QueryStorageRange<Block: BlockT> { + /// Hashes of all the blocks in the range. + pub hashes: Vec<Block::Hash>, + /// Number of the first block in the range. + pub first_number: NumberFor<Block>, + /// Blocks subrange ([begin; end) indices within `hashes`) where we should read keys at + /// each state to get changes. + pub unfiltered_range: Range<usize>, + /// Blocks subrange ([begin; end) indices within `hashes`) where we could pre-filter + /// blocks-with-changes by using changes tries. + pub filtered_range: Option<Range<usize>>, +} + +impl<B, E, Block: BlockT, RA> State<B, E, Block, RA> where + Block: BlockT<Hash=H256>, + B: client::backend::Backend<Block, Blake2Hasher>, + E: CallExecutor<Block, Blake2Hasher>, +{ /// Create new State API RPC handler. pub fn new(client: Arc<Client<B, E, Block, RA>>, subscriptions: Subscriptions) -> Self { Self { @@ -120,6 +139,128 @@ impl<B, E, Block: BlockT, RA> State<B, E, Block, RA> { subscriptions, } } + + /// Splits the `query_storage` block range into 'filtered' and 'unfiltered' subranges. + /// Blocks that contain changes within filtered subrange could be filtered using changes tries. + /// Blocks that contain changes within unfiltered subrange must be filtered manually. + fn split_query_storage_range( + &self, + from: Block::Hash, + to: Trailing<Block::Hash> + ) -> Result<QueryStorageRange<Block>> { + let to = self.unwrap_or_best(to)?; + let from_hdr = self.client.header(&BlockId::hash(from))?; + let to_hdr = self.client.header(&BlockId::hash(to))?; + match (from_hdr, to_hdr) { + (Some(ref from), Some(ref to)) if from.number() <= to.number() => { + // check if we can get from `to` to `from` by going through parent_hashes. + let from_number = *from.number(); + let blocks = { + let mut blocks = vec![to.hash()]; + let mut last = to.clone(); + while *last.number() > from_number { + if let Some(hdr) = self.client.header(&BlockId::hash(*last.parent_hash()))? { + blocks.push(hdr.hash()); + last = hdr; + } else { + bail!(invalid_block_range( + Some(from), + Some(to), + format!("Parent of {} ({}) not found", last.number(), last.hash()), + )) + } + } + if last.hash() != from.hash() { + bail!(invalid_block_range( + Some(from), + Some(to), + format!("Expected to reach `from`, got {} ({})", last.number(), last.hash()), + )) + } + blocks.reverse(); + blocks + }; + // check if we can filter blocks-with-changes from some (sub)range using changes tries + let changes_trie_range = self.client.max_key_changes_range(from_number, BlockId::Hash(to.hash()))?; + let filtered_range_begin = changes_trie_range.map(|(begin, _)| (begin - from_number).as_() as usize); + let (unfiltered_range, filtered_range) = split_range(blocks.len(), filtered_range_begin); + Ok(QueryStorageRange { + hashes: blocks, + first_number: from_number, + unfiltered_range, + filtered_range, + }) + }, + (from, to) => bail!( + invalid_block_range(from.as_ref(), to.as_ref(), "Invalid range or unknown block".into()) + ), + } + } + + /// Iterates through range.unfiltered_range and check each block for changes of keys' values. + fn query_storage_unfiltered( + &self, + range: &QueryStorageRange<Block>, + keys: &[StorageKey], + changes: &mut Vec<StorageChangeSet<Block::Hash>>, + ) -> Result<()> { + let mut last_state: HashMap<_, Option<_>> = Default::default(); + for block in range.unfiltered_range.start..range.unfiltered_range.end { + let block_hash = range.hashes[block].clone(); + let mut block_changes = StorageChangeSet { block: block_hash.clone(), changes: Vec::new() }; + let id = BlockId::hash(block_hash); + for key in keys { + let (has_changed, data) = { + let curr_data = self.client.storage(&id, key)?; + let prev_data = last_state.get(key).and_then(|x| x.as_ref()); + (curr_data.as_ref() != prev_data, curr_data) + }; + if has_changed { + block_changes.changes.push((key.clone(), data.clone())); + } + last_state.insert(key.clone(), data); + } + changes.push(block_changes); + } + Ok(()) + } + + /// Iterates through all blocks that are changing keys within range.filtered_range and collects these changes. + fn query_storage_filtered( + &self, + range: &QueryStorageRange<Block>, + keys: &[StorageKey], + changes: &mut Vec<StorageChangeSet<Block::Hash>>, + ) -> Result<()> { + let (begin, end) = match range.filtered_range { + Some(ref filtered_range) => ( + range.first_number + As::sa(filtered_range.start as u64), + BlockId::Hash(range.hashes[filtered_range.end - 1].clone()) + ), + None => return Ok(()), + }; + let mut changes_map: BTreeMap<NumberFor<Block>, StorageChangeSet<Block::Hash>> = BTreeMap::new(); + for key in keys { + let mut last_block = None; + for (block, _) in self.client.key_changes(begin, end, key)? { + if last_block == Some(block) { + continue; + } + let block_hash = range.hashes[(block - range.first_number).as_() as usize].clone(); + let id = BlockId::Hash(block_hash); + let value_at_block = self.client.storage(&id, key)?; + changes_map.entry(block) + .or_insert_with(|| StorageChangeSet { block: block_hash, changes: Vec::new() }) + .changes.push((key.clone(), value_at_block)); + last_block = Some(block); + } + } + if let Some(additional_capacity) = changes_map.len().checked_sub(changes.len()) { + changes.reserve(additional_capacity); + } + changes.extend(changes_map.into_iter().map(|(_, cs)| cs)); + Ok(()) + } } impl<B, E, Block, RA> State<B, E, Block, RA> where @@ -178,72 +319,17 @@ impl<B, E, Block, RA> StateApi<Block::Hash> for State<B, E, Block, RA> where self.client.runtime_api().metadata(&BlockId::Hash(block)).map(Into::into).map_err(Into::into) } - fn query_storage(&self, keys: Vec<StorageKey>, from: Block::Hash, to: Trailing<Block::Hash>) -> Result<Vec<StorageChangeSet<Block::Hash>>> { - let to = self.unwrap_or_best(to)?; - - let from_hdr = self.client.header(&BlockId::hash(from))?; - let to_hdr = self.client.header(&BlockId::hash(to))?; - - match (from_hdr, to_hdr) { - (Some(ref from), Some(ref to)) if from.number() <= to.number() => { - let from = from.clone(); - let to = to.clone(); - // check if we can get from `to` to `from` by going through parent_hashes. - let blocks = { - let mut blocks = vec![to.hash()]; - let mut last = to.clone(); - while last.number() > from.number() { - if let Some(hdr) = self.client.header(&BlockId::hash(*last.parent_hash()))? { - blocks.push(hdr.hash()); - last = hdr; - } else { - bail!(invalid_block_range( - Some(from), - Some(to), - format!("Parent of {} ({}) not found", last.number(), last.hash()), - )) - } - } - if last.hash() != from.hash() { - bail!(invalid_block_range( - Some(from), - Some(to), - format!("Expected to reach `from`, got {} ({})", last.number(), last.hash()), - )) - } - blocks.reverse(); - blocks - }; - let mut result = Vec::new(); - let mut last_state: HashMap<_, Option<_>> = Default::default(); - for block in blocks { - let mut changes = vec![]; - let id = BlockId::hash(block.clone()); - - for key in &keys { - let (has_changed, data) = { - let curr_data = self.client.storage(&id, key)?; - let prev_data = last_state.get(key).and_then(|x| x.as_ref()); - - (curr_data.as_ref() != prev_data, curr_data) - }; - - if has_changed { - changes.push((key.clone(), data.clone())); - } - - last_state.insert(key.clone(), data); - } - - result.push(StorageChangeSet { - block, - changes, - }); - } - Ok(result) - }, - (from, to) => bail!(invalid_block_range(from, to, "Invalid range or unknown block".into())), - } + fn query_storage( + &self, + keys: Vec<StorageKey>, + from: Block::Hash, + to: Trailing<Block::Hash> + ) -> Result<Vec<StorageChangeSet<Block::Hash>>> { + let range = self.split_query_storage_range(from, to)?; + let mut changes = Vec::new(); + self.query_storage_unfiltered(&range, &keys, &mut changes)?; + self.query_storage_filtered(&range, &keys, &mut changes)?; + Ok(changes) } fn subscribe_storage( @@ -348,8 +434,29 @@ impl<B, E, Block, RA> StateApi<Block::Hash> for State<B, E, Block, RA> where } } -fn invalid_block_range<H: Header>(from: Option<H>, to: Option<H>, reason: String) -> error::ErrorKind { - let to_string = |x: Option<H>| match x { +/// Splits passed range into two subranges where: +/// - first range has at least one element in it; +/// - second range (optionally) starts at given `middle` element. +pub(crate) fn split_range(size: usize, middle: Option<usize>) -> (Range<usize>, Option<Range<usize>>) { + // check if we can filter blocks-with-changes from some (sub)range using changes tries + let range2_begin = match middle { + // some of required changes tries are pruned => use available tries + Some(middle) if middle != 0 => Some(middle), + // all required changes tries are available, but we still want values at first block + // => do 'unfiltered' read for the first block and 'filtered' for the rest + Some(_) if size > 1 => Some(1), + // range contains single element => do not use changes tries + Some(_) => None, + // changes tries are not available => do 'unfiltered' read for the whole range + None => None, + }; + let range1 = 0..range2_begin.unwrap_or(size); + let range2 = range2_begin.map(|begin| begin..size); + (range1, range2) +} + +fn invalid_block_range<H: Header>(from: Option<&H>, to: Option<&H>, reason: String) -> error::ErrorKind { + let to_string = |x: Option<&H>| match x { None => "unknown hash".into(), Some(h) => format!("{} ({})", h.number(), h.hash()), }; diff --git a/substrate/core/rpc/src/state/tests.rs b/substrate/core/rpc/src/state/tests.rs index 3fe306d0bce..8cea4d8a6c1 100644 --- a/substrate/core/rpc/src/state/tests.rs +++ b/substrate/core/rpc/src/state/tests.rs @@ -117,66 +117,95 @@ fn should_send_initial_storage_changes_and_notifications() { #[test] fn should_query_storage() { - let core = ::tokio::runtime::Runtime::new().unwrap(); - let client = Arc::new(test_client::new()); - let api = State::new(client.clone(), Subscriptions::new(core.executor())); - - let add_block = |nonce| { - let mut builder = client.new_block().unwrap(); - builder.push_transfer(runtime::Transfer { - from: Keyring::Alice.to_raw_public().into(), - to: Keyring::Ferdie.to_raw_public().into(), - amount: 42, - nonce, - }).unwrap(); - let block = builder.bake().unwrap(); - let hash = block.header.hash(); - client.import(BlockOrigin::Own, block).unwrap(); - hash - }; - let block1_hash = add_block(0); - let block2_hash = add_block(1); - let genesis_hash = client.genesis_hash(); - + type TestClient = test_client::client::Client< + test_client::Backend, + test_client::Executor, + runtime::Block, + runtime::RuntimeApi + >; + + fn run_tests(client: Arc<TestClient>) { + let core = ::tokio::runtime::Runtime::new().unwrap(); + let api = State::new(client.clone(), Subscriptions::new(core.executor())); - let mut expected = vec![ - StorageChangeSet { - block: genesis_hash, + let add_block = |nonce| { + let mut builder = client.new_block().unwrap(); + builder.push_transfer(runtime::Transfer { + from: Keyring::Alice.to_raw_public().into(), + to: Keyring::Ferdie.to_raw_public().into(), + amount: 42, + nonce, + }).unwrap(); + let block = builder.bake().unwrap(); + let hash = block.header.hash(); + client.import(BlockOrigin::Own, block).unwrap(); + hash + }; + let block1_hash = add_block(0); + let block2_hash = add_block(1); + let genesis_hash = client.genesis_hash(); + + + let mut expected = vec![ + StorageChangeSet { + block: genesis_hash, + changes: vec![ + ( + StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), + Some(StorageData(vec![232, 3, 0, 0, 0, 0, 0, 0])) + ), + ], + }, + StorageChangeSet { + block: block1_hash, + changes: vec![ + ( + StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), + Some(StorageData(vec![190, 3, 0, 0, 0, 0, 0, 0])) + ), + ], + }, + ]; + + // Query changes only up to block1 + let result = api.query_storage( + vec![StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap())], + genesis_hash, + Some(block1_hash).into(), + ); + + assert_eq!(result.unwrap(), expected); + + // Query all changes + let result = api.query_storage( + vec![StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap())], + genesis_hash, + None.into(), + ); + + expected.push(StorageChangeSet { + block: block2_hash, changes: vec![ - (StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), Some(StorageData(vec![232, 3, 0, 0, 0, 0, 0, 0]))), + ( + StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), + Some(StorageData(vec![148, 3, 0, 0, 0, 0, 0, 0])) + ), ], - }, - StorageChangeSet { - block: block1_hash, - changes: vec![ - (StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), Some(StorageData(vec![190, 3, 0, 0, 0, 0, 0, 0]))), - ], - }, - ]; - - // Query changes only up to block1 - let result = api.query_storage( - vec![StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap())], - genesis_hash, - Some(block1_hash).into(), - ); - - assert_eq!(result.unwrap(), expected); + }); + assert_eq!(result.unwrap(), expected); + } - // Query all changes - let result = api.query_storage( - vec![StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap())], - genesis_hash, - None.into(), - ); + run_tests(Arc::new(test_client::new())); + run_tests(Arc::new(test_client::new_with_changes_trie())); +} - expected.push(StorageChangeSet { - block: block2_hash, - changes: vec![ - (StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), Some(StorageData(vec![148, 3, 0, 0, 0, 0, 0, 0]))), - ], - }); - assert_eq!(result.unwrap(), expected); +#[test] +fn should_split_ranges() { + assert_eq!(split_range(1, None), (0..1, None)); + assert_eq!(split_range(100, None), (0..100, None)); + assert_eq!(split_range(1, Some(0)), (0..1, None)); + assert_eq!(split_range(100, Some(50)), (0..50, Some(50..100))); + assert_eq!(split_range(100, Some(99)), (0..99, Some(99..100))); } diff --git a/substrate/core/state-machine/src/changes_trie/changes_iterator.rs b/substrate/core/state-machine/src/changes_trie/changes_iterator.rs index e8bd1e7db6b..af844db2c86 100644 --- a/substrate/core/state-machine/src/changes_trie/changes_iterator.rs +++ b/substrate/core/state-machine/src/changes_trie/changes_iterator.rs @@ -31,15 +31,16 @@ use trie_backend_essence::{TrieBackendEssence}; /// Return changes of given key at given blocks range. /// `max` is the number of best known block. -pub fn key_changes<S: Storage<H>, H: Hasher>( - config: &Configuration, - storage: &S, +/// Changes are returned in descending order (i.e. last block comes first). +pub fn key_changes<'a, S: Storage<H>, H: Hasher>( + config: &'a Configuration, + storage: &'a S, begin: u64, - end: &AnchorBlockId<H::Out>, + end: &'a AnchorBlockId<H::Out>, max: u64, - key: &[u8], -) -> Result<Vec<(u64, u32)>, String> where H::Out: HeapSizeOf { - DrilldownIterator { + key: &'a [u8], +) -> Result<DrilldownIterator<'a, S, S, H>, String> where H::Out: HeapSizeOf { + Ok(DrilldownIterator { essence: DrilldownIteratorEssence { key, roots_storage: storage, @@ -53,7 +54,7 @@ pub fn key_changes<S: Storage<H>, H: Hasher>( _hasher: ::std::marker::PhantomData::<H>::default(), }, - }.collect() + }) } /// Returns proof of changes of given key at given blocks range. @@ -93,6 +94,7 @@ pub fn key_changes_proof<S: Storage<H>, H: Hasher>( /// Check key changes proog and return changes of the key at given blocks range. /// `max` is the number of best known block. +/// Changes are returned in descending order (i.e. last block comes first). pub fn key_changes_proof_check<S: RootsStorage<H>, H: Hasher>( config: &Configuration, roots_storage: &S, @@ -261,7 +263,7 @@ impl<'a, RS: 'a + RootsStorage<H>, S: Storage<H>, H: Hasher> DrilldownIteratorEs } /// Exploring drilldown operator. -struct DrilldownIterator<'a, RS: 'a + RootsStorage<H>, S: 'a + Storage<H>, H: Hasher> where H::Out: 'a { +pub struct DrilldownIterator<'a, RS: 'a + RootsStorage<H>, S: 'a + Storage<H>, H: Hasher> where H::Out: 'a { essence: DrilldownIteratorEssence<'a, RS, S, H>, } @@ -379,6 +381,7 @@ fn lower_bound_max_digest( #[cfg(test)] mod tests { + use std::iter::FromIterator; use primitives::Blake2Hasher; use changes_trie::input::InputPair; use changes_trie::storage::InMemoryStorage; @@ -427,23 +430,28 @@ mod tests { fn drilldown_iterator_works() { let (config, storage) = prepare_for_drilldown(); let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>( - &config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 16 }, 16, &[42]); + &config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 16 }, 16, &[42]) + .and_then(Result::from_iter); assert_eq!(drilldown_result, Ok(vec![(8, 2), (8, 1), (6, 3), (3, 0)])); let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>( - &config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 2 }, 4, &[42]); + &config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 2 }, 4, &[42]) + .and_then(Result::from_iter); assert_eq!(drilldown_result, Ok(vec![])); let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>( - &config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 3 }, 4, &[42]); + &config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 3 }, 4, &[42]) + .and_then(Result::from_iter); assert_eq!(drilldown_result, Ok(vec![(3, 0)])); let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>( - &config, &storage, 7, &AnchorBlockId { hash: Default::default(), number: 8 }, 8, &[42]); + &config, &storage, 7, &AnchorBlockId { hash: Default::default(), number: 8 }, 8, &[42]) + .and_then(Result::from_iter); assert_eq!(drilldown_result, Ok(vec![(8, 2), (8, 1)])); let drilldown_result = key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>( - &config, &storage, 5, &AnchorBlockId { hash: Default::default(), number: 7 }, 8, &[42]); + &config, &storage, 5, &AnchorBlockId { hash: Default::default(), number: 7 }, 8, &[42]) + .and_then(Result::from_iter); assert_eq!(drilldown_result, Ok(vec![(6, 3)])); } @@ -453,7 +461,8 @@ mod tests { storage.clear_storage(); assert!(key_changes::<InMemoryStorage<Blake2Hasher>, Blake2Hasher>( - &config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 100 }, 1000, &[42]).is_err()); + &config, &storage, 0, &AnchorBlockId { hash: Default::default(), number: 100 }, 1000, &[42]) + .and_then(|i| i.collect::<Result<Vec<_>, _>>()).is_err()); } #[test] diff --git a/substrate/core/state-machine/src/changes_trie/mod.rs b/substrate/core/state-machine/src/changes_trie/mod.rs index 1d523854ee3..ffc43fb884f 100644 --- a/substrate/core/state-machine/src/changes_trie/mod.rs +++ b/substrate/core/state-machine/src/changes_trie/mod.rs @@ -44,7 +44,7 @@ mod storage; pub use self::storage::InMemoryStorage; pub use self::changes_iterator::{key_changes, key_changes_proof, key_changes_proof_check}; -pub use self::prune::prune; +pub use self::prune::{prune, oldest_non_pruned_trie}; use hash_db::Hasher; use heapsize::HeapSizeOf; diff --git a/substrate/core/state-machine/src/changes_trie/prune.rs b/substrate/core/state-machine/src/changes_trie/prune.rs index 98861051150..76c746cb72e 100644 --- a/substrate/core/state-machine/src/changes_trie/prune.rs +++ b/substrate/core/state-machine/src/changes_trie/prune.rs @@ -24,6 +24,22 @@ use trie_backend_essence::TrieBackendEssence; use changes_trie::{AnchorBlockId, Configuration, Storage}; use changes_trie::storage::TrieBackendAdapter; +/// Get number of oldest block for which changes trie is not pruned +/// given changes trie configuration, pruning parameter and number of +/// best finalized block. +pub fn oldest_non_pruned_trie( + config: &Configuration, + min_blocks_to_keep: u64, + best_finalized_block: u64, +) -> u64 { + let max_digest_interval = config.max_digest_interval(); + let max_digest_block = best_finalized_block - best_finalized_block % max_digest_interval; + match pruning_range(config, min_blocks_to_keep, max_digest_block) { + Some((_, last_pruned_block)) => last_pruned_block + 1, + None => 1, + } +} + /// Prune obslete changes tries. Puning happens at the same block, where highest /// level digest is created. Pruning guarantees to save changes tries for last /// `min_blocks_to_keep` blocks. We only prune changes tries at `max_digest_iterval` @@ -268,4 +284,23 @@ mod tests { assert_eq!(max_digest_intervals_to_keep(1024, 511), 2); assert_eq!(max_digest_intervals_to_keep(1024, 100), 10); } + + #[test] + fn oldest_non_pruned_trie_works() { + // when digests are not created at all + assert_eq!(oldest_non_pruned_trie(&config(0, 0), 100, 10), 1); + assert_eq!(oldest_non_pruned_trie(&config(0, 0), 100, 110), 11); + + // when only l1 digests are created + assert_eq!(oldest_non_pruned_trie(&config(100, 1), 100, 50), 1); + assert_eq!(oldest_non_pruned_trie(&config(100, 1), 100, 110), 1); + assert_eq!(oldest_non_pruned_trie(&config(100, 1), 100, 210), 101); + + // when l2 digests are created + assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 50), 1); + assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 110), 1); + assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 210), 1); + assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 10110), 1); + assert_eq!(oldest_non_pruned_trie(&config(100, 2), 100, 20110), 10001); + } } diff --git a/substrate/core/state-machine/src/lib.rs b/substrate/core/state-machine/src/lib.rs index bc0189d3987..e4fe89e2d2f 100644 --- a/substrate/core/state-machine/src/lib.rs +++ b/substrate/core/state-machine/src/lib.rs @@ -59,7 +59,8 @@ pub use changes_trie::{ RootsStorage as ChangesTrieRootsStorage, InMemoryStorage as InMemoryChangesTrieStorage, key_changes, key_changes_proof, key_changes_proof_check, - prune as prune_changes_tries}; + prune as prune_changes_tries, + oldest_non_pruned_trie as oldest_non_pruned_changes_trie}; pub use overlayed_changes::OverlayedChanges; pub use proving_backend::{create_proof_check_backend, create_proof_check_backend_storage}; pub use trie_backend_essence::{TrieBackendStorage, Storage}; -- GitLab