diff --git a/substrate/core/client/db/src/lib.rs b/substrate/core/client/db/src/lib.rs
index a85d708392b86ab979fa47981966599872c250ec..5e6b336db3043f10d35fb4050a429586e0a19c50 100644
--- a/substrate/core/client/db/src/lib.rs
+++ b/substrate/core/client/db/src/lib.rs
@@ -625,119 +625,22 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
 			};
 
 			trace!(target: "db", "Canonicalize block #{} ({:?})", new_canonical, hash);
-			let commit = self.storage.state_db.canonicalize_block(&hash);
+			let commit = self.storage.state_db.canonicalize_block(&hash)
+				.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
 			apply_state_commit(transaction, commit);
 		};
 
 		Ok(())
 	}
 
-	// write stuff to a transaction after a new block is finalized.
-	// this canonicalizes finalized blocks. Fails if called with a block which
-	// was not a child of the last finalized block.
-	fn note_finalized(
-		&self,
-		transaction: &mut DBTransaction,
-		f_header: &Block::Header,
-		f_hash: Block::Hash,
-	) -> Result<(), client::error::Error> where
-		Block: BlockT<Hash=H256>,
-	{
-		let f_num = f_header.number().clone();
-
-		if f_num.as_() > self.storage.state_db.best_canonical() {
-			let parent_hash = f_header.parent_hash().clone();
-
-			let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone());
-			transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &lookup_key);
-
-			let commit = self.storage.state_db.canonicalize_block(&f_hash);
-			apply_state_commit(transaction, commit);
-
-			// read config from genesis, since it is readonly atm
-			use client::backend::Backend;
-			let changes_trie_config: Option<ChangesTrieConfiguration> = self.state_at(BlockId::Hash(parent_hash))?
-				.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
-				.and_then(|v| Decode::decode(&mut &*v));
-			self.changes_tries_storage.prune(changes_trie_config, transaction, f_hash, f_num);
-		}
-
-		Ok(())
-	}
-}
-
-fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitSet<H256>) {
-	for (key, val) in commit.data.inserted.into_iter() {
-		transaction.put(columns::STATE, &key[..], &val);
-	}
-	for key in commit.data.deleted.into_iter() {
-		transaction.delete(columns::STATE, &key[..]);
-	}
-	for (key, val) in commit.meta.inserted.into_iter() {
-		transaction.put(columns::STATE_META, &key[..], &val);
-	}
-	for key in commit.meta.deleted.into_iter() {
-		transaction.delete(columns::STATE_META, &key[..]);
-	}
-}
-
-impl<Block> client::backend::AuxStore for Backend<Block> where Block: BlockT<Hash=H256> {
-	fn insert_aux<
-		'a,
-		'b: 'a,
-		'c: 'a,
-		I: IntoIterator<Item=&'a(&'c [u8], &'c [u8])>,
-		D: IntoIterator<Item=&'a &'b [u8]>,
-	>(&self, insert: I, delete: D) -> client::error::Result<()> {
-		let mut transaction = DBTransaction::new();
-		for (k, v) in insert {
-			transaction.put(columns::AUX, k, v);
-		}
-		for k in delete {
-			transaction.delete(columns::AUX, k);
-		}
-		self.storage.db.write(transaction).map_err(db_err)?;
-		Ok(())
-	}
-
-	fn get_aux(&self, key: &[u8]) -> Result<Option<Vec<u8>>, client::error::Error> {
-		Ok(self.storage.db.get(columns::AUX, key).map(|r| r.map(|v| v.to_vec())).map_err(db_err)?)
- } -} - -impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> where Block: BlockT<Hash=H256> { - type BlockImportOperation = BlockImportOperation<Block, Blake2Hasher>; - type Blockchain = BlockchainDb<Block>; - type State = CachingState<Blake2Hasher, DbState, Block>; - type ChangesTrieStorage = DbChangesTrieStorage<Block>; - - fn begin_operation(&self) -> Result<Self::BlockImportOperation, client::error::Error> { - let old_state = self.state_at(BlockId::Hash(Default::default()))?; - Ok(BlockImportOperation { - pending_block: None, - old_state, - db_updates: MemoryDB::default(), - storage_updates: Default::default(), - changes_trie_updates: MemoryDB::default(), - aux_ops: Vec::new(), - finalized_blocks: Vec::new(), - }) - } - - fn begin_state_operation(&self, operation: &mut Self::BlockImportOperation, block: BlockId<Block>) -> Result<(), client::error::Error> { - operation.old_state = self.state_at(block)?; - Ok(()) - } - - fn commit_operation(&self, mut operation: Self::BlockImportOperation) + fn try_commit_operation(&self, mut operation: BlockImportOperation<Block, Blake2Hasher>) -> Result<(), client::error::Error> { let mut transaction = DBTransaction::new(); operation.apply_aux(&mut transaction); + let mut meta_updates = Vec::new(); if !operation.finalized_blocks.is_empty() { - let mut meta_updates = Vec::new(); - let mut last_finalized_hash = self.blockchain.meta.read().finalized_hash; for (block, justification) in operation.finalized_blocks { let block_hash = self.blockchain.expect_block_hash_from_id(&block)?; @@ -752,10 +655,6 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe )?); last_finalized_hash = block_hash; } - - for (hash, number, is_best, is_finalized) in meta_updates { - self.blockchain.update_meta(hash, number, is_best, is_finalized); - } } if let Some(pending_block) = operation.pending_block { @@ -851,23 +750,28 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset) .map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?; apply_state_commit(&mut transaction, commit); - self.changes_tries_storage.commit(&mut transaction, operation.changes_trie_updates); let finalized = match pending_block.leaf_state { NewBlockState::Final => true, _ => false, }; + let header = &pending_block.header; + let is_best = pending_block.leaf_state.is_best(); + let changes_trie_updates = operation.changes_trie_updates; + + + self.changes_tries_storage.commit(&mut transaction, changes_trie_updates); + if finalized { // TODO: ensure best chain contains this block. - self.ensure_sequential_finalization(&pending_block.header, None)?; - self.note_finalized(&mut transaction, &pending_block.header, hash)?; + self.ensure_sequential_finalization(header, None)?; + self.note_finalized(&mut transaction, header, hash)?; } else { // canonicalize blocks which are old enough, regardless of finality. - self.force_delayed_canonicalize(&mut transaction, hash, *pending_block.header.number())? + self.force_delayed_canonicalize(&mut transaction, hash, *header.number())? 
 			}
 
-			let is_best = pending_block.leaf_state.is_best();
 			debug!(target: "db", "DB Commit {:?} ({}), best = {}", hash, number, is_best);
 
 			{
@@ -886,10 +790,14 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
 				drop(leaves);
 			}
 
+			for (hash, number, is_best, is_finalized) in meta_updates {
+				self.blockchain.update_meta(hash, number, is_best, is_finalized);
+			}
+
 			self.blockchain.update_meta(
 				hash.clone(),
 				number.clone(),
-				pending_block.leaf_state.is_best(),
+				is_best,
 				finalized,
 			);
 
@@ -902,25 +810,155 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe
 				Some(number),
 				|| is_best
 			);
+		} else {
+			// No pending block, just write the transaction and apply meta changes
+			self.storage.db.write(transaction).map_err(db_err)?;
+			for (hash, number, is_best, is_finalized) in meta_updates {
+				self.blockchain.update_meta(hash, number, is_best, is_finalized);
+			}
 		}
 
 		Ok(())
 	}
+
+	// write stuff to a transaction after a new block is finalized.
+	// this canonicalizes finalized blocks. Fails if called with a block which
+	// was not a child of the last finalized block.
+	fn note_finalized(
+		&self,
+		transaction: &mut DBTransaction,
+		f_header: &Block::Header,
+		f_hash: Block::Hash,
+	) -> Result<(), client::error::Error> where
+		Block: BlockT<Hash=H256>,
+	{
+		let f_num = f_header.number().clone();
+
+		if f_num.as_() > self.storage.state_db.best_canonical() {
+			let parent_hash = f_header.parent_hash().clone();
+
+			let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone());
+			transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &lookup_key);
+
+			let commit = self.storage.state_db.canonicalize_block(&f_hash)
+				.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
+			apply_state_commit(transaction, commit);
+
+			// read config from genesis, since it is readonly atm
+			use client::backend::Backend;
+			let changes_trie_config: Option<ChangesTrieConfiguration> = self.state_at(BlockId::Hash(parent_hash))?
+				.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
+				.and_then(|v| Decode::decode(&mut &*v));
+			self.changes_tries_storage.prune(changes_trie_config, transaction, f_hash, f_num);
+		}
+
+		Ok(())
+	}
+}
+
+fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitSet<H256>) {
+	for (key, val) in commit.data.inserted.into_iter() {
+		transaction.put(columns::STATE, &key[..], &val);
+	}
+	for key in commit.data.deleted.into_iter() {
+		transaction.delete(columns::STATE, &key[..]);
+	}
+	for (key, val) in commit.meta.inserted.into_iter() {
+		transaction.put(columns::STATE_META, &key[..], &val);
+	}
+	for key in commit.meta.deleted.into_iter() {
+		transaction.delete(columns::STATE_META, &key[..]);
+	}
+}
+
+impl<Block> client::backend::AuxStore for Backend<Block> where Block: BlockT<Hash=H256> {
+	fn insert_aux<
+		'a,
+		'b: 'a,
+		'c: 'a,
+		I: IntoIterator<Item=&'a(&'c [u8], &'c [u8])>,
+		D: IntoIterator<Item=&'a &'b [u8]>,
+	>(&self, insert: I, delete: D) -> client::error::Result<()> {
+		let mut transaction = DBTransaction::new();
+		for (k, v) in insert {
+			transaction.put(columns::AUX, k, v);
+		}
+		for k in delete {
+			transaction.delete(columns::AUX, k);
+		}
+		self.storage.db.write(transaction).map_err(db_err)?;
+		Ok(())
+	}
+
+	fn get_aux(&self, key: &[u8]) -> Result<Option<Vec<u8>>, client::error::Error> {
+		Ok(self.storage.db.get(columns::AUX, key).map(|r| r.map(|v| v.to_vec())).map_err(db_err)?)
+ } +} + +impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> where Block: BlockT<Hash=H256> { + type BlockImportOperation = BlockImportOperation<Block, Blake2Hasher>; + type Blockchain = BlockchainDb<Block>; + type State = CachingState<Blake2Hasher, DbState, Block>; + type ChangesTrieStorage = DbChangesTrieStorage<Block>; + + fn begin_operation(&self) -> Result<Self::BlockImportOperation, client::error::Error> { + let old_state = self.state_at(BlockId::Hash(Default::default()))?; + Ok(BlockImportOperation { + pending_block: None, + old_state, + db_updates: MemoryDB::default(), + storage_updates: Default::default(), + changes_trie_updates: MemoryDB::default(), + aux_ops: Vec::new(), + finalized_blocks: Vec::new(), + }) + } + + fn begin_state_operation(&self, operation: &mut Self::BlockImportOperation, block: BlockId<Block>) -> Result<(), client::error::Error> { + operation.old_state = self.state_at(block)?; + Ok(()) + } + + fn commit_operation(&self, operation: Self::BlockImportOperation) + -> Result<(), client::error::Error> + { + match self.try_commit_operation(operation) { + Ok(_) => { + self.storage.state_db.apply_pending(); + Ok(()) + }, + e @ Err(_) => { + self.storage.state_db.revert_pending(); + e + } + } + } + fn finalize_block(&self, block: BlockId<Block>, justification: Option<Justification>) -> Result<(), client::error::Error> { let mut transaction = DBTransaction::new(); let hash = self.blockchain.expect_block_hash_from_id(&block)?; let header = self.blockchain.expect_header(block)?; - let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction( - &mut transaction, - &hash, - &header, - None, - justification, - )?; - self.storage.db.write(transaction).map_err(db_err)?; - self.blockchain.update_meta(hash, number, is_best, is_finalized); + let commit = || { + let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction( + &mut transaction, + &hash, + &header, + None, + justification, + )?; + self.storage.db.write(transaction).map_err(db_err)?; + self.blockchain.update_meta(hash, number, is_best, is_finalized); + Ok(()) + }; + match commit() { + Ok(()) => self.storage.state_db.apply_pending(), + e @ Err(_) => { + self.storage.state_db.revert_pending(); + return e; + } + } Ok(()) } diff --git a/substrate/core/state-db/src/lib.rs b/substrate/core/state-db/src/lib.rs index d612404cd9c1539880ba9206025a26b05f1c4960..9dd693c14eb39a111d39d4db05382aa2feae3dfd 100644 --- a/substrate/core/state-db/src/lib.rs +++ b/substrate/core/state-db/src/lib.rs @@ -72,8 +72,12 @@ pub enum Error<E: fmt::Debug> { Db(E), /// `Codec` decoding error. Decoding, - /// NonCanonical error. - NonCanonical, + /// Trying to canonicalize invalid block. + InvalidBlock, + /// Trying to insert block with invalid number. + InvalidBlockNumber, + /// Trying to insert block with unknown parent. 
+	InvalidParent,
 }
 
 impl<E: fmt::Debug> fmt::Debug for Error<E> {
@@ -81,7 +85,9 @@ impl<E: fmt::Debug> fmt::Debug for Error<E> {
 		match self {
 			Error::Db(e) => e.fmt(f),
 			Error::Decoding => write!(f, "Error decoding slicable value"),
-			Error::NonCanonical => write!(f, "Error processing non-canonical data"),
+			Error::InvalidBlock => write!(f, "Trying to canonicalize invalid block"),
+			Error::InvalidBlockNumber => write!(f, "Trying to insert block with invalid number"),
+			Error::InvalidParent => write!(f, "Trying to insert block with unknown parent"),
 		}
 	}
 }
@@ -205,27 +211,25 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
 		}
 	}
 
-	pub fn canonicalize_block(&mut self, hash: &BlockHash) -> CommitSet<Key> {
-		// clear the temporary overlay from the previous canonicalization.
-		self.non_canonical.clear_overlay();
+	pub fn canonicalize_block<E: fmt::Debug>(&mut self, hash: &BlockHash) -> Result<CommitSet<Key>, Error<E>> {
 		let mut commit = match self.mode {
 			PruningMode::ArchiveAll => {
 				CommitSet::default()
 			},
 			PruningMode::ArchiveCanonical => {
-				let mut commit = self.non_canonical.canonicalize(hash);
+				let mut commit = self.non_canonical.canonicalize(hash)?;
 				commit.data.deleted.clear();
 				commit
 			},
 			PruningMode::Constrained(_) => {
-				self.non_canonical.canonicalize(hash)
+				self.non_canonical.canonicalize(hash)?
 			},
 		};
 		if let Some(ref mut pruning) = self.pruning {
 			pruning.note_canonical(hash, &mut commit);
 		}
 		self.prune(&mut commit);
-		commit
+		Ok(commit)
 	}
 
 	pub fn best_canonical(&self) -> u64 {
@@ -284,6 +288,20 @@ impl<BlockHash: Hash, Key: Hash> StateDbSync<BlockHash, Key> {
 		}
 		db.get(key).map_err(|e| Error::Db(e))
 	}
+
+	pub fn apply_pending(&mut self) {
+		self.non_canonical.apply_pending();
+		if let Some(pruning) = &mut self.pruning {
+			pruning.apply_pending();
+		}
+	}
+
+	pub fn revert_pending(&mut self) {
+		if let Some(pruning) = &mut self.pruning {
+			pruning.revert_pending();
+		}
+		self.non_canonical.revert_pending();
+	}
 }
 
 /// State DB maintenance. See module description.
@@ -306,7 +324,7 @@ impl<BlockHash: Hash, Key: Hash> StateDb<BlockHash, Key> {
 	}
 
 	/// Finalize a previously inserted block.
-	pub fn canonicalize_block(&self, hash: &BlockHash) -> CommitSet<Key> {
+	pub fn canonicalize_block<E: fmt::Debug>(&self, hash: &BlockHash) -> Result<CommitSet<Key>, Error<E>> {
 		self.db.write().canonicalize_block(hash)
 	}
 
@@ -341,6 +359,16 @@
 	pub fn is_pruned(&self, number: u64) -> bool {
 		return self.db.read().is_pruned(number)
 	}
+
+	/// Apply all pending changes
+	pub fn apply_pending(&self) {
+		self.db.write().apply_pending();
+	}
+
+	/// Revert all pending changes
+	pub fn revert_pending(&self) {
+		self.db.write().revert_pending();
+	}
 }
 
 #[cfg(test)]
@@ -394,7 +422,9 @@ mod tests {
 			)
 			.unwrap(),
 		);
-		db.commit(&state_db.canonicalize_block(&H256::from_low_u64_be(1)));
+		state_db.apply_pending();
+		db.commit(&state_db.canonicalize_block::<io::Error>(&H256::from_low_u64_be(1)).unwrap());
+		state_db.apply_pending();
 		db.commit(
 			&state_db
 				.insert_block::<io::Error>(
@@ -405,8 +435,11 @@
 			)
 			.unwrap(),
 		);
-		db.commit(&state_db.canonicalize_block(&H256::from_low_u64_be(21)));
-		db.commit(&state_db.canonicalize_block(&H256::from_low_u64_be(3)));
+		state_db.apply_pending();
+		db.commit(&state_db.canonicalize_block::<io::Error>(&H256::from_low_u64_be(21)).unwrap());
+		state_db.apply_pending();
+		db.commit(&state_db.canonicalize_block::<io::Error>(&H256::from_low_u64_be(3)).unwrap());
+		state_db.apply_pending();
 		(db, state_db)
 	}
 
diff --git a/substrate/core/state-db/src/noncanonical.rs b/substrate/core/state-db/src/noncanonical.rs
index b69ebcb2c32b519c7d50e6d079e9e3e32b7eb433..61e720f7773c3b64aca4a6608a301b7c5fe4e23e 100644
--- a/substrate/core/state-db/src/noncanonical.rs
+++ b/substrate/core/state-db/src/noncanonical.rs
@@ -17,8 +17,8 @@
 //! Canonicalization window.
 //! Maintains trees of block overlays and allows discarding trees/roots
 //! The overlays are added in `insert` and removed in `canonicalize`.
-//! Last canonicalized overlay is kept in memory until next call to `canonicalize` or
-//! `clear_overlay`
+//! All pending changes are kept in memory until next call to `apply_pending` or
+//! `revert_pending`
 
 use std::fmt;
 use std::collections::{HashMap, VecDeque};
@@ -35,7 +35,8 @@ pub struct NonCanonicalOverlay<BlockHash: Hash, Key: Hash> {
 	last_canonicalized: Option<(BlockHash, u64)>,
 	levels: VecDeque<Vec<BlockOverlay<BlockHash, Key>>>,
 	parents: HashMap<BlockHash, BlockHash>,
-	last_canonicalized_overlay: HashMap<Key, DBValue>,
+	pending_canonicalizations: Vec<BlockHash>,
+	pending_insertions: Vec<BlockHash>,
 }
 
 #[derive(Encode, Decode)]
@@ -109,39 +110,45 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
 			last_canonicalized,
 			levels,
 			parents,
-			last_canonicalized_overlay: Default::default(),
+			pending_canonicalizations: Default::default(),
+			pending_insertions: Default::default(),
 		})
 	}
 
 	/// Insert a new block into the overlay. If inserted on the second level or lower, expects the parent to be present in the window.
 	pub fn insert<E: fmt::Debug>(&mut self, hash: &BlockHash, number: u64, parent_hash: &BlockHash, changeset: ChangeSet<Key>) -> Result<CommitSet<Key>, Error<E>> {
 		let mut commit = CommitSet::default();
+		let front_block_number = self.pending_front_block_number();
 		if self.levels.is_empty() && self.last_canonicalized.is_none() {
 			if number < 1 {
-				return Err(Error::NonCanonical);
+				return Err(Error::InvalidBlockNumber);
 			}
 			// assume that parent was canonicalized
 			let last_canonicalized = (parent_hash.clone(), number - 1);
 			commit.meta.inserted.push((to_meta_key(LAST_CANONICAL, &()), last_canonicalized.encode()));
 			self.last_canonicalized = Some(last_canonicalized);
 		} else if self.last_canonicalized.is_some() {
-			if number < self.front_block_number() || number >= self.front_block_number() + self.levels.len() as u64 + 1 {
-				return Err(Error::NonCanonical);
+			if number < front_block_number || number >= front_block_number + self.levels.len() as u64 + 1 {
+				trace!(target: "state-db", "Failed to insert block {}, current is {} .. {}",
+					number,
+					front_block_number,
+					front_block_number + self.levels.len() as u64,
+				);
+				return Err(Error::InvalidBlockNumber);
 			}
 			// check for valid parent if inserting on second level or higher
-			if number == self.front_block_number() {
+			if number == front_block_number {
 				if !self.last_canonicalized.as_ref().map_or(false, |&(ref h, n)| h == parent_hash && n == number - 1) {
-					return Err(Error::NonCanonical);
+					return Err(Error::InvalidParent);
 				}
 			} else if !self.parents.contains_key(&parent_hash) {
-				return Err(Error::NonCanonical);
+				return Err(Error::InvalidParent);
 			}
 		}
-		let level = if self.levels.is_empty() || number == self.front_block_number() + self.levels.len() as u64 {
+		let level = if self.levels.is_empty() || number == front_block_number + self.levels.len() as u64 {
 			self.levels.push_back(Vec::new());
 			self.levels.back_mut().expect("can't be empty after insertion; qed")
 		} else {
-			let front_block_number = self.front_block_number();
 			self.levels.get_mut((number - front_block_number) as usize)
 				.expect("number is [front_block_number .. front_block_number + levels.len()) is asserted in precondition; qed")
 		};
@@ -166,85 +173,118 @@
 		trace!(target: "state-db", "Inserted uncanonicalized changeset {}.{} ({} inserted, {} deleted)", number, index, journal_record.inserted.len(), journal_record.deleted.len());
 		let journal_record = journal_record.encode();
 		commit.meta.inserted.push((journal_key, journal_record));
+		self.pending_insertions.push(hash.clone());
 		Ok(commit)
 	}
 
-	fn discard(
-		levels: &mut [Vec<BlockOverlay<BlockHash, Key>>],
+	fn discard_descendants(
+		levels: &mut VecDeque<Vec<BlockOverlay<BlockHash, Key>>>,
+		index: usize,
 		parents: &mut HashMap<BlockHash, BlockHash>,
-		discarded_journals: &mut Vec<Vec<u8>>,
-		number: u64,
 		hash: &BlockHash,
 	) {
-		if let Some((level, sublevels)) = levels.split_first_mut() {
+		let mut discarded = Vec::new();
+		if let Some(level) = levels.get_mut(index) {
 			level.retain(|ref overlay| {
 				let parent = parents.get(&overlay.hash).expect("there is a parent entry for each entry in levels; qed").clone();
 				if parent == *hash {
 					parents.remove(&overlay.hash);
-					discarded_journals.push(overlay.journal_key.clone());
-					Self::discard(sublevels, parents, discarded_journals, number + 1, &overlay.hash);
+					discarded.push(overlay.hash.clone());
 					false
 				} else {
 					true
 				}
 			});
 		}
+		for hash in discarded.into_iter() {
+			Self::discard_descendants(levels, index + 1, parents, &hash);
+		}
+	}
+
+	fn discard_journals(&self, level_index: usize, discarded_journals: &mut Vec<Vec<u8>>, hash: &BlockHash) {
+		if let Some(level) = self.levels.get(level_index) {
+			level.iter().for_each(|overlay| {
+				let parent = self.parents.get(&overlay.hash).expect("there is a parent entry for each entry in levels; qed").clone();
+				if parent == *hash {
+					discarded_journals.push(overlay.journal_key.clone());
+					self.discard_journals(level_index + 1, discarded_journals, &overlay.hash);
+				}
+			});
+		}
 	}
 
 	fn front_block_number(&self) -> u64 {
 		self.last_canonicalized.as_ref().map(|&(_, n)| n + 1).unwrap_or(0)
 	}
 
-	pub fn last_canonicalized_block_number(&self) -> u64 {
-		self.last_canonicalized.as_ref().map(|&(_, n)| n).unwrap_or(0)
+	fn pending_front_block_number(&self) -> u64 {
+		self.last_canonicalized
+			.as_ref()
+			.map(|&(_, n)| n + 1 + self.pending_canonicalizations.len() as u64)
+			.unwrap_or(0)
 	}
 
-	/// This may be called when the last finalization commit was applied to the database.
-	pub fn clear_overlay(&mut self) {
-		self.last_canonicalized_overlay.clear();
+	pub fn last_canonicalized_block_number(&self) -> u64 {
+		self.last_canonicalized.as_ref().map(|&(_, n)| n).unwrap_or(0)
 	}
 
 	/// Select a top-level root and canonicalize it. Discards all sibling subtrees and the root.
 	/// Returns a set of changes that need to be added to the DB.
-	pub fn canonicalize(&mut self, hash: &BlockHash) -> CommitSet<Key> {
+	pub fn canonicalize<E: fmt::Debug>(&mut self, hash: &BlockHash) -> Result<CommitSet<Key>, Error<E>> {
 		trace!(target: "state-db", "Canonicalizing {:?}", hash);
-		let level = self.levels.pop_front().expect("no blocks to canonicalize");
-		let index = level.iter().position(|overlay| overlay.hash == *hash)
-			.expect("attempting to canonicalize unknown block");
+		let level = self.levels.get(self.pending_canonicalizations.len()).ok_or_else(|| Error::InvalidBlock)?;
+		let index = level
+			.iter()
+			.position(|overlay| overlay.hash == *hash)
+			.ok_or_else(|| Error::InvalidBlock)?;
 
 		let mut commit = CommitSet::default();
 		let mut discarded_journals = Vec::new();
 		for (i, overlay) in level.into_iter().enumerate() {
-			self.parents.remove(&overlay.hash);
 			if i == index {
-				self.last_canonicalized_overlay = overlay.values; // that's the one we need to canonicalize
-				commit.data.inserted = self.last_canonicalized_overlay.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
-				commit.data.deleted = overlay.deleted;
+				commit.data.inserted = overlay.values.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
+				commit.data.deleted = overlay.deleted.clone();
 			} else {
-				// borrow checker won't allow us to split out mutable references
-				// required for recursive processing. A more efficient implementation
-				// that does not require converting to vector is possible
-				let mut vec: Vec<_> = self.levels.drain(..).collect();
-				Self::discard(&mut vec, &mut self.parents, &mut discarded_journals, 0, &overlay.hash);
-				self.levels.extend(vec.into_iter());
+				self.discard_journals(self.pending_canonicalizations.len() + 1, &mut discarded_journals, &overlay.hash);
 			}
-			// cleanup journal entry
-			discarded_journals.push(overlay.journal_key);
+			discarded_journals.push(overlay.journal_key.clone());
 		}
 		commit.meta.deleted.append(&mut discarded_journals);
-		let last_canonicalized = (hash.clone(), self.front_block_number());
-		commit.meta.inserted.push((to_meta_key(LAST_CANONICAL, &()), last_canonicalized.encode()));
-		self.last_canonicalized = Some(last_canonicalized);
-		trace!(target: "state-db", "Discarded {} records", commit.meta.deleted.len());
-		commit
+		let canonicalized = (hash.clone(), self.front_block_number() + self.pending_canonicalizations.len() as u64);
+		commit.meta.inserted.push((to_meta_key(LAST_CANONICAL, &()), canonicalized.encode()));
+		trace!(target: "state-db", "Discarding {} records", commit.meta.deleted.len());
+		self.pending_canonicalizations.push(hash.clone());
+		Ok(commit)
+	}
+
+	fn apply_canonicalizations(&mut self) {
+		let last = self.pending_canonicalizations.last().cloned();
+		let count = self.pending_canonicalizations.len() as u64;
+		for hash in self.pending_canonicalizations.drain(..) {
+			trace!(target: "state-db", "Post canonicalizing {:?}", hash);
+			let level = self.levels.pop_front().expect("Hash validity is checked in `canonicalize`");
+			let index = level
+				.iter()
+				.position(|overlay| overlay.hash == hash)
+				.expect("Hash validity is checked in `canonicalize`");
+
+			// discard unfinalized overlays
+			for (i, overlay) in level.into_iter().enumerate() {
+				self.parents.remove(&overlay.hash);
+				if i != index {
+					Self::discard_descendants(&mut self.levels, 0, &mut self.parents, &overlay.hash);
+				}
+			}
+		}
+		if let Some(hash) = last {
+			let last_canonicalized = (hash, self.last_canonicalized_block_number() + count);
+			self.last_canonicalized = Some(last_canonicalized);
+		}
 	}
 
 	/// Get a value from the node overlay. This searches in every existing changeset.
 	pub fn get(&self, key: &Key) -> Option<DBValue> {
-		if let Some(value) = self.last_canonicalized_overlay.get(&key) {
-			return Some(value.clone());
-		}
 		for level in self.levels.iter() {
 			for overlay in level.iter() {
 				if let Some(value) = overlay.values.get(&key) {
@@ -266,14 +306,44 @@
 			commit
 		})
 	}
+
+	fn revert_insertions(&mut self) {
+		self.pending_insertions.reverse();
+		for hash in self.pending_insertions.drain(..) {
+			self.parents.remove(&hash);
+			// find a level. When iterating insertions backwards the hash is always last in the level.
+			let level_index =
+				self.levels.iter().position(|level|
+					level.last().expect("Hash is added in `insert` in reverse order").hash == hash)
+				.expect("Hash is added in insert");
+
+			self.levels[level_index].pop();
+			if self.levels[level_index].is_empty() {
+				debug_assert_eq!(level_index, self.levels.len() - 1);
+				self.levels.pop_back();
+			}
+		}
+	}
+
+	/// Apply all pending changes
+	pub fn apply_pending(&mut self) {
+		self.apply_canonicalizations();
+		self.pending_insertions.clear();
+	}
+
+	/// Revert all pending changes
+	pub fn revert_pending(&mut self) {
+		self.pending_canonicalizations.clear();
+		self.revert_insertions();
+	}
 }
 
 #[cfg(test)]
 mod tests {
 	use std::io;
-	use super::NonCanonicalOverlay;
-	use crate::ChangeSet;
 	use primitives::H256;
+	use super::{NonCanonicalOverlay, to_journal_key};
+	use crate::ChangeSet;
 	use crate::test::{make_db, make_changeset};
 
 	fn contains(overlay: &NonCanonicalOverlay<H256, H256>, key: u64) -> bool {
@@ -294,7 +364,7 @@ fn canonicalize_empty_panics() {
 		let db = make_db(&[]);
 		let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
-		overlay.canonicalize(&H256::default());
+		overlay.canonicalize::<io::Error>(&H256::default()).unwrap();
 	}
 
 	#[test]
@@ -338,7 +408,7 @@
 		let db = make_db(&[]);
 		let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
 		overlay.insert::<io::Error>(&h1, 1, &H256::default(), ChangeSet::default()).unwrap();
-		overlay.canonicalize(&h2);
+		overlay.canonicalize::<io::Error>(&h2).unwrap();
 	}
 
 	#[test]
@@ -353,7 +423,7 @@
 		assert_eq!(insertion.meta.inserted.len(), 2);
 		assert_eq!(insertion.meta.deleted.len(), 0);
 		db.commit(&insertion);
-		let finalization = overlay.canonicalize(&h1);
+		let finalization = overlay.canonicalize::<io::Error>(&h1).unwrap();
 		assert_eq!(finalization.data.inserted.len(), changeset.inserted.len());
 		assert_eq!(finalization.data.deleted.len(), changeset.deleted.len());
 		assert_eq!(finalization.meta.inserted.len(), 1);
@@ -386,7 +456,8 @@
 		let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
 		db.commit(&overlay.insert::<io::Error>(&h1, 10, &H256::default(), make_changeset(&[3, 4], &[2])).unwrap());
 		db.commit(&overlay.insert::<io::Error>(&h2, 11, &h1, make_changeset(&[5], &[3])).unwrap());
-		db.commit(&overlay.canonicalize(&h1));
+		db.commit(&overlay.canonicalize::<io::Error>(&h1).unwrap());
+		overlay.apply_pending();
 		assert_eq!(overlay.levels.len(), 1);
 
 		let overlay2 = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
@@ -410,15 +481,17 @@
 		assert!(contains(&overlay, 5));
 		assert_eq!(overlay.levels.len(), 2);
 		assert_eq!(overlay.parents.len(), 2);
-		db.commit(&overlay.canonicalize(&h1));
+		db.commit(&overlay.canonicalize::<io::Error>(&h1).unwrap());
+		assert!(contains(&overlay, 5));
+		assert_eq!(overlay.levels.len(), 2);
+		assert_eq!(overlay.parents.len(), 2);
+		overlay.apply_pending();
 		assert_eq!(overlay.levels.len(), 1);
 		assert_eq!(overlay.parents.len(), 1);
-		assert!(contains(&overlay, 5));
-		overlay.clear_overlay();
 		assert!(!contains(&overlay, 5));
 		assert!(contains(&overlay, 7));
-		db.commit(&overlay.canonicalize(&h2));
-		overlay.clear_overlay();
+		db.commit(&overlay.canonicalize::<io::Error>(&h2).unwrap());
+		overlay.apply_pending();
 		assert_eq!(overlay.levels.len(), 0);
 		assert_eq!(overlay.parents.len(), 0);
 		assert!(db.data_eq(&make_db(&[1, 4, 6, 7, 8])));
@@ -427,6 +500,7 @@
 	#[test]
 	fn complex_tree() {
+		use crate::MetaDb;
 		let mut db = make_db(&[]);
 
 		// - 1 - 1_1 - 1_1_1
@@ -487,8 +561,8 @@
 		assert_eq!(overlay.last_canonicalized, overlay2.last_canonicalized);
 
 		// canonicalize 1. 2 and all its children should be discarded
-		db.commit(&overlay.canonicalize(&h_1));
-		overlay.clear_overlay();
+		db.commit(&overlay.canonicalize::<io::Error>(&h_1).unwrap());
+		overlay.apply_pending();
 		assert_eq!(overlay.levels.len(), 2);
 		assert_eq!(overlay.parents.len(), 6);
 		assert!(!contains(&overlay, 1));
@@ -497,10 +571,17 @@
 		assert!(!contains(&overlay, 22));
 		assert!(!contains(&overlay, 211));
 		assert!(contains(&overlay, 111));
+		assert!(!contains(&overlay, 211));
+		// check that journals are deleted
+		assert!(db.get_meta(&to_journal_key(1, 0)).unwrap().is_none());
+		assert!(db.get_meta(&to_journal_key(1, 1)).unwrap().is_none());
+		assert!(db.get_meta(&to_journal_key(2, 1)).unwrap().is_some());
+		assert!(db.get_meta(&to_journal_key(2, 2)).unwrap().is_none());
+		assert!(db.get_meta(&to_journal_key(2, 3)).unwrap().is_none());
 
 		// canonicalize 1_2. 1_1 and all its children should be discarded
-		db.commit(&overlay.canonicalize(&h_1_2));
-		overlay.clear_overlay();
+		db.commit(&overlay.canonicalize::<io::Error>(&h_1_2).unwrap());
+		overlay.apply_pending();
 		assert_eq!(overlay.levels.len(), 1);
 		assert_eq!(overlay.parents.len(), 3);
 		assert!(!contains(&overlay, 11));
@@ -510,8 +591,8 @@
 		assert!(contains(&overlay, 123));
 
 		// canonicalize 1_2_2
-		db.commit(&overlay.canonicalize(&h_1_2_2));
-		overlay.clear_overlay();
+		db.commit(&overlay.canonicalize::<io::Error>(&h_1_2_2).unwrap());
+		overlay.apply_pending();
 		assert_eq!(overlay.levels.len(), 0);
 		assert_eq!(overlay.parents.len(), 0);
 		assert!(db.data_eq(&make_db(&[1, 12, 122])));
@@ -540,5 +621,29 @@
 
 		assert!(overlay.revert_one().is_none());
 	}
+
+	#[test]
+	fn revert_pending_insertion() {
+		let h1 = H256::random();
+		let h2_1 = H256::random();
+		let h2_2 = H256::random();
+		let db = make_db(&[]);
+		let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
+		let changeset1 = make_changeset(&[5, 6], &[2]);
+		let changeset2 = make_changeset(&[7, 8], &[5, 3]);
+		let changeset3 = make_changeset(&[9], &[]);
+		overlay.insert::<io::Error>(&h1, 1, &H256::default(), changeset1).unwrap();
+		assert!(contains(&overlay, 5));
+		overlay.insert::<io::Error>(&h2_1, 2, &h1, changeset2).unwrap();
+		overlay.insert::<io::Error>(&h2_2, 2, &h1, changeset3).unwrap();
+		assert!(contains(&overlay, 7));
+		assert!(contains(&overlay, 5));
+		assert!(contains(&overlay, 9));
+		assert_eq!(overlay.levels.len(), 2);
+		assert_eq!(overlay.parents.len(), 3);
+		overlay.revert_pending();
+		assert!(!contains(&overlay, 5));
+		assert_eq!(overlay.levels.len(), 0);
+		assert_eq!(overlay.parents.len(), 0);
+	}
 }
diff --git a/substrate/core/state-db/src/pruning.rs b/substrate/core/state-db/src/pruning.rs
index be979f9d21044816b312785bdc61634bf9971faa..f069e18ad668de915ab2beaef59ea8fa04f71ea2 100644
--- a/substrate/core/state-db/src/pruning.rs
+++ b/substrate/core/state-db/src/pruning.rs
@@ -23,10 +23,11 @@
 //! The changes are journaled in the DB.
 
 use std::collections::{HashMap, HashSet, VecDeque};
+use std::mem;
 use crate::codec::{Encode, Decode};
-use parity_codec_derive::{Encode, Decode};
 use crate::{CommitSet, Error, MetaDb, to_meta_key, Hash};
-use log::trace;
+use parity_codec_derive::{Encode, Decode};
+use log::{trace, warn};
 
 const LAST_PRUNED: &[u8] = b"last_pruned";
 const PRUNING_JOURNAL: &[u8] = b"pruning_journal";
@@ -36,6 +37,8 @@ pub struct RefWindow<BlockHash: Hash, Key: Hash> {
 	death_rows: VecDeque<DeathRow<BlockHash, Key>>,
 	death_index: HashMap<Key, u64>,
 	pending_number: u64,
+	pending_records: Vec<(u64, JournalRecord<BlockHash, Key>)>,
+	pending_prunings: usize,
 }
 
 #[derive(Debug, PartialEq, Eq)]
@@ -69,6 +72,8 @@ impl<BlockHash: Hash, Key: Hash> RefWindow<BlockHash, Key> {
 			death_rows: Default::default(),
 			death_index: Default::default(),
 			pending_number: pending_number,
+			pending_records: Default::default(),
+			pending_prunings: 0,
 		};
 		// read the journal
 		trace!(target: "state-db", "Reading pruning journal. Pending #{}", pending_number);
@@ -110,11 +115,11 @@
 	}
 
 	pub fn window_size(&self) -> u64 {
-		self.death_rows.len() as u64
+		(self.death_rows.len() + self.pending_records.len() - self.pending_prunings) as u64
 	}
 
 	pub fn next_hash(&self) -> Option<BlockHash> {
-		self.death_rows.front().map(|r| r.hash.clone())
+		self.death_rows.get(self.pending_prunings).map(|r| r.hash.clone())
 	}
 
 	pub fn mem_used(&self) -> usize {
@@ -122,20 +127,28 @@
 	}
 
 	pub fn pending(&self) -> u64 {
-		self.pending_number
+		self.pending_number + self.pending_prunings as u64
 	}
 
 	/// Prune next block. Expects at least one block in the window. Adds changes to `commit`.
 	pub fn prune_one(&mut self, commit: &mut CommitSet<Key>) {
-		let pruned = self.death_rows.pop_front().expect("prune_one is only called with a non-empty window");
-		trace!(target: "state-db", "Pruning {:?} ({} deleted)", pruned.hash, pruned.deleted.len());
-		for k in pruned.deleted.iter() {
-			self.death_index.remove(&k);
+		if let Some(pruned) = self.death_rows.get(self.pending_prunings) {
+			trace!(target: "state-db", "Pruning {:?} ({} deleted)", pruned.hash, pruned.deleted.len());
+			let index = self.pending_number + self.pending_prunings as u64;
+			commit.data.deleted.extend(pruned.deleted.iter().cloned());
+			commit.meta.inserted.push((to_meta_key(LAST_PRUNED, &()), index.encode()));
+			commit.meta.deleted.push(pruned.journal_key.clone());
+			self.pending_prunings += 1;
+		} else if let Some((block, pruned)) = self.pending_records.get(self.pending_prunings - self.death_rows.len()) {
+			trace!(target: "state-db", "Pruning pending {:?} ({} deleted)", pruned.hash, pruned.deleted.len());
+			commit.data.deleted.extend(pruned.deleted.iter().cloned());
+			commit.meta.inserted.push((to_meta_key(LAST_PRUNED, &()), block.encode()));
+			let journal_key = to_journal_key(*block);
+			commit.meta.deleted.push(journal_key);
+			self.pending_prunings += 1;
+		} else {
+			warn!(target: "state-db", "Trying to prune when there's nothing to prune");
 		}
-		commit.data.deleted.extend(pruned.deleted.into_iter());
-		commit.meta.inserted.push((to_meta_key(LAST_PRUNED, &()), self.pending_number.encode()));
-		commit.meta.deleted.push(pruned.journal_key);
-		self.pending_number += 1;
 	}
 
 	/// Add a change set to the window. Creates a journal record and pushes it to `commit`
@@ -148,11 +161,34 @@
 			inserted,
 			deleted,
 		};
-		let block = self.pending_number + self.window_size();
+		let block = self.pending_number + self.window_size() as u64;
 		let journal_key = to_journal_key(block);
 		commit.meta.inserted.push((journal_key.clone(), journal_record.encode()));
+		self.pending_records.push((block, journal_record));
+	}
+
+	/// Apply all pending changes
+	pub fn apply_pending(&mut self) {
+		for (block, journal_record) in mem::replace(&mut self.pending_records, Default::default()).into_iter() {
+			trace!(target: "state-db", "Applying pruning window record: {}: {:?}", block, journal_record.hash);
+			let journal_key = to_journal_key(block);
+			self.import(&journal_record.hash, journal_key, journal_record.inserted.into_iter(), journal_record.deleted);
+		}
+		for _ in 0 .. self.pending_prunings {
+			let pruned = self.death_rows.pop_front().expect("pending_prunings is always < death_rows.len()");
+			trace!(target: "state-db", "Applying pruning {:?} ({} deleted)", pruned.hash, pruned.deleted.len());
+			for k in pruned.deleted.iter() {
+				self.death_index.remove(&k);
+			}
+			self.pending_number += 1;
+		}
+		self.pending_prunings = 0;
+	}
 
-		self.import(hash, journal_key, journal_record.inserted.into_iter(), journal_record.deleted);
+	/// Revert all pending changes
+	pub fn revert_pending(&mut self) {
+		self.pending_records.clear();
+		self.pending_prunings = 0;
+	}
 }
@@ -180,12 +216,16 @@ mod tests {
 	}
 
 	#[test]
-	#[should_panic]
-	fn prune_empty_panics() {
+	fn prune_empty() {
 		let db = make_db(&[]);
 		let mut pruning: RefWindow<H256, H256> = RefWindow::new(&db).unwrap();
 		let mut commit = CommitSet::default();
 		pruning.prune_one(&mut commit);
+		assert_eq!(pruning.pending_number, 0);
+		assert!(pruning.death_rows.is_empty());
+		assert!(pruning.death_index.is_empty());
+		assert!(pruning.pending_prunings == 0);
+		assert!(pruning.pending_records.is_empty());
 	}
 
 	#[test]
@@ -196,6 +236,7 @@
 		let h = H256::random();
 		pruning.note_canonical(&h, &mut commit);
 		db.commit(&commit);
+		pruning.apply_pending();
 		assert!(commit.data.deleted.is_empty());
 		assert_eq!(pruning.death_rows.len(), 1);
 		assert_eq!(pruning.death_index.len(), 2);
@@ -205,6 +246,7 @@
 		let mut commit = CommitSet::default();
 		pruning.prune_one(&mut commit);
 		db.commit(&commit);
+		pruning.apply_pending();
 		assert!(db.data_eq(&make_db(&[2, 4, 5])));
 		assert!(pruning.death_rows.is_empty());
 		assert!(pruning.death_index.is_empty());
@@ -221,6 +263,7 @@
 		let mut commit = make_commit(&[5], &[2]);
 		pruning.note_canonical(&H256::random(), &mut commit);
 		db.commit(&commit);
+		pruning.apply_pending();
 		assert!(db.data_eq(&make_db(&[1, 2, 3, 4, 5])));
 
 		check_journal(&pruning, &db);
@@ -228,10 +271,12 @@
 		let mut commit = CommitSet::default();
 		pruning.prune_one(&mut commit);
 		db.commit(&commit);
+		pruning.apply_pending();
 		assert!(db.data_eq(&make_db(&[2, 3, 4, 5])));
 		let mut commit = CommitSet::default();
 		pruning.prune_one(&mut commit);
 		db.commit(&commit);
+		pruning.apply_pending();
 		assert!(db.data_eq(&make_db(&[3, 4, 5])));
 		assert_eq!(pruning.pending_number, 2);
 	}
@@ -250,6 +295,7 @@
 		pruning.note_canonical(&H256::random(), &mut commit);
 		db.commit(&commit);
 		assert!(db.data_eq(&make_db(&[1, 2, 3])));
+		pruning.apply_pending();
 
 		check_journal(&pruning, &db);
 
@@ -264,6 +310,7 @@
 		pruning.prune_one(&mut commit);
 		db.commit(&commit);
 		assert!(db.data_eq(&make_db(&[1, 3])));
+		pruning.apply_pending();
 		assert_eq!(pruning.pending_number, 3);
 	}
 }
diff --git a/substrate/core/state-db/src/test.rs b/substrate/core/state-db/src/test.rs
index 0f7b76496009faa20f9f627384078f588f245756..871750d0c8b75d3f64b8e7f0c2d6b4d3079e6b2d 100644
--- a/substrate/core/state-db/src/test.rs
+++ b/substrate/core/state-db/src/test.rs
@@ -46,6 +46,7 @@ impl HashDb for TestDb {
 impl TestDb {
 	pub fn commit(&mut self, commit: &CommitSet<H256>) {
 		self.data.extend(commit.data.inserted.iter().cloned());
+		self.meta.extend(commit.meta.inserted.iter().cloned());
 		for k in commit.data.deleted.iter() {
 			self.data.remove(k);
 		}
diff --git a/substrate/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm b/substrate/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm
index 0adf416053913db9853cad7a9274a2620c93dc29..c0d856e0a97b0379ac7e83c63084c915eea9e5a7 100644
Binary files a/substrate/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm and b/substrate/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm differ
diff --git a/substrate/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm b/substrate/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm
index 7c62fb49da012b266d03ade05ed3f744e370455d..b6a4fbbc53b7e765f3f1f1b542ee1504a764624f 100644
Binary files a/substrate/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm and b/substrate/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm differ
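
Review note: the heart of this patch is a two-phase commit between the backing database and the in-memory state. `insert_block` and `canonicalize_block` now only journal changes; the in-memory side is promoted with `apply_pending` once the `DBTransaction` write succeeds, or rolled back with `revert_pending` if it fails, which is exactly the `match` in the new `commit_operation`. Below is a minimal, self-contained sketch of that pattern; the names `PendingCounter` and `commit_with` are illustrative, not part of the Substrate API.

```rust
/// Stand-in for `StateDbSync`: `committed` mirrors what is known to be
/// durable on disk, `pending` accumulates staged changes.
struct PendingCounter {
    committed: u64,
    pending: u64,
}

impl PendingCounter {
    /// Stage a change, as `insert_block`/`canonicalize_block` now do.
    fn stage(&mut self) {
        self.pending += 1;
    }

    /// Promote staged changes once the DB write is durable,
    /// mirroring `StateDb::apply_pending`.
    fn apply_pending(&mut self) {
        self.committed += self.pending;
        self.pending = 0;
    }

    /// Drop staged changes after a failed write,
    /// mirroring `StateDb::revert_pending`.
    fn revert_pending(&mut self) {
        self.pending = 0;
    }
}

/// Analogue of the new `Backend::commit_operation`: run the fallible DB
/// write first, then reconcile the in-memory side either way.
fn commit_with<E>(state: &mut PendingCounter, write: impl FnOnce() -> Result<(), E>) -> Result<(), E> {
    state.stage();
    match write() {
        Ok(()) => {
            state.apply_pending();
            Ok(())
        }
        e @ Err(_) => {
            state.revert_pending();
            e
        }
    }
}

fn main() {
    let mut state = PendingCounter { committed: 0, pending: 0 };
    // A successful write promotes the staged change.
    assert!(commit_with(&mut state, || Ok::<(), &str>(())).is_ok());
    assert_eq!((state.committed, state.pending), (1, 0));
    // A failed write discards it, so memory never runs ahead of disk.
    assert!(commit_with(&mut state, || Err("io error")).is_err());
    assert_eq!((state.committed, state.pending), (1, 0));
}
```

Either way the in-memory view and the database agree afterwards, which is what lets `canonicalize_block` return an `Err(Error::InvalidBlock)` that callers can recover from, instead of panicking the way the old `expect("no blocks to canonicalize")` did.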
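
A related note on `pruning.rs`: `prune_one` no longer pops `death_rows` eagerly. It records how many front entries are logically pruned (`pending_prunings`) and keeps freshly noted blocks in `pending_records`; `apply_pending` then imports the staged records and performs the real pops, while `revert_pending` is two cheap resets. A compact model of that bookkeeping, with made-up names (`Window`, `rows`, `staged`):

```rust
use std::collections::VecDeque;

// Toy analogue of `RefWindow`: `rows` stands in for `death_rows`,
// `staged` for `pending_records`.
struct Window {
    rows: VecDeque<u64>,
    staged: Vec<u64>,
    pending_prunings: usize,
}

impl Window {
    /// Like the new `window_size`: staged records count, pruned ones don't.
    fn window_size(&self) -> usize {
        self.rows.len() + self.staged.len() - self.pending_prunings
    }

    /// Like `prune_one`: look at the next unpruned entry instead of popping,
    /// spilling from the durable rows into the staged records.
    fn prune_one(&mut self) -> Option<u64> {
        let next = self.rows.get(self.pending_prunings).copied()
            .or_else(|| self.staged.get(self.pending_prunings - self.rows.len()).copied());
        if next.is_some() {
            self.pending_prunings += 1;
        }
        next
    }

    /// Like `apply_pending`: import staged rows, then really pop the pruned ones.
    fn apply_pending(&mut self) {
        self.rows.extend(self.staged.drain(..));
        for _ in 0..self.pending_prunings {
            self.rows.pop_front();
        }
        self.pending_prunings = 0;
    }

    /// Like `revert_pending`: forget staged rows and un-prune.
    fn revert_pending(&mut self) {
        self.staged.clear();
        self.pending_prunings = 0;
    }
}

fn main() {
    let mut w = Window { rows: VecDeque::from(vec![1]), staged: vec![2], pending_prunings: 0 };
    assert_eq!(w.prune_one(), Some(1)); // from the durable part of the window
    assert_eq!(w.prune_one(), Some(2)); // spills over into the staged records
    w.revert_pending();                 // failed DB write: forget both prunings
    assert_eq!(w.window_size(), 1);     // ...and the staged insertion
    assert_eq!(w.prune_one(), Some(1));
    w.apply_pending();                  // successful write: now actually pop
    assert_eq!(w.window_size(), 0);
}
```

This is also why `prune_empty` replaces `prune_empty_panics` in the tests: pruning an empty window is now a logged no-op rather than a panic.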
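
One more note on `noncanonical.rs`: the old `discard` needed `split_first_mut` plus a drain-into-`Vec` workaround because it recursed while mutably borrowing the levels. The new `discard_descendants` recurses on a level index instead, collecting the hashes to visit before descending, so the mutable borrow is released between levels. A reduced sketch of that borrow-checker trick; `prune_children` and the `(node, parent)` tuples are hypothetical stand-ins for `BlockOverlay` and the `parents` map:

```rust
use std::collections::VecDeque;

// Each level holds (node, parent) pairs; children of a removed node live on
// the following level, as in `NonCanonicalOverlay::levels`.
fn prune_children(
    levels: &mut VecDeque<Vec<(u32, u32)>>,
    index: usize,
    parent: u32,
) {
    let mut removed = Vec::new();
    if let Some(level) = levels.get_mut(index) {
        // retain() holds the mutable borrow, so only collect hashes here...
        level.retain(|&(node, p)| {
            if p == parent {
                removed.push(node);
                false
            } else {
                true
            }
        });
    }
    // ...and recurse after the borrow of `levels` has been released.
    for node in removed {
        prune_children(levels, index + 1, node);
    }
}

fn main() {
    let mut levels: VecDeque<Vec<(u32, u32)>> = VecDeque::from(vec![
        vec![(1, 0), (2, 0)],
        vec![(11, 1), (12, 1), (21, 2)],
        vec![(211, 21)],
    ]);
    // As in `apply_canonicalizations`: the sibling itself is dropped at its
    // own level, then its whole subtree below it.
    levels[0].retain(|&(node, _)| node != 2);
    prune_children(&mut levels, 1, 2);
    assert_eq!(levels[1], vec![(11, 1), (12, 1)]);
    assert!(levels[2].is_empty()); // 211 went with 21
}
```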