diff --git a/substrate/core/client/db/src/lib.rs b/substrate/core/client/db/src/lib.rs index 25f3521f61255b227fc32a7fb2a64e775e1de2ce..267d28729ba470a799472aaca11452b9a2c515b4 100644 --- a/substrate/core/client/db/src/lib.rs +++ b/substrate/core/client/db/src/lib.rs @@ -67,7 +67,8 @@ use hash_db::Hasher; use kvdb::{KeyValueDB, DBTransaction}; use trie::MemoryDB; use parking_lot::RwLock; -use primitives::{H256, AuthorityId, Blake2Hasher}; +use primitives::{H256, AuthorityId, Blake2Hasher, ChangesTrieConfiguration}; +use primitives::storage::well_known_keys; use runtime_primitives::{generic::BlockId, Justification}; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor, Zero, Digest, DigestItem}; use runtime_primitives::BuildStorage; @@ -80,6 +81,7 @@ use state_db::StateDb; pub use state_db::PruningMode; const CANONICALIZATION_DELAY: u64 = 256; +const MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR: u64 = 32768; /// DB-backed patricia trie state, transaction type is an overlay of changes to commit. pub type DbState = state_machine::TrieBackend<Arc<state_machine::Storage<Blake2Hasher>>, Blake2Hasher>; @@ -360,9 +362,42 @@ impl state_machine::Storage<Blake2Hasher> for DbGenesisStorage { pub struct DbChangesTrieStorage<Block: BlockT> { db: Arc<KeyValueDB>, + min_blocks_to_keep: Option<u64>, _phantom: ::std::marker::PhantomData<Block>, } +impl<Block: BlockT> DbChangesTrieStorage<Block> { + /// Commit new changes trie. + pub fn commit(&self, tx: &mut DBTransaction, mut changes_trie: MemoryDB<Blake2Hasher>) { + for (key, (val, _)) in changes_trie.drain() { + tx.put(columns::CHANGES_TRIE, &key[..], &val); + } + } + + /// Prune obsolete changes tries. 
+ pub fn prune(&self, config: Option<ChangesTrieConfiguration>, tx: &mut DBTransaction, block: NumberFor<Block>) { + // never prune on archive nodes + let min_blocks_to_keep = match self.min_blocks_to_keep { + Some(min_blocks_to_keep) => min_blocks_to_keep, + None => return, + }; + + // read configuration from the database. it is OK to do it here (without checking tx for + // modifications), since config can't change + let config = match config { + Some(config) => config, + None => return, + }; + + state_machine::prune_changes_tries( + &config, + &*self, + min_blocks_to_keep, + block.as_(), + |node| tx.delete(columns::CHANGES_TRIE, node.as_ref())); + } +} + impl<Block: BlockT> state_machine::ChangesTrieRootsStorage<Blake2Hasher> for DbChangesTrieStorage<Block> { fn root(&self, block: u64) -> Result<Option<H256>, String> { Ok(read_db::<Block>(&*self.db, columns::HASH_LOOKUP, columns::HEADER, BlockId::Number(As::sa(block))) @@ -389,7 +424,7 @@ impl<Block: BlockT> state_machine::ChangesTrieStorage<Blake2Hasher> for DbChange /// Otherwise, trie nodes are kept only from some recent blocks. 
pub struct Backend<Block: BlockT> { storage: Arc<StorageDb<Block>>, - tries_change_storage: DbChangesTrieStorage<Block>, + changes_tries_storage: DbChangesTrieStorage<Block>, blockchain: BlockchainDb<Block>, canonicalization_delay: u64, } @@ -418,6 +453,7 @@ impl<Block: BlockT> Backend<Block> { } fn from_kvdb(db: Arc<KeyValueDB>, pruning: PruningMode, canonicalization_delay: u64) -> Result<Self, client::error::Error> { + let is_archive_pruning = pruning.is_archive(); let blockchain = BlockchainDb::new(db.clone())?; let map_e = |e: state_db::Error<io::Error>| ::client::error::Error::from(format!("State database error: {:?}", e)); let state_db: StateDb<Block::Hash, H256> = StateDb::new(pruning, &StateMetaDb(&*db)).map_err(map_e)?; @@ -425,14 +461,15 @@ impl<Block: BlockT> Backend<Block> { db: db.clone(), state_db, }; - let tries_change_storage = DbChangesTrieStorage { + let changes_tries_storage = DbChangesTrieStorage { db, + min_blocks_to_keep: if is_archive_pruning { None } else { Some(MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR) }, _phantom: Default::default(), }; Ok(Backend { storage: Arc::new(storage_db), - tries_change_storage, + changes_tries_storage, blockchain, canonicalization_delay, }) @@ -487,7 +524,8 @@ impl<Block: BlockT> Backend<Block> { let f_num = f_header.number().clone(); if f_num.as_() > self.storage.state_db.best_canonical() { - if &meta.finalized_hash != f_header.parent_hash() { + let parent_hash = f_header.parent_hash().clone(); + if meta.finalized_hash != parent_hash { return Err(::client::error::ErrorKind::NonSequentialFinalization( format!("Last finalized {:?} not parent of {:?}", meta.finalized_hash, f_hash), @@ -497,6 +535,13 @@ impl<Block: BlockT> Backend<Block> { let commit = self.storage.state_db.canonicalize_block(&f_hash); apply_state_commit(transaction, commit); + + // read config from genesis, since it is readonly atm + use client::backend::Backend; + let changes_trie_config: Option<ChangesTrieConfiguration> = 
self.state_at(BlockId::Hash(parent_hash))? + .storage(well_known_keys::CHANGES_TRIE_CONFIG)? + .and_then(|v| Decode::decode(&mut &*v)); + self.changes_tries_storage.prune(changes_trie_config, transaction, f_num); } Ok(()) @@ -518,12 +563,6 @@ fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitS } } -fn apply_changes_trie_commit(transaction: &mut DBTransaction, mut commit: MemoryDB<Blake2Hasher>) { - for (key, (val, _)) in commit.drain() { - transaction.put(columns::CHANGES_TRIE, &key[..], &val); - } -} - impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> where Block: BlockT { type BlockImportOperation = BlockImportOperation<Block, Blake2Hasher>; type Blockchain = BlockchainDb<Block>; @@ -618,7 +657,7 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset) .map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?; apply_state_commit(&mut transaction, commit); - apply_changes_trie_commit(&mut transaction, operation.changes_trie_updates); + self.changes_tries_storage.commit(&mut transaction, operation.changes_trie_updates); let finalized = match pending_block.leaf_state { NewBlockState::Final => true, @@ -679,7 +718,7 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe } fn changes_trie_storage(&self) -> Option<&Self::ChangesTrieStorage> { - Some(&self.tries_change_storage) + Some(&self.changes_tries_storage) } fn revert(&self, n: NumberFor<Block>) -> Result<NumberFor<Block>, client::error::Error> { @@ -1062,7 +1101,7 @@ mod tests { let check_changes = |backend: &Backend<Block>, block: u64, changes: Vec<(Vec<u8>, Vec<u8>)>| { let (changes_root, mut changes_trie_update) = prepare_changes(changes); - assert_eq!(backend.tries_change_storage.root(block), Ok(Some(changes_root))); + 
assert_eq!(backend.changes_tries_storage.root(block), Ok(Some(changes_root))); for (key, (val, _)) in changes_trie_update.drain() { assert_eq!(backend.changes_trie_storage().unwrap().get(&key), Ok(Some(val))); @@ -1086,6 +1125,82 @@ mod tests { check_changes(&backend, 2, changes2); } + #[test] + fn changes_tries_are_pruned_on_finalization() { + let mut backend = Backend::<Block>::new_test(1000, 100); + backend.changes_tries_storage.min_blocks_to_keep = Some(8); + let config = ChangesTrieConfiguration { + digest_interval: 2, + digest_levels: 2, + }; + + // insert some blocks + let block0 = insert_header(&backend, 0, Default::default(), vec![(b"key_at_0".to_vec(), b"val_at_0".to_vec())], Default::default()); + let block1 = insert_header(&backend, 1, block0, vec![(b"key_at_1".to_vec(), b"val_at_1".to_vec())], Default::default()); + let block2 = insert_header(&backend, 2, block1, vec![(b"key_at_2".to_vec(), b"val_at_2".to_vec())], Default::default()); + let block3 = insert_header(&backend, 3, block2, vec![(b"key_at_3".to_vec(), b"val_at_3".to_vec())], Default::default()); + let block4 = insert_header(&backend, 4, block3, vec![(b"key_at_4".to_vec(), b"val_at_4".to_vec())], Default::default()); + let block5 = insert_header(&backend, 5, block4, vec![(b"key_at_5".to_vec(), b"val_at_5".to_vec())], Default::default()); + let block6 = insert_header(&backend, 6, block5, vec![(b"key_at_6".to_vec(), b"val_at_6".to_vec())], Default::default()); + let block7 = insert_header(&backend, 7, block6, vec![(b"key_at_7".to_vec(), b"val_at_7".to_vec())], Default::default()); + let block8 = insert_header(&backend, 8, block7, vec![(b"key_at_8".to_vec(), b"val_at_8".to_vec())], Default::default()); + let block9 = insert_header(&backend, 9, block8, vec![(b"key_at_9".to_vec(), b"val_at_9".to_vec())], Default::default()); + let block10 = insert_header(&backend, 10, block9, vec![(b"key_at_10".to_vec(), b"val_at_10".to_vec())], Default::default()); + let block11 = insert_header(&backend, 11, 
block10, vec![(b"key_at_11".to_vec(), b"val_at_11".to_vec())], Default::default()); + let _ = insert_header(&backend, 12, block11, vec![(b"key_at_12".to_vec(), b"val_at_12".to_vec())], Default::default()); + + // check that roots of all tries are in the columns::CHANGES_TRIE + fn read_changes_trie_root(backend: &Backend<Block>, num: u64) -> H256 { + backend.blockchain().header(BlockId::Number(num)).unwrap().unwrap().digest().logs().iter() + .find(|i| i.as_changes_trie_root().is_some()).unwrap().as_changes_trie_root().unwrap().clone() + } + let root1 = read_changes_trie_root(&backend, 1); assert_eq!(backend.changes_tries_storage.root(1).unwrap(), Some(root1)); + let root2 = read_changes_trie_root(&backend, 2); assert_eq!(backend.changes_tries_storage.root(2).unwrap(), Some(root2)); + let root3 = read_changes_trie_root(&backend, 3); assert_eq!(backend.changes_tries_storage.root(3).unwrap(), Some(root3)); + let root4 = read_changes_trie_root(&backend, 4); assert_eq!(backend.changes_tries_storage.root(4).unwrap(), Some(root4)); + let root5 = read_changes_trie_root(&backend, 5); assert_eq!(backend.changes_tries_storage.root(5).unwrap(), Some(root5)); + let root6 = read_changes_trie_root(&backend, 6); assert_eq!(backend.changes_tries_storage.root(6).unwrap(), Some(root6)); + let root7 = read_changes_trie_root(&backend, 7); assert_eq!(backend.changes_tries_storage.root(7).unwrap(), Some(root7)); + let root8 = read_changes_trie_root(&backend, 8); assert_eq!(backend.changes_tries_storage.root(8).unwrap(), Some(root8)); + let root9 = read_changes_trie_root(&backend, 9); assert_eq!(backend.changes_tries_storage.root(9).unwrap(), Some(root9)); + let root10 = read_changes_trie_root(&backend, 10); assert_eq!(backend.changes_tries_storage.root(10).unwrap(), Some(root10)); + let root11 = read_changes_trie_root(&backend, 11); assert_eq!(backend.changes_tries_storage.root(11).unwrap(), Some(root11)); + let root12 = read_changes_trie_root(&backend, 12); 
assert_eq!(backend.changes_tries_storage.root(12).unwrap(), Some(root12)); + + // now simulate finalization of block#12, causing prune of tries at #1..#4 + let mut tx = DBTransaction::new(); + backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, 12); + backend.storage.db.write(tx).unwrap(); + assert!(backend.changes_tries_storage.get(&root1).unwrap().is_none()); + assert!(backend.changes_tries_storage.get(&root2).unwrap().is_none()); + assert!(backend.changes_tries_storage.get(&root3).unwrap().is_none()); + assert!(backend.changes_tries_storage.get(&root4).unwrap().is_none()); + assert!(backend.changes_tries_storage.get(&root5).unwrap().is_some()); + assert!(backend.changes_tries_storage.get(&root6).unwrap().is_some()); + assert!(backend.changes_tries_storage.get(&root7).unwrap().is_some()); + assert!(backend.changes_tries_storage.get(&root8).unwrap().is_some()); + + // now simulate finalization of block#16, causing prune of tries at #5..#8 + let mut tx = DBTransaction::new(); + backend.changes_tries_storage.prune(Some(config.clone()), &mut tx, 16); + backend.storage.db.write(tx).unwrap(); + assert!(backend.changes_tries_storage.get(&root5).unwrap().is_none()); + assert!(backend.changes_tries_storage.get(&root6).unwrap().is_none()); + assert!(backend.changes_tries_storage.get(&root7).unwrap().is_none()); + assert!(backend.changes_tries_storage.get(&root8).unwrap().is_none()); + + // now "change" pruning mode to archive && simulate finalization of block#20 + // => no changes tries are pruned, because we never prune in archive mode + backend.changes_tries_storage.min_blocks_to_keep = None; + let mut tx = DBTransaction::new(); + backend.changes_tries_storage.prune(Some(config), &mut tx, 20); + backend.storage.db.write(tx).unwrap(); + assert!(backend.changes_tries_storage.get(&root9).unwrap().is_some()); + assert!(backend.changes_tries_storage.get(&root10).unwrap().is_some()); + assert!(backend.changes_tries_storage.get(&root11).unwrap().is_some()); + 
assert!(backend.changes_tries_storage.get(&root12).unwrap().is_some()); + } + #[test] fn tree_route_works() { let backend = Backend::<Block>::new_test(1000, 100); diff --git a/substrate/core/primitives/src/changes_trie.rs b/substrate/core/primitives/src/changes_trie.rs index 4bf9710c90eb375e5a1aa914164dd9159921474c..1bfde315ee9816269689c0a970c94c170f2274a3 100644 --- a/substrate/core/primitives/src/changes_trie.rs +++ b/substrate/core/primitives/src/changes_trie.rs @@ -28,3 +28,100 @@ pub struct ChangesTrieConfiguration { /// 2 means that every digest_interval^2 there will be a level2-digest, and so on. pub digest_levels: u32, } + +impl ChangesTrieConfiguration { + /// Is digest build enabled? + pub fn is_digest_build_enabled(&self) -> bool { + self.digest_interval > 1 && self.digest_levels > 0 + } + + /// Do we need to build digest at given block? + pub fn is_digest_build_required_at_block(&self, block: u64) -> bool { + block != 0 + && self.is_digest_build_enabled() + && block % self.digest_interval == 0 + } + + /// Returns Some if digest must be built at given block number. 
+ /// The tuple is: + /// ( + /// digest level + /// digest interval (in blocks) + /// step between blocks we're interested in when digest is built + /// ) + pub fn digest_level_at_block(&self, block: u64) -> Option<(u32, u64, u64)> { + if !self.is_digest_build_required_at_block(block) { + return None; + } + + let mut digest_interval = self.digest_interval; + let mut current_level = 1u32; + let mut digest_step = 1u64; + while current_level < self.digest_levels { + let new_digest_interval = match digest_interval.checked_mul(self.digest_interval) { + Some(new_digest_interval) if block % new_digest_interval == 0 => new_digest_interval, + _ => break, + }; + + digest_step = digest_interval; + digest_interval = new_digest_interval; + current_level = current_level + 1; + } + + Some(( + current_level, + digest_interval, + digest_step, + )) + } +} + +#[cfg(test)] +mod tests { + use super::ChangesTrieConfiguration; + + fn config(interval: u64, levels: u32) -> ChangesTrieConfiguration { + ChangesTrieConfiguration { + digest_interval: interval, + digest_levels: levels, + } + } + + #[test] + fn is_digest_build_enabled_works() { + assert!(!config(0, 100).is_digest_build_enabled()); + assert!(!config(1, 100).is_digest_build_enabled()); + assert!(config(2, 100).is_digest_build_enabled()); + assert!(!config(100, 0).is_digest_build_enabled()); + assert!(config(100, 1).is_digest_build_enabled()); + } + + #[test] + fn is_digest_build_required_at_block_works() { + assert!(!config(8, 4).is_digest_build_required_at_block(0)); + assert!(!config(8, 4).is_digest_build_required_at_block(1)); + assert!(!config(8, 4).is_digest_build_required_at_block(2)); + assert!(!config(8, 4).is_digest_build_required_at_block(4)); + assert!(config(8, 4).is_digest_build_required_at_block(8)); + assert!(!config(8, 4).is_digest_build_required_at_block(9)); + assert!(config(8, 4).is_digest_build_required_at_block(64)); + assert!(config(8, 4).is_digest_build_required_at_block(64)); + assert!(config(8, 
4).is_digest_build_required_at_block(512)); + assert!(config(8, 4).is_digest_build_required_at_block(4096)); + assert!(!config(8, 4).is_digest_build_required_at_block(4103)); + assert!(config(8, 4).is_digest_build_required_at_block(4104)); + assert!(!config(8, 4).is_digest_build_required_at_block(4108)); + } + + #[test] + fn digest_level_at_block_works() { + assert_eq!(config(8, 4).digest_level_at_block(0), None); + assert_eq!(config(8, 4).digest_level_at_block(7), None); + assert_eq!(config(8, 4).digest_level_at_block(63), None); + assert_eq!(config(8, 4).digest_level_at_block(8), Some((1, 8, 1))); + assert_eq!(config(8, 4).digest_level_at_block(64), Some((2, 64, 8))); + assert_eq!(config(8, 4).digest_level_at_block(512), Some((3, 512, 64))); + assert_eq!(config(8, 4).digest_level_at_block(4096), Some((4, 4096, 512))); + assert_eq!(config(8, 4).digest_level_at_block(4112), Some((1, 8, 1))); + } +} diff --git a/substrate/core/state-db/src/lib.rs b/substrate/core/state-db/src/lib.rs index 4d88bf7387f767b7aa9bd737f1da369e0e8e2e8a..598f0c47059e11f84e471258f4b763f5385726a5 100644 --- a/substrate/core/state-db/src/lib.rs +++ b/substrate/core/state-db/src/lib.rs @@ -142,6 +142,15 @@ impl PruningMode { max_mem: None, }) } + + /// Is this an archive (either ArchiveAll or ArchiveCanonical) pruning mode? 
+ pub fn is_archive(&self) -> bool { + match *self { + PruningMode::ArchiveAll | PruningMode::ArchiveCanonical => true, + PruningMode::Constrained(_) => false + } + } + } impl Default for PruningMode { diff --git a/substrate/core/state-machine/src/backend.rs b/substrate/core/state-machine/src/backend.rs index 0c5af265c2887db27a76b021c77cf5fe5e5745ea..412fe5362832ae3d6939dc1875d4305bf2e76b0d 100644 --- a/substrate/core/state-machine/src/backend.rs +++ b/substrate/core/state-machine/src/backend.rs @@ -189,8 +189,7 @@ pub(crate) fn insert_into_memory_db<H, I>(mdb: &mut MemoryDB<H>, input: I) -> Op where H: Hasher, H::Out: HeapSizeOf, - - I: Iterator<Item=(Vec<u8>, Vec<u8>)>, + I: IntoIterator<Item=(Vec<u8>, Vec<u8>)>, { let mut root = <H as Hasher>::Out::default(); { diff --git a/substrate/core/state-machine/src/changes_trie/build_iterator.rs b/substrate/core/state-machine/src/changes_trie/build_iterator.rs index c9c854dd7af400cdf8ae19de6c72d73b8bce5e29..6cc86b294bee4963f22d049f5185182bf2469c2c 100644 --- a/substrate/core/state-machine/src/changes_trie/build_iterator.rs +++ b/substrate/core/state-machine/src/changes_trie/build_iterator.rs @@ -22,31 +22,12 @@ use changes_trie::Configuration; /// Returns iterator of OTHER blocks that are required for inclusion into /// changes trie of given block. 
pub fn digest_build_iterator(config: &Configuration, block: u64) -> DigestBuildIterator { - // digest is never built in these cases - if block == 0 || config.digest_interval <= 1 || config.digest_levels == 0 { - return DigestBuildIterator::empty(); - } - - // digest is built every digest_multiplier blocks - let mut digest_interval = config.digest_interval; - if block % digest_interval != 0 { - return DigestBuildIterator::empty(); - } - - // we have checked that the block is at least level1-digest - // => try to find highest digest level for inclusion - let mut current_level = 1u32; - let mut digest_step = 1u64; - while current_level < config.digest_levels { - let new_digest_interval = match digest_interval.checked_mul(config.digest_interval) { - Some(new_digest_interval) if block % new_digest_interval == 0 => new_digest_interval, - _ => break, - }; - - digest_step = digest_interval; - digest_interval = new_digest_interval; - current_level = current_level + 1; - } + // prepare digest build parameters + let (_, _, digest_step) = match config.digest_level_at_block(block) { + Some((current_level, digest_interval, digest_step)) => + (current_level, digest_interval, digest_step), + None => return DigestBuildIterator::empty(), + }; DigestBuildIterator::new(block, config.digest_interval, digest_step) } diff --git a/substrate/core/state-machine/src/changes_trie/mod.rs b/substrate/core/state-machine/src/changes_trie/mod.rs index 8a56897aa51ceb526a9d3a875c0ad620f4480023..80f3bd55983dad45b2803e0587627c2c49ca4ee9 100644 --- a/substrate/core/state-machine/src/changes_trie/mod.rs +++ b/substrate/core/state-machine/src/changes_trie/mod.rs @@ -36,10 +36,12 @@ mod build; mod build_iterator; mod changes_iterator; mod input; +mod prune; mod storage; pub use self::storage::InMemoryStorage; pub use self::changes_iterator::{key_changes, key_changes_proof, key_changes_proof_check}; +pub use self::prune::prune; use hash_db::Hasher; use heapsize::HeapSizeOf; diff --git 
a/substrate/core/state-machine/src/changes_trie/prune.rs b/substrate/core/state-machine/src/changes_trie/prune.rs new file mode 100644 index 0000000000000000000000000000000000000000..8168a0771d31579970f1f8cadb05473ea99bfe5c --- /dev/null +++ b/substrate/core/state-machine/src/changes_trie/prune.rs @@ -0,0 +1,244 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see <http://www.gnu.org/licenses/>. + +//! Changes trie pruning-related functions. + +use hash_db::Hasher; +use heapsize::HeapSizeOf; +use substrate_trie::Recorder; +use proving_backend::ProvingBackendEssence; +use trie_backend_essence::TrieBackendEssence; +use changes_trie::{Configuration, Storage}; +use changes_trie::storage::TrieBackendAdapter; + +/// Prune obsolete changes tries. Pruning happens at the same block, where highest +/// level digest is created. Pruning guarantees to save changes tries for last +/// `min_blocks_to_keep` blocks. We only prune changes tries at `max_digest_interval` +/// ranges. +/// Passes every deleted changes trie node to the `remove_trie_node` callback. 
+pub fn prune<S: Storage<H>, H: Hasher, F: FnMut(H::Out)>( + config: &Configuration, + storage: &S, + min_blocks_to_keep: u64, + current_block: u64, + mut remove_trie_node: F, +) + where + H::Out: HeapSizeOf, +{ + // we only CAN prune at block where max-level-digest is created + let digest_interval = match config.digest_level_at_block(current_block) { + Some((digest_level, digest_interval, _)) if digest_level == config.digest_levels => + digest_interval, + _ => return, + }; + + // select range for pruning + let (first, last) = match pruning_range(min_blocks_to_keep, current_block, digest_interval) { + Some((first, last)) => (first, last), + None => return, + }; + + // delete changes trie for every block in range + // TODO: limit `max_digest_interval` so that this cycle won't involve huge ranges + for block in first..last+1 { + let root = match storage.root(block) { + Ok(Some(root)) => root, + Ok(None) => continue, + Err(error) => { + // try to delete other tries + warn!(target: "trie", "Failed to read changes trie root from DB: {}", error); + continue; + }, + }; + + // enumerate all changes trie' keys, recording all nodes that have been 'touched' + // (effectively - all changes trie nodes) + let mut proof_recorder: Recorder<H::Out> = Default::default(); + { + let mut trie = ProvingBackendEssence::<_, H> { + backend: &TrieBackendEssence::new(TrieBackendAdapter::new(storage), root), + proof_recorder: &mut proof_recorder, + }; + trie.record_all_keys(); + } + + // all nodes of this changes trie should be pruned + remove_trie_node(root); + for node in proof_recorder.drain().into_iter().map(|n| n.hash) { + remove_trie_node(node); + } + } +} + +/// Select blocks range (inclusive from both ends) for pruning changes tries in. 
+fn pruning_range(min_blocks_to_keep: u64, block: u64, max_digest_interval: u64) -> Option<(u64, u64)> { + // compute maximal number of high-level digests to keep + let max_digest_intervals_to_keep = max_digest_intervals_to_keep(min_blocks_to_keep, max_digest_interval); + + // number of blocks BEFORE current block where changes tries are not pruned + let blocks_to_keep = max_digest_intervals_to_keep.checked_mul(max_digest_interval); + + // last block for which changes trie is pruned + let last_block_to_prune = blocks_to_keep.and_then(|b| block.checked_sub(b)); + let first_block_to_prune = last_block_to_prune.clone().and_then(|b| b.checked_sub(max_digest_interval)); + + last_block_to_prune + .and_then(|last| first_block_to_prune.map(|first| (first + 1, last))) +} + +/// Select pruning delay for the changes tries. To make sure we could build a changes +/// trie at block B, we need access to previous: +/// max_digest_interval = config.digest_interval ^ config.digest_levels +/// blocks. So we can only prune blocks that are earlier than B - max_digest_interval. +/// The pruning_delay stands for number of max_digest_interval-s that we want to keep: +/// 0 or 1: means that only last changes trie is guaranteed to exist; +/// 2: the last changes trie + previous changes trie +/// ... 
+fn max_digest_intervals_to_keep(min_blocks_to_keep: u64, max_digest_interval: u64) -> u64 { + // config.digest_level_at_block ensures that it is not zero + debug_assert!(max_digest_interval != 0); + + let max_digest_intervals_to_keep = min_blocks_to_keep / max_digest_interval; + if max_digest_intervals_to_keep == 0 { + 1 + } else { + max_digest_intervals_to_keep + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use trie::MemoryDB; + use primitives::Blake2Hasher; + use backend::insert_into_memory_db; + use changes_trie::storage::InMemoryStorage; + use super::*; + + fn prune_by_collect<S: Storage<H>, H: Hasher>( + config: &Configuration, + storage: &S, + min_blocks_to_keep: u64, + current_block: u64, + ) -> HashSet<H::Out> + where + H::Out: HeapSizeOf, + { + let mut pruned_trie_nodes = HashSet::new(); + prune(config, storage, min_blocks_to_keep, current_block, + |node| { pruned_trie_nodes.insert(node); }); + pruned_trie_nodes + } + + + #[test] + fn prune_works() { + fn prepare_storage() -> InMemoryStorage<Blake2Hasher> { + let mut mdb1 = MemoryDB::<Blake2Hasher>::default(); + let root1 = insert_into_memory_db::<Blake2Hasher, _>(&mut mdb1, vec![(vec![10], vec![20])]).unwrap(); + let mut mdb2 = MemoryDB::<Blake2Hasher>::default(); + let root2 = insert_into_memory_db::<Blake2Hasher, _>(&mut mdb2, vec![(vec![11], vec![21]), (vec![12], vec![22])]).unwrap(); + let mut mdb3 = MemoryDB::<Blake2Hasher>::default(); + let root3 = insert_into_memory_db::<Blake2Hasher, _>(&mut mdb3, vec![(vec![13], vec![23]), (vec![14], vec![24])]).unwrap(); + let mut mdb4 = MemoryDB::<Blake2Hasher>::default(); + let root4 = insert_into_memory_db::<Blake2Hasher, _>(&mut mdb4, vec![(vec![15], vec![25])]).unwrap(); + let storage = InMemoryStorage::new(); + storage.insert(65, root1, mdb1); + storage.insert(66, root2, mdb2); + storage.insert(67, root3, mdb3); + storage.insert(68, root4, mdb4); + + storage + } + + // l1-digest is created every 2 blocks + // l2-digest is created 
every 4 blocks + // we do not want to keep any additional changes tries + // => only one l2-digest is saved AND it is pruned once next is created + let config = Configuration { digest_interval: 2, digest_levels: 2 }; + let storage = prepare_storage(); + assert!(prune_by_collect(&config, &storage, 0, 69).is_empty()); + assert!(prune_by_collect(&config, &storage, 0, 70).is_empty()); + assert!(prune_by_collect(&config, &storage, 0, 71).is_empty()); + let non_empty = prune_by_collect(&config, &storage, 0, 72); + assert!(!non_empty.is_empty()); + storage.remove_from_storage(&non_empty); + assert!(storage.into_mdb().drain().is_empty()); + + // l1-digest is created every 2 blocks + // l2-digest is created every 4 blocks + // we want to keep 1 additional changes trie + let config = Configuration { digest_interval: 2, digest_levels: 2 }; + let storage = prepare_storage(); + assert!(prune_by_collect(&config, &storage, 8, 69).is_empty()); + assert!(prune_by_collect(&config, &storage, 8, 70).is_empty()); + assert!(prune_by_collect(&config, &storage, 8, 71).is_empty()); + assert!(prune_by_collect(&config, &storage, 8, 72).is_empty()); + assert!(prune_by_collect(&config, &storage, 8, 73).is_empty()); + assert!(prune_by_collect(&config, &storage, 8, 74).is_empty()); + assert!(prune_by_collect(&config, &storage, 8, 75).is_empty()); + let non_empty = prune_by_collect(&config, &storage, 8, 76); + assert!(!non_empty.is_empty()); + storage.remove_from_storage(&non_empty); + assert!(storage.into_mdb().drain().is_empty()); + + // l1-digest is created every 2 blocks + // we want to keep 2 additional changes tries + let config = Configuration { digest_interval: 2, digest_levels: 1 }; + let storage = prepare_storage(); + assert!(prune_by_collect(&config, &storage, 4, 69).is_empty()); + let non_empty = prune_by_collect(&config, &storage, 4, 70); + assert!(!non_empty.is_empty()); + storage.remove_from_storage(&non_empty); + assert!(prune_by_collect(&config, &storage, 4, 71).is_empty()); + let 
non_empty = prune_by_collect(&config, &storage, 4, 72); + assert!(!non_empty.is_empty()); + storage.remove_from_storage(&non_empty); + assert!(storage.into_mdb().drain().is_empty()); + } + + #[test] + fn pruning_range_works() { + assert_eq!(pruning_range(2, 0, 100), None); + assert_eq!(pruning_range(2, 30, 100), None); + assert_eq!(pruning_range(::std::u64::MAX, 1024, 1), None); + assert_eq!(pruning_range(1, 1024, ::std::u64::MAX), None); + assert_eq!(pruning_range(::std::u64::MAX, 1024, ::std::u64::MAX), None); + assert_eq!(pruning_range(1024, 512, 512), None); + assert_eq!(pruning_range(1024, 1024, 512), None); + + // when we do not want to keep any highest-level-digests + // (system forces to keep at least one) + assert_eq!(pruning_range(0, 32, 16), Some((1, 16))); + assert_eq!(pruning_range(0, 64, 16), Some((33, 48))); + // when we want to keep 1 (last) highest-level-digest + assert_eq!(pruning_range(16, 32, 16), Some((1, 16))); + assert_eq!(pruning_range(16, 64, 16), Some((33, 48))); + // when we want to keep 1 (last) + 1 additional level digests + assert_eq!(pruning_range(1024, 1536, 512), Some((1, 512))); + assert_eq!(pruning_range(1024, 2048, 512), Some((513, 1024))); + } + + #[test] + fn max_digest_intervals_to_keep_works() { + assert_eq!(max_digest_intervals_to_keep(1024, 1025), 1); + assert_eq!(max_digest_intervals_to_keep(1024, 1023), 1); + assert_eq!(max_digest_intervals_to_keep(1024, 512), 2); + assert_eq!(max_digest_intervals_to_keep(1024, 511), 2); + assert_eq!(max_digest_intervals_to_keep(1024, 100), 10); + } +} diff --git a/substrate/core/state-machine/src/changes_trie/storage.rs b/substrate/core/state-machine/src/changes_trie/storage.rs index 2d91a5be4c62cbcfc46e067b536be6820af450f3..1cdd03841e1cf1e0fbf4d6e05818c3e18bde408c 100644 --- a/substrate/core/state-machine/src/changes_trie/storage.rs +++ b/substrate/core/state-machine/src/changes_trie/storage.rs @@ -25,6 +25,8 @@ use parking_lot::RwLock; use changes_trie::{RootsStorage, Storage}; use 
trie_backend_essence::TrieBackendStorage; +#[cfg(test)] +use std::collections::HashSet; #[cfg(test)] use backend::insert_into_memory_db; #[cfg(test)] @@ -86,6 +88,19 @@ impl<H: Hasher> InMemoryStorage<H> where H::Out: HeapSizeOf { self.data.write().mdb = MemoryDB::default(); // use new to be more correct } + #[cfg(test)] + pub fn remove_from_storage(&self, keys: &HashSet<H::Out>) { + let mut data = self.data.write(); + for key in keys { + data.mdb.remove_and_purge(key); + } + } + + #[cfg(test)] + pub fn into_mdb(self) -> MemoryDB<H> { + self.data.into_inner().mdb + } + /// Insert changes trie for given block. pub fn insert(&self, block: u64, changes_trie_root: H::Out, trie: MemoryDB<H>) { let mut data = self.data.write(); diff --git a/substrate/core/state-machine/src/lib.rs b/substrate/core/state-machine/src/lib.rs index bba587715eb15a2a2b589ddb08bd9d8a96d0b518..50a8a83e6f34dc4ff494eed9c2f7b16ec442bbf9 100644 --- a/substrate/core/state-machine/src/lib.rs +++ b/substrate/core/state-machine/src/lib.rs @@ -58,7 +58,8 @@ pub use backend::Backend; pub use changes_trie::{Storage as ChangesTrieStorage, RootsStorage as ChangesTrieRootsStorage, InMemoryStorage as InMemoryChangesTrieStorage, - key_changes, key_changes_proof, key_changes_proof_check}; + key_changes, key_changes_proof, key_changes_proof_check, + prune as prune_changes_tries}; pub use overlayed_changes::OverlayedChanges; pub use trie_backend_essence::Storage; pub use trie_backend::TrieBackend; diff --git a/substrate/core/state-machine/src/proving_backend.rs b/substrate/core/state-machine/src/proving_backend.rs index baf7bd7fcf582785f70bfafd48b5a6b1db735281..d17eed7bc8d047b5c47d882f65501a50f76da084 100644 --- a/substrate/core/state-machine/src/proving_backend.rs +++ b/substrate/core/state-machine/src/proving_backend.rs @@ -20,7 +20,7 @@ use std::cell::RefCell; use hash_db::Hasher; use heapsize::HeapSizeOf; use hash_db::HashDB; -use trie::{TrieDB, Trie, Recorder, MemoryDB}; +use trie::{TrieDB, Trie, Recorder, 
MemoryDB, TrieError}; use trie_backend::TrieBackend; use trie_backend_essence::{Ephemeral, TrieBackendEssence, TrieBackendStorage}; use {Error, ExecutionError, Backend}; @@ -37,7 +37,6 @@ impl<'a, S, H> ProvingBackendEssence<'a, S, H> S: TrieBackendStorage<H>, H: Hasher, H::Out: HeapSizeOf, - { pub fn storage(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>, String> { let mut read_overlay = MemoryDB::default(); @@ -53,6 +52,36 @@ impl<'a, S, H> ProvingBackendEssence<'a, S, H> .map(|x| x.map(|val| val.to_vec())) .map_err(map_e) } + + pub fn record_all_keys(&mut self) { + let mut read_overlay = MemoryDB::default(); + let eph = Ephemeral::new( + self.backend.backend_storage(), + &mut read_overlay, + ); + + let mut iter = move || -> Result<(), Box<TrieError<H::Out>>> { + let root = self.backend.root(); + let trie = TrieDB::<H>::new(&eph, root)?; + let iter = trie.iter()?; + + for x in iter { + let (key, _) = x?; + + // there's currently no API like iter_with() + // => use iter to enumerate all keys AND lookup each + // key using get_with + trie.get_with(&key, &mut *self.proof_recorder) + .map(|x| x.map(|val| val.to_vec()))?; + } + + Ok(()) + }; + + if let Err(e) = iter() { + debug!(target: "trie", "Error while recording all keys: {}", e); + } + } } /// Patricia trie-based backend which also tracks all touched storage trie values.