From 4a0d6d9490b7f8711409ecf61bd1b27cdb94045d Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan <arkady.paronyan@gmail.com> Date: Thu, 18 Mar 2021 12:46:27 +0100 Subject: [PATCH] Storage chains: indexing, renewals and reference counting (#8265) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Transaction indexing * Tests and fixes * Fixed a comment * Style * Build * Style * Apply suggestions from code review Co-authored-by: cheme <emericchevalier.pro@gmail.com> * Code review suggestions * Add missing impl * Apply suggestions from code review Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * impl JoinInput * Don't store empty slices * JoinInput operates on slices Co-authored-by: cheme <emericchevalier.pro@gmail.com> Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> --- substrate/client/api/src/backend.rs | 5 +- substrate/client/api/src/client.rs | 15 +- substrate/client/api/src/in_mem.rs | 10 +- substrate/client/api/src/leaves.rs | 7 +- substrate/client/db/Cargo.toml | 2 +- substrate/client/db/src/lib.rs | 371 ++++++++++++++---- substrate/client/db/src/light.rs | 2 +- substrate/client/db/src/parity_db.rs | 12 +- substrate/client/db/src/utils.rs | 48 ++- substrate/client/light/src/backend.rs | 7 +- substrate/client/light/src/blockchain.rs | 4 +- substrate/client/network/src/bitswap.rs | 11 +- substrate/client/service/src/client/client.rs | 10 +- .../primitives/blockchain/src/backend.rs | 13 +- substrate/primitives/database/Cargo.toml | 1 + substrate/primitives/database/src/kvdb.rs | 65 ++- substrate/primitives/database/src/lib.rs | 125 ++---- substrate/primitives/database/src/mem.rs | 46 ++- substrate/primitives/externalities/src/lib.rs | 10 + substrate/primitives/state-machine/src/ext.rs | 32 +- substrate/primitives/state-machine/src/lib.rs | 1 + .../src/overlayed_changes/mod.rs | 41 ++ 22 files changed, 596 insertions(+), 242 deletions(-) diff --git a/substrate/client/api/src/backend.rs b/substrate/client/api/src/backend.rs index 3108ba89946..14841d8d3e9 100644 --- a/substrate/client/api/src/backend.rs +++ b/substrate/client/api/src/backend.rs @@ -26,7 +26,7 @@ use sp_runtime::{generic::BlockId, Justification, Justifications, Storage}; use sp_runtime::traits::{Block as BlockT, NumberFor, HashFor}; use sp_state_machine::{ ChangesTrieState, ChangesTrieStorage as StateChangesTrieStorage, ChangesTrieTransaction, - StorageCollection, ChildStorageCollection, OffchainChangesCollection, + StorageCollection, ChildStorageCollection, OffchainChangesCollection, IndexOperation, }; use sp_storage::{StorageData, StorageKey, PrefixedStorageKey, ChildInfo}; use crate::{ @@ -201,6 +201,9 @@ pub trait BlockImportOperation<Block: BlockT> { /// Mark a block as new head. If both block import and set head are specified, set head /// overrides block import's best block rule. fn mark_head(&mut self, id: BlockId<Block>) -> sp_blockchain::Result<()>; + + /// Add a transaction index operation. + fn update_transaction_index(&mut self, index: Vec<IndexOperation>) -> sp_blockchain::Result<()>; } /// Interface for performing operations on the backend. diff --git a/substrate/client/api/src/client.rs b/substrate/client/api/src/client.rs index 97fe77c8d81..4a0940b1f4b 100644 --- a/substrate/client/api/src/client.rs +++ b/substrate/client/api/src/client.rs @@ -96,15 +96,18 @@ pub trait BlockBackend<Block: BlockT> { /// Get block hash by number. 
fn block_hash(&self, number: NumberFor<Block>) -> sp_blockchain::Result<Option<Block::Hash>>; - /// Get single extrinsic by hash. - fn extrinsic( + /// Get single indexed transaction by content hash. + /// + /// Note that this will only fetch transactions + /// that are indexed by the runtime with `storage_index_transaction`. + fn indexed_transaction( &self, hash: &Block::Hash, - ) -> sp_blockchain::Result<Option<<Block as BlockT>::Extrinsic>>; + ) -> sp_blockchain::Result<Option<Vec<u8>>>; - /// Check if extrinsic exists. - fn have_extrinsic(&self, hash: &Block::Hash) -> sp_blockchain::Result<bool> { - Ok(self.extrinsic(hash)?.is_some()) + /// Check if transaction index exists. + fn has_indexed_transaction(&self, hash: &Block::Hash) -> sp_blockchain::Result<bool> { + Ok(self.indexed_transaction(hash)?.is_some()) } } diff --git a/substrate/client/api/src/in_mem.rs b/substrate/client/api/src/in_mem.rs index c3d26695427..930ae39c4b5 100644 --- a/substrate/client/api/src/in_mem.rs +++ b/substrate/client/api/src/in_mem.rs @@ -30,7 +30,7 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Zero, NumberFor, Ha use sp_runtime::{Justification, Justifications, Storage}; use sp_state_machine::{ ChangesTrieTransaction, InMemoryBackend, Backend as StateBackend, StorageCollection, - ChildStorageCollection, + ChildStorageCollection, IndexOperation, }; use sp_blockchain::{CachedHeaderMetadata, HeaderMetadata}; @@ -415,10 +415,10 @@ impl<Block: BlockT> blockchain::Backend<Block> for Blockchain<Block> { unimplemented!() } - fn extrinsic( + fn indexed_transaction( &self, _hash: &Block::Hash, - ) -> sp_blockchain::Result<Option<<Block as BlockT>::Extrinsic>> { + ) -> sp_blockchain::Result<Option<Vec<u8>>> { unimplemented!("Not supported by the in-mem backend.") } } @@ -613,6 +613,10 @@ impl<Block: BlockT> backend::BlockImportOperation<Block> for BlockImportOperatio self.set_head = Some(block); Ok(()) } + + fn update_transaction_index(&mut self, _index: Vec<IndexOperation>) -> sp_blockchain::Result<()> { + Ok(()) + } } /// In-memory backend. Keeps all states and blocks in memory. diff --git a/substrate/client/api/src/leaves.rs b/substrate/client/api/src/leaves.rs index 1971012c6aa..47cac8b186f 100644 --- a/substrate/client/api/src/leaves.rs +++ b/substrate/client/api/src/leaves.rs @@ -25,7 +25,7 @@ use sp_runtime::traits::AtLeast32Bit; use codec::{Encode, Decode}; use sp_blockchain::{Error, Result}; -type DbHash = [u8; 32]; +type DbHash = sp_core::H256; #[derive(Debug, Clone, PartialEq, Eq)] struct LeafSetItem<H, N> { @@ -55,6 +55,11 @@ impl<H, N: Ord> FinalizationDisplaced<H, N> { // one transaction, then there will be no overlap in the keys. self.leaves.append(&mut other.leaves); } + + /// Iterate over all displaced leaves. + pub fn leaves(&self) -> impl IntoIterator<Item=&H> { + self.leaves.values().flatten() + } } /// list of leaf hashes ordered by number (descending). 
diff --git a/substrate/client/db/Cargo.toml b/substrate/client/db/Cargo.toml index 72c26fead1c..e5e52494c2d 100644 --- a/substrate/client/db/Cargo.toml +++ b/substrate/client/db/Cargo.toml @@ -35,7 +35,7 @@ sp-trie = { version = "3.0.0", path = "../../primitives/trie" } sp-consensus = { version = "0.9.0", path = "../../primitives/consensus/common" } sp-blockchain = { version = "3.0.0", path = "../../primitives/blockchain" } sp-database = { version = "3.0.0", path = "../../primitives/database" } -parity-db = { version = "0.2.2", optional = true } +parity-db = { version = "0.2.3", optional = true } prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.9.0", path = "../../utils/prometheus" } [dev-dependencies] diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index acda057938e..0fc8e299f2a 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -78,7 +78,7 @@ use sp_runtime::traits::{ use sp_state_machine::{ DBValue, ChangesTrieTransaction, ChangesTrieCacheAction, UsageInfo as StateUsageInfo, StorageCollection, ChildStorageCollection, OffchainChangesCollection, - backend::Backend as StateBackend, StateMachineStats, + backend::Backend as StateBackend, StateMachineStats, IndexOperation, }; use crate::utils::{DatabaseType, Meta, meta_keys, read_db, read_meta}; use crate::changes_tries_storage::{DbChangesTrieStorage, DbChangesTrieStorageTransaction}; @@ -107,7 +107,16 @@ pub type DbState<B> = sp_state_machine::TrieBackend< const DB_HASH_LEN: usize = 32; /// Hash type that this backend uses for the database. -pub type DbHash = [u8; DB_HASH_LEN]; +pub type DbHash = sp_core::H256; + +/// This is used as block body when storage-chain mode is enabled. +#[derive(Debug, Encode, Decode)] +struct ExtrinsicHeader { + /// Hash of the indexed part + indexed_hash: DbHash, // Zero hash if there's no indexed data + /// The rest of the data. + data: Vec<u8>, +} /// A reference tracking state. /// @@ -506,33 +515,47 @@ impl<Block: BlockT> sc_client_api::blockchain::HeaderBackend<Block> for Blockcha impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<Block> { fn body(&self, id: BlockId<Block>) -> ClientResult<Option<Vec<Block::Extrinsic>>> { - match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)? { - Some(body) => { - match self.transaction_storage { - TransactionStorageMode::BlockBody => match Decode::decode(&mut &body[..]) { - Ok(body) => Ok(Some(body)), - Err(err) => return Err(sp_blockchain::Error::Backend( - format!("Error decoding body: {}", err) - )), - }, - TransactionStorageMode::StorageChain => { - match Vec::<Block::Hash>::decode(&mut &body[..]) { - Ok(hashes) => { - let extrinsics: ClientResult<Vec<Block::Extrinsic>> = hashes.into_iter().map( - |h| self.extrinsic(&h).and_then(|maybe_ex| maybe_ex.ok_or_else( - || sp_blockchain::Error::Backend( - format!("Missing transaction: {}", h)))) - ).collect(); - Ok(Some(extrinsics?)) + let body = match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)? 
{ + Some(body) => body, + None => return Ok(None), + }; + match self.transaction_storage { + TransactionStorageMode::BlockBody => match Decode::decode(&mut &body[..]) { + Ok(body) => Ok(Some(body)), + Err(err) => return Err(sp_blockchain::Error::Backend( + format!("Error decoding body: {}", err) + )), + }, + TransactionStorageMode::StorageChain => { + match Vec::<ExtrinsicHeader>::decode(&mut &body[..]) { + Ok(index) => { + let extrinsics: ClientResult<Vec<Block::Extrinsic>> = index.into_iter().map( + | ExtrinsicHeader { indexed_hash, data } | { + let decode_result = if indexed_hash != Default::default() { + match self.db.get(columns::TRANSACTION, indexed_hash.as_ref()) { + Some(t) => { + let mut input = utils::join_input(data.as_ref(), t.as_ref()); + Block::Extrinsic::decode(&mut input) + }, + None => return Err(sp_blockchain::Error::Backend( + format!("Missing indexed transaction {:?}", indexed_hash)) + ) + } + } else { + Block::Extrinsic::decode(&mut data.as_ref()) + }; + decode_result.map_err(|err| sp_blockchain::Error::Backend( + format!("Error decoding extrinsic: {}", err)) + ) } - Err(err) => return Err(sp_blockchain::Error::Backend( - format!("Error decoding body list: {}", err) - )), - } + ).collect(); + Ok(Some(extrinsics?)) } + Err(err) => return Err(sp_blockchain::Error::Backend( + format!("Error decoding body list: {}", err) + )), } } - None => Ok(None), } } @@ -564,21 +587,11 @@ impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<B children::read_children(&*self.db, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash) } - fn extrinsic(&self, hash: &Block::Hash) -> ClientResult<Option<Block::Extrinsic>> { - match self.db.get(columns::TRANSACTION, hash.as_ref()) { - Some(ex) => { - match Decode::decode(&mut &ex[..]) { - Ok(ex) => Ok(Some(ex)), - Err(err) => Err(sp_blockchain::Error::Backend( - format!("Error decoding extrinsic {}: {}", hash, err) - )), - } - }, - None => Ok(None), - } + fn indexed_transaction(&self, hash: &Block::Hash) -> ClientResult<Option<Vec<u8>>> { + Ok(self.db.get(columns::TRANSACTION, hash.as_ref())) } - fn have_extrinsic(&self, hash: &Block::Hash) -> ClientResult<bool> { + fn has_indexed_transaction(&self, hash: &Block::Hash) -> ClientResult<bool> { Ok(self.db.contains(columns::TRANSACTION, hash.as_ref())) } } @@ -681,6 +694,7 @@ pub struct BlockImportOperation<Block: BlockT> { finalized_blocks: Vec<(BlockId<Block>, Option<Justification>)>, set_head: Option<BlockId<Block>>, commit_state: bool, + index_ops: Vec<IndexOperation>, } impl<Block: BlockT> BlockImportOperation<Block> { @@ -823,6 +837,11 @@ impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block> for Bloc self.set_head = Some(block); Ok(()) } + + fn update_transaction_index(&mut self, index_ops: Vec<IndexOperation>) -> ClientResult<()> { + self.index_ops = index_ops; + Ok(()) + } } struct StorageDb<Block: BlockT> { @@ -1155,21 +1174,21 @@ impl<Block: BlockT> Backend<Block> { if new_canonical <= self.storage.state_db.best_canonical().unwrap_or(0) { return Ok(()) } - let hash = if new_canonical == number_u64 { hash } else { - ::sc_client_api::blockchain::HeaderBackend::hash(&self.blockchain, new_canonical.saturated_into())? 
- .expect("existence of block with number `new_canonical` \ - implies existence of blocks with all numbers before it; qed") + sc_client_api::blockchain::HeaderBackend::hash( + &self.blockchain, + new_canonical.saturated_into(), + )?.expect("existence of block with number `new_canonical` \ + implies existence of blocks with all numbers before it; qed") }; trace!(target: "db", "Canonicalize block #{} ({:?})", new_canonical, hash); let commit = self.storage.state_db.canonicalize_block(&hash) .map_err(|e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from_state_db(e))?; apply_state_commit(transaction, commit); - }; - + } Ok(()) } @@ -1225,20 +1244,14 @@ impl<Block: BlockT> Backend<Block> { )?; transaction.set_from_vec(columns::HEADER, &lookup_key, pending_block.header.encode()); - if let Some(body) = &pending_block.body { + if let Some(body) = pending_block.body { match self.transaction_storage { TransactionStorageMode::BlockBody => { transaction.set_from_vec(columns::BODY, &lookup_key, body.encode()); }, TransactionStorageMode::StorageChain => { - let mut hashes = Vec::with_capacity(body.len()); - for extrinsic in body { - let extrinsic = extrinsic.encode(); - let hash = HashFor::<Block>::hash(&extrinsic); - transaction.set(columns::TRANSACTION, &hash.as_ref(), &extrinsic); - hashes.push(hash); - } - transaction.set_from_vec(columns::BODY, &lookup_key, hashes.encode()); + let body = apply_index_ops::<Block>(&mut transaction, body, operation.index_ops); + transaction.set_from_vec(columns::BODY, &lookup_key, body); }, } } @@ -1491,8 +1504,8 @@ impl<Block: BlockT> Backend<Block> { } } - self.prune_blocks(transaction, f_num)?; let new_displaced = self.blockchain.leaves.write().finalize_height(f_num); + self.prune_blocks(transaction, f_num, &new_displaced)?; match displaced { x @ &mut None => *x = Some(new_displaced), &mut Some(ref mut displaced) => displaced.merge(new_displaced), @@ -1505,47 +1518,83 @@ impl<Block: BlockT> Backend<Block> { &self, transaction: &mut Transaction<DbHash>, finalized: NumberFor<Block>, + displaced: &FinalizationDisplaced<Block::Hash, NumberFor<Block>>, ) -> ClientResult<()> { if let KeepBlocks::Some(keep_blocks) = self.keep_blocks { // Always keep the last finalized block let keep = std::cmp::max(keep_blocks, 1); - if finalized < keep.into() { - return Ok(()) + if finalized >= keep.into() { + let number = finalized.saturating_sub(keep.into()); + self.prune_block(transaction, BlockId::<Block>::number(number))?; } - let number = finalized.saturating_sub(keep.into()); - match read_db(&*self.storage.db, columns::KEY_LOOKUP, columns::BODY, BlockId::<Block>::number(number))? { - Some(body) => { - debug!(target: "db", "Removing block #{}", number); - utils::remove_from_db( - transaction, - &*self.storage.db, - columns::KEY_LOOKUP, - columns::BODY, - BlockId::<Block>::number(number), - )?; - match self.transaction_storage { - TransactionStorageMode::BlockBody => {}, - TransactionStorageMode::StorageChain => { - match Vec::<Block::Hash>::decode(&mut &body[..]) { - Ok(hashes) => { - for h in hashes { - transaction.remove(columns::TRANSACTION, h.as_ref()); + + // Also discard all blocks from displaced branches + for h in displaced.leaves() { + let mut number = finalized; + let mut hash = h.clone(); + // Follow displaced chains back until we reach a finalized block. + // Since leaves are discarded due to finality, they can't have parents + // that are canonical, but not yet finalized. So we stop deletig as soon as + // we reach canonical chain. 
+ while self.blockchain.hash(number)? != Some(hash.clone()) { + let id = BlockId::<Block>::hash(hash.clone()); + match self.blockchain.header(id)? { + Some(header) => { + self.prune_block(transaction, id)?; + number = header.number().saturating_sub(One::one()); + hash = header.parent_hash().clone(); + }, + None => break, + } + } + } + } + Ok(()) + } + + fn prune_block( + &self, + transaction: &mut Transaction<DbHash>, + id: BlockId<Block>, + ) -> ClientResult<()> { + match read_db(&*self.storage.db, columns::KEY_LOOKUP, columns::BODY, id)? { + Some(body) => { + debug!(target: "db", "Removing block #{}", id); + utils::remove_from_db( + transaction, + &*self.storage.db, + columns::KEY_LOOKUP, + columns::BODY, + id, + )?; + match self.transaction_storage { + TransactionStorageMode::BlockBody => {}, + TransactionStorageMode::StorageChain => { + match Vec::<ExtrinsicHeader>::decode(&mut &body[..]) { + Ok(body) => { + for ExtrinsicHeader { indexed_hash, .. } in body { + if indexed_hash != Default::default() { + transaction.release( + columns::TRANSACTION, + indexed_hash, + ); } } - Err(err) => return Err(sp_blockchain::Error::Backend( - format!("Error decoding body list: {}", err) - )), } + Err(err) => return Err(sp_blockchain::Error::Backend( + format!("Error decoding body list: {}", err) + )), } } } - None => return Ok(()), } + None => return Ok(()), } Ok(()) } } + fn apply_state_commit(transaction: &mut Transaction<DbHash>, commit: sc_state_db::CommitSet<Vec<u8>>) { for (key, val) in commit.data.inserted.into_iter() { transaction.set_from_vec(columns::STATE, &key[..], val); @@ -1561,6 +1610,67 @@ fn apply_state_commit(transaction: &mut Transaction<DbHash>, commit: sc_state_db } } +fn apply_index_ops<Block: BlockT>( + transaction: &mut Transaction<DbHash>, + body: Vec<Block::Extrinsic>, + ops: Vec<IndexOperation>, +) -> Vec<u8> { + let mut extrinsic_headers: Vec<ExtrinsicHeader> = Vec::with_capacity(body.len()); + let mut index_map = HashMap::new(); + let mut renewed_map = HashMap::new(); + for op in ops { + match op { + IndexOperation::Insert { extrinsic, offset } => { + index_map.insert(extrinsic, offset); + } + IndexOperation::Renew { extrinsic, hash, .. 
} => { + renewed_map.insert(extrinsic, DbHash::from_slice(hash.as_ref())); + } + } + } + for (index, extrinsic) in body.into_iter().enumerate() { + let extrinsic = extrinsic.encode(); + let extrinsic_header = if let Some(hash) = renewed_map.get(&(index as u32)) { + // Bump ref counter + transaction.reference(columns::TRANSACTION, DbHash::from_slice(hash.as_ref())); + ExtrinsicHeader { + indexed_hash: hash.clone(), + data: extrinsic, + } + } else { + match index_map.get(&(index as u32)) { + Some(offset) if *offset as usize <= extrinsic.len() => { + let offset = *offset as usize; + let hash = HashFor::<Block>::hash(&extrinsic[offset..]); + transaction.store( + columns::TRANSACTION, + DbHash::from_slice(hash.as_ref()), + extrinsic[offset..].to_vec(), + ); + ExtrinsicHeader { + indexed_hash: DbHash::from_slice(hash.as_ref()), + data: extrinsic[..offset].to_vec(), + } + }, + _ => { + ExtrinsicHeader { + indexed_hash: Default::default(), + data: extrinsic, + } + } + } + }; + extrinsic_headers.push(extrinsic_header); + } + debug!( + target: "db", + "DB transaction index: {} inserted, {} renewed", + index_map.len(), + renewed_map.len() + ); + extrinsic_headers.encode() +} + impl<Block> sc_client_api::backend::AuxStore for Backend<Block> where Block: BlockT { fn insert_aux< 'a, @@ -1609,6 +1719,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> { finalized_blocks: Vec::new(), set_head: None, commit_state: false, + index_ops: Default::default(), }) } @@ -1998,7 +2109,7 @@ pub(crate) mod tests { changes: Option<Vec<(Vec<u8>, Vec<u8>)>>, extrinsics_root: H256, ) -> H256 { - insert_block(backend, number, parent_hash, changes, extrinsics_root, Vec::new()) + insert_block(backend, number, parent_hash, changes, extrinsics_root, Vec::new(), None) } pub fn insert_block( @@ -2008,6 +2119,7 @@ pub(crate) mod tests { changes: Option<Vec<(Vec<u8>, Vec<u8>)>>, extrinsics_root: H256, body: Vec<ExtrinsicWrapper<u64>>, + transaction_index: Option<Vec<IndexOperation>>, ) -> H256 { use sp_runtime::testing::Digest; @@ -2035,6 +2147,9 @@ pub(crate) mod tests { let mut op = backend.begin_operation().unwrap(); backend.begin_state_operation(&mut op, block_id).unwrap(); op.set_block_data(header, Some(body), None, NewBlockState::Best).unwrap(); + if let Some(index) = transaction_index { + op.update_transaction_index(index).unwrap(); + } op.update_changes_trie((changes_trie_update, ChangesTrieCacheAction::Clear)).unwrap(); backend.commit_operation(op).unwrap(); @@ -2676,7 +2791,7 @@ pub(crate) mod tests { let mut blocks = Vec::new(); let mut prev_hash = Default::default(); for i in 0 .. 5 { - let hash = insert_block(&backend, i, prev_hash, None, Default::default(), vec![i.into()]); + let hash = insert_block(&backend, i, prev_hash, None, Default::default(), vec![i.into()], None); blocks.push(hash); prev_hash = hash; } @@ -2697,4 +2812,100 @@ pub(crate) mod tests { assert_eq!(Some(vec![4.into()]), bc.body(BlockId::hash(blocks[4])).unwrap()); } } + + #[test] + fn prune_blocks_on_finalize_with_fork() { + let backend = Backend::<Block>::new_test_with_tx_storage( + 2, + 10, + TransactionStorageMode::StorageChain + ); + let mut blocks = Vec::new(); + let mut prev_hash = Default::default(); + for i in 0 .. 
5 { + let hash = insert_block(&backend, i, prev_hash, None, Default::default(), vec![i.into()], None); + blocks.push(hash); + prev_hash = hash; + } + + // insert a fork at block 2 + let fork_hash_root = insert_block( + &backend, + 2, + blocks[1], + None, + sp_core::H256::random(), + vec![2.into()], + None + ); + insert_block(&backend, 3, fork_hash_root, None, H256::random(), vec![3.into(), 11.into()], None); + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, BlockId::Hash(blocks[4])).unwrap(); + op.mark_head(BlockId::Hash(blocks[4])).unwrap(); + backend.commit_operation(op).unwrap(); + + for i in 1 .. 5 { + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, BlockId::Hash(blocks[4])).unwrap(); + op.mark_finalized(BlockId::Hash(blocks[i]), None).unwrap(); + backend.commit_operation(op).unwrap(); + } + + let bc = backend.blockchain(); + assert_eq!(None, bc.body(BlockId::hash(blocks[0])).unwrap()); + assert_eq!(None, bc.body(BlockId::hash(blocks[1])).unwrap()); + assert_eq!(None, bc.body(BlockId::hash(blocks[2])).unwrap()); + assert_eq!(Some(vec![3.into()]), bc.body(BlockId::hash(blocks[3])).unwrap()); + assert_eq!(Some(vec![4.into()]), bc.body(BlockId::hash(blocks[4])).unwrap()); + } + + #[test] + fn renew_transaction_storage() { + let backend = Backend::<Block>::new_test_with_tx_storage( + 2, + 10, + TransactionStorageMode::StorageChain + ); + let mut blocks = Vec::new(); + let mut prev_hash = Default::default(); + let x1 = ExtrinsicWrapper::from(0u64).encode(); + let x1_hash = <HashFor::<Block> as sp_core::Hasher>::hash(&x1[1..]); + for i in 0 .. 10 { + let mut index = Vec::new(); + if i == 0 { + index.push(IndexOperation::Insert { extrinsic: 0, offset: 1 }); + } else if i < 5 { + // keep renewing 1st + index.push(IndexOperation::Renew { + extrinsic: 0, + hash: x1_hash.as_ref().to_vec(), + size: (x1.len() - 1) as u32, + }); + } // else stop renewing + let hash = insert_block( + &backend, + i, + prev_hash, + None, + Default::default(), + vec![i.into()], + Some(index) + ); + blocks.push(hash); + prev_hash = hash; + } + + for i in 1 .. 
10 { + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, BlockId::Hash(blocks[4])).unwrap(); + op.mark_finalized(BlockId::Hash(blocks[i]), None).unwrap(); + backend.commit_operation(op).unwrap(); + let bc = backend.blockchain(); + if i < 6 { + assert!(bc.indexed_transaction(&x1_hash).unwrap().is_some()); + } else { + assert!(bc.indexed_transaction(&x1_hash).unwrap().is_none()); + } + } + } } diff --git a/substrate/client/db/src/light.rs b/substrate/client/db/src/light.rs index 91f37dd374d..bf24197c5b5 100644 --- a/substrate/client/db/src/light.rs +++ b/substrate/client/db/src/light.rs @@ -756,7 +756,7 @@ pub(crate) mod tests { #[test] fn finalized_ancient_headers_are_replaced_with_cht() { fn insert_headers<F: Fn(&Hash, u64) -> Header>(header_producer: F) -> - (Arc<sp_database::MemDb<DbHash>>, LightStorage<Block>) + (Arc<sp_database::MemDb>, LightStorage<Block>) { let raw_db = Arc::new(sp_database::MemDb::default()); let db = LightStorage::from_kvdb(raw_db.clone()).unwrap(); diff --git a/substrate/client/db/src/parity_db.rs b/substrate/client/db/src/parity_db.rs index 71cc5117f19..ed39c1e9f66 100644 --- a/substrate/client/db/src/parity_db.rs +++ b/substrate/client/db/src/parity_db.rs @@ -33,7 +33,7 @@ fn handle_err<T>(result: parity_db::Result<T>) -> T { } /// Wrap parity-db database into a trait object that implements `sp_database::Database` -pub fn open<H: Clone>(path: &std::path::Path, db_type: DatabaseType) +pub fn open<H: Clone + AsRef<[u8]>>(path: &std::path::Path, db_type: DatabaseType) -> parity_db::Result<std::sync::Arc<dyn Database<H>>> { let mut config = parity_db::Options::with_columns(path, NUM_COLUMNS as u8); @@ -48,7 +48,7 @@ pub fn open<H: Clone>(path: &std::path::Path, db_type: DatabaseType) Ok(std::sync::Arc::new(DbAdapter(db))) } -impl<H: Clone> Database<H> for DbAdapter { +impl<H: Clone + AsRef<[u8]>> Database<H> for DbAdapter { fn commit(&self, transaction: Transaction<H>) -> Result<(), DatabaseError> { handle_err(self.0.commit(transaction.0.into_iter().map(|change| match change { @@ -65,7 +65,11 @@ impl<H: Clone> Database<H> for DbAdapter { handle_err(self.0.get(col as u8, key)) } - fn lookup(&self, _hash: &H) -> Option<Vec<u8>> { - unimplemented!(); + fn contains(&self, col: ColumnId, key: &[u8]) -> bool { + handle_err(self.0.get_size(col as u8, key)).is_some() + } + + fn value_size(&self, col: ColumnId, key: &[u8]) -> Option<usize> { + handle_err(self.0.get_size(col as u8, key)).map(|s| s as usize) } } diff --git a/substrate/client/db/src/utils.rs b/substrate/client/db/src/utils.rs index cd9b2a6f56d..590b994d50e 100644 --- a/substrate/client/db/src/utils.rs +++ b/substrate/client/db/src/utils.rs @@ -278,7 +278,7 @@ pub fn open_database<Block: BlockT>( #[cfg(feature = "with-parity-db")] DatabaseSettingsSrc::ParityDb { path } => { crate::parity_db::open(&path, db_type) - .map_err(|e| sp_blockchain::Error::Backend(format!("{:?}", e)))? + .map_err(|e| sp_blockchain::Error::Backend(format!("{}", e)))? }, #[cfg(not(feature = "with-parity-db"))] DatabaseSettingsSrc::ParityDb { .. 
} => { @@ -449,10 +449,35 @@ impl DatabaseType { } } +pub(crate) struct JoinInput<'a, 'b>(&'a [u8], &'b [u8]); + +pub(crate) fn join_input<'a, 'b>(i1: &'a[u8], i2: &'b [u8]) -> JoinInput<'a, 'b> { + JoinInput(i1, i2) +} + +impl<'a, 'b> codec::Input for JoinInput<'a, 'b> { + fn remaining_len(&mut self) -> Result<Option<usize>, codec::Error> { + Ok(Some(self.0.len() + self.1.len())) + } + + fn read(&mut self, into: &mut [u8]) -> Result<(), codec::Error> { + let mut read = 0; + if self.0.len() > 0 { + read = std::cmp::min(self.0.len(), into.len()); + self.0.read(&mut into[..read])?; + } + if read < into.len() { + self.1.read(&mut into[read..])?; + } + Ok(()) + } +} + #[cfg(test)] mod tests { use super::*; use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper}; + use codec::Input; type Block = RawBlock<ExtrinsicWrapper<u32>>; #[test] @@ -469,4 +494,25 @@ mod tests { assert_eq!(DatabaseType::Full.as_str(), "full"); assert_eq!(DatabaseType::Light.as_str(), "light"); } + + #[test] + fn join_input_works() { + let buf1 = [1, 2, 3, 4]; + let buf2 = [5, 6, 7, 8]; + let mut test = [0, 0, 0]; + let mut joined = join_input(buf1.as_ref(), buf2.as_ref()); + assert_eq!(joined.remaining_len().unwrap(), Some(8)); + + joined.read(&mut test).unwrap(); + assert_eq!(test, [1, 2, 3]); + assert_eq!(joined.remaining_len().unwrap(), Some(5)); + + joined.read(&mut test).unwrap(); + assert_eq!(test, [4, 5, 6]); + assert_eq!(joined.remaining_len().unwrap(), Some(2)); + + joined.read(&mut test[0..2]).unwrap(); + assert_eq!(test, [7, 8, 6]); + assert_eq!(joined.remaining_len().unwrap(), Some(0)); + } } diff --git a/substrate/client/light/src/backend.rs b/substrate/client/light/src/backend.rs index 52ace4fd947..621ada13ff6 100644 --- a/substrate/client/light/src/backend.rs +++ b/substrate/client/light/src/backend.rs @@ -30,7 +30,7 @@ use sp_core::storage::{well_known_keys, ChildInfo}; use sp_core::offchain::storage::InMemOffchainStorage; use sp_state_machine::{ Backend as StateBackend, TrieBackend, InMemoryBackend, ChangesTrieTransaction, - StorageCollection, ChildStorageCollection, + StorageCollection, ChildStorageCollection, IndexOperation, }; use sp_runtime::{generic::BlockId, Justification, Justifications, Storage}; use sp_runtime::traits::{Block as BlockT, NumberFor, Zero, Header, HashFor}; @@ -374,6 +374,11 @@ impl<S, Block> BlockImportOperation<Block> for ImportOperation<Block, S> self.set_head = Some(block); Ok(()) } + + fn update_transaction_index(&mut self, _index: Vec<IndexOperation>) -> sp_blockchain::Result<()> { + // noop for the light client + Ok(()) + } } impl<H: Hasher> std::fmt::Debug for GenesisOrUnavailableState<H> { diff --git a/substrate/client/light/src/blockchain.rs b/substrate/client/light/src/blockchain.rs index 062b3a9866d..3349adf7ac6 100644 --- a/substrate/client/light/src/blockchain.rs +++ b/substrate/client/light/src/blockchain.rs @@ -129,10 +129,10 @@ impl<S, Block> BlockchainBackend<Block> for Blockchain<S> where Block: BlockT, S Err(ClientError::NotAvailableOnLightClient) } - fn extrinsic( + fn indexed_transaction( &self, _hash: &Block::Hash, - ) -> ClientResult<Option<<Block as BlockT>::Extrinsic>> { + ) -> ClientResult<Option<Vec<u8>>> { Err(ClientError::NotAvailableOnLightClient) } } diff --git a/substrate/client/network/src/bitswap.rs b/substrate/client/network/src/bitswap.rs index 7129f3dbe07..aea2b8420cb 100644 --- a/substrate/client/network/src/bitswap.rs +++ b/substrate/client/network/src/bitswap.rs @@ -25,7 +25,6 @@ use std::io; use std::sync::Arc; use 
std::task::{Context, Poll}; use cid::Version; -use codec::Encode; use core::pin::Pin; use futures::Future; use futures::io::{AsyncRead, AsyncWrite}; @@ -257,15 +256,15 @@ impl<B: BlockT> NetworkBehaviour for Bitswap<B> { } let mut hash = B::Hash::default(); hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]); - let extrinsic = match self.client.extrinsic(&hash) { + let transaction = match self.client.indexed_transaction(&hash) { Ok(ex) => ex, Err(e) => { - error!(target: LOG_TARGET, "Error retrieving extrinsic {}: {}", hash, e); + error!(target: LOG_TARGET, "Error retrieving transaction {}: {}", hash, e); None } }; - match extrinsic { - Some(extrinsic) => { + match transaction { + Some(transaction) => { trace!(target: LOG_TARGET, "Found CID {:?}, hash {:?}", cid, hash); if entry.want_type == WantType::Block as i32 { let prefix = Prefix { @@ -276,7 +275,7 @@ impl<B: BlockT> NetworkBehaviour for Bitswap<B> { }; response.payload.push(MessageBlock { prefix: prefix.to_bytes(), - data: extrinsic.encode(), + data: transaction, }); } else { response.block_presences.push(BlockPresence { diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs index 81c98b8b1e2..a39c4566419 100644 --- a/substrate/client/service/src/client/client.rs +++ b/substrate/client/service/src/client/client.rs @@ -767,6 +767,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where offchain_sc, tx, _, changes_trie_tx, + tx_index, ) = storage_changes.into_inner(); if self.config.offchain_indexing_api { @@ -775,6 +776,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where operation.op.update_db_storage(tx)?; operation.op.update_storage(main_sc.clone(), child_sc.clone())?; + operation.op.update_transaction_index(tx_index)?; if let Some(changes_trie_transaction) = changes_trie_tx { operation.op.update_changes_trie(changes_trie_transaction)?; @@ -1945,12 +1947,12 @@ impl<B, E, Block, RA> BlockBackend<Block> for Client<B, E, Block, RA> self.backend.blockchain().hash(number) } - fn extrinsic(&self, hash: &Block::Hash) -> sp_blockchain::Result<Option<Block::Extrinsic>> { - self.backend.blockchain().extrinsic(hash) + fn indexed_transaction(&self, hash: &Block::Hash) -> sp_blockchain::Result<Option<Vec<u8>>> { + self.backend.blockchain().indexed_transaction(hash) } - fn have_extrinsic(&self, hash: &Block::Hash) -> sp_blockchain::Result<bool> { - self.backend.blockchain().have_extrinsic(hash) + fn has_indexed_transaction(&self, hash: &Block::Hash) -> sp_blockchain::Result<bool> { + self.backend.blockchain().has_indexed_transaction(hash) } } diff --git a/substrate/primitives/blockchain/src/backend.rs b/substrate/primitives/blockchain/src/backend.rs index 6ee836acb64..b00cbada9f4 100644 --- a/substrate/primitives/blockchain/src/backend.rs +++ b/substrate/primitives/blockchain/src/backend.rs @@ -216,15 +216,16 @@ pub trait Backend<Block: BlockT>: HeaderBackend<Block> + HeaderMetadata<Block, E Ok(None) } - /// Get single extrinsic by hash. - fn extrinsic( + /// Get single indexed transaction by content hash. Note that this will only fetch transactions + /// that are indexed by the runtime with `storage_index_transaction`. + fn indexed_transaction( &self, hash: &Block::Hash, - ) -> Result<Option<<Block as BlockT>::Extrinsic>>; + ) -> Result<Option<Vec<u8>>>; - /// Check if extrinsic exists. - fn have_extrinsic(&self, hash: &Block::Hash) -> Result<bool> { - Ok(self.extrinsic(hash)?.is_some()) + /// Check if indexed transaction exists. 
+ fn has_indexed_transaction(&self, hash: &Block::Hash) -> Result<bool> { + Ok(self.indexed_transaction(hash)?.is_some()) } } diff --git a/substrate/primitives/database/Cargo.toml b/substrate/primitives/database/Cargo.toml index 4062ba29235..aae7668b5ec 100644 --- a/substrate/primitives/database/Cargo.toml +++ b/substrate/primitives/database/Cargo.toml @@ -13,3 +13,4 @@ readme = "README.md" [dependencies] parking_lot = "0.11.1" kvdb = "0.9.0" + diff --git a/substrate/primitives/database/src/kvdb.rs b/substrate/primitives/database/src/kvdb.rs index b50ca53786f..d99fe6360ef 100644 --- a/substrate/primitives/database/src/kvdb.rs +++ b/substrate/primitives/database/src/kvdb.rs @@ -33,18 +33,73 @@ fn handle_err<T>(result: std::io::Result<T>) -> T { } /// Wrap RocksDb database into a trait object that implements `sp_database::Database` -pub fn as_database<D: KeyValueDB + 'static, H: Clone>(db: D) -> std::sync::Arc<dyn Database<H>> { +pub fn as_database<D, H>(db: D) -> std::sync::Arc<dyn Database<H>> + where D: KeyValueDB + 'static, H: Clone + AsRef<[u8]> +{ std::sync::Arc::new(DbAdapter(db)) } -impl<D: KeyValueDB, H: Clone> Database<H> for DbAdapter<D> { +impl <D: KeyValueDB> DbAdapter<D> { + // Returns counter key and counter value if it exists. + fn read_counter(&self, col: ColumnId, key: &[u8]) -> error::Result<(Vec<u8>, Option<u32>)> { + // Add a key suffix for the counter + let mut counter_key = key.to_vec(); + counter_key.push(0); + Ok(match self.0.get(col, &counter_key).map_err(|e| error::DatabaseError(Box::new(e)))? { + Some(data) => { + let mut counter_data = [0; 4]; + if data.len() != 4 { + return Err(error::DatabaseError(Box::new( + std::io::Error::new(std::io::ErrorKind::Other, + format!("Unexpected counter len {}", data.len()))) + )) + } + counter_data.copy_from_slice(&data); + let counter = u32::from_le_bytes(counter_data); + (counter_key, Some(counter)) + }, + None => (counter_key, None) + }) + } +} + +impl<D: KeyValueDB, H: Clone + AsRef<[u8]>> Database<H> for DbAdapter<D> { fn commit(&self, transaction: Transaction<H>) -> error::Result<()> { let mut tx = DBTransaction::new(); for change in transaction.0.into_iter() { match change { Change::Set(col, key, value) => tx.put_vec(col, &key, value), Change::Remove(col, key) => tx.delete(col, &key), - _ => unimplemented!(), + Change::Store(col, key, value) => { + match self.read_counter(col, key.as_ref())? { + (counter_key, Some(mut counter)) => { + counter += 1; + tx.put(col, &counter_key, &counter.to_le_bytes()); + }, + (counter_key, None) => { + let d = 1u32.to_le_bytes(); + tx.put(col, &counter_key, &d); + tx.put_vec(col, key.as_ref(), value); + }, + } + } + Change::Reference(col, key) => { + if let (counter_key, Some(mut counter)) = self.read_counter(col, key.as_ref())? { + counter += 1; + tx.put(col, &counter_key, &counter.to_le_bytes()); + } + } + Change::Release(col, key) => { + if let (counter_key, Some(mut counter)) = self.read_counter(col, key.as_ref())? 
{ + counter -= 1; + if counter == 0 { + tx.delete(col, &counter_key); + tx.delete(col, key.as_ref()); + } else { + tx.put(col, &counter_key, &counter.to_le_bytes()); + } + } + } } } self.0.write(tx).map_err(|e| error::DatabaseError(Box::new(e))) @@ -54,7 +109,7 @@ impl<D: KeyValueDB, H: Clone> Database<H> for DbAdapter<D> { handle_err(self.0.get(col, key)) } - fn lookup(&self, _hash: &H) -> Option<Vec<u8>> { - unimplemented!(); + fn contains(&self, col: ColumnId, key: &[u8]) -> bool { + handle_err(self.0.has_key(col, key)) } } diff --git a/substrate/primitives/database/src/lib.rs b/substrate/primitives/database/src/lib.rs index 7107ea25c02..1fa0c8e49b0 100644 --- a/substrate/primitives/database/src/lib.rs +++ b/substrate/primitives/database/src/lib.rs @@ -32,16 +32,9 @@ pub type ColumnId = u32; pub enum Change<H> { Set(ColumnId, Vec<u8>, Vec<u8>), Remove(ColumnId, Vec<u8>), - Store(H, Vec<u8>), - Release(H), -} - -/// An alteration to the database that references the data. -pub enum ChangeRef<'a, H> { - Set(ColumnId, &'a [u8], &'a [u8]), - Remove(ColumnId, &'a [u8]), - Store(H, &'a [u8]), - Release(H), + Store(ColumnId, H, Vec<u8>), + Reference(ColumnId, H), + Release(ColumnId, H), } /// A series of changes to the database that can be committed atomically. They do not take effect @@ -67,49 +60,27 @@ impl<H> Transaction<H> { self.0.push(Change::Remove(col, key.to_vec())) } /// Store the `preimage` of `hash` into the database, so that it may be looked up later with - /// `Database::lookup`. This may be called multiple times, but `Database::lookup` but subsequent + /// `Database::get`. This may be called multiple times, but subsequent /// calls will ignore `preimage` and simply increase the number of references on `hash`. - pub fn store(&mut self, hash: H, preimage: &[u8]) { - self.0.push(Change::Store(hash, preimage.to_vec())) + pub fn store(&mut self, col: ColumnId, hash: H, preimage: Vec<u8>) { + self.0.push(Change::Store(col, hash, preimage)) + } + /// Increase the number of references for `hash` in the database. + pub fn reference(&mut self, col: ColumnId, hash: H) { + self.0.push(Change::Reference(col, hash)) } /// Release the preimage of `hash` from the database. An equal number of these to the number of - /// corresponding `store`s must have been given before it is legal for `Database::lookup` to + /// corresponding `store`s must have been given before it is legal for `Database::get` to /// be unable to provide the preimage. - pub fn release(&mut self, hash: H) { - self.0.push(Change::Release(hash)) + pub fn release(&mut self, col: ColumnId, hash: H) { + self.0.push(Change::Release(col, hash)) } } -pub trait Database<H: Clone>: Send + Sync { +pub trait Database<H: Clone + AsRef<[u8]>>: Send + Sync { /// Commit the `transaction` to the database atomically. Any further calls to `get` or `lookup` /// will reflect the new state. - fn commit(&self, transaction: Transaction<H>) -> error::Result<()> { - for change in transaction.0.into_iter() { - match change { - Change::Set(col, key, value) => self.set(col, &key, &value), - Change::Remove(col, key) => self.remove(col, &key), - Change::Store(hash, preimage) => self.store(&hash, &preimage), - Change::Release(hash) => self.release(&hash), - }?; - } - - Ok(()) - } - - /// Commit the `transaction` to the database atomically. Any further calls to `get` or `lookup` - /// will reflect the new state. 
- fn commit_ref<'a>(&self, transaction: &mut dyn Iterator<Item=ChangeRef<'a, H>>) -> error::Result<()> { - let mut tx = Transaction::new(); - for change in transaction { - match change { - ChangeRef::Set(col, key, value) => tx.set(col, key, value), - ChangeRef::Remove(col, key) => tx.remove(col, key), - ChangeRef::Store(hash, preimage) => tx.store(hash, preimage), - ChangeRef::Release(hash) => tx.release(hash), - } - } - self.commit(tx) - } + fn commit(&self, transaction: Transaction<H>) -> error::Result<()>; /// Retrieve the value previously stored against `key` or `None` if /// `key` is not currently in the database. @@ -120,6 +91,11 @@ pub trait Database<H: Clone>: Send + Sync { self.get(col, key).is_some() } + /// Check value size in the database possibly without retrieving it. + fn value_size(&self, col: ColumnId, key: &[u8]) -> Option<usize> { + self.get(col, key).map(|v| v.len()) + } + /// Call `f` with the value previously stored against `key`. /// /// This may be faster than `get` since it doesn't allocate. @@ -127,50 +103,6 @@ pub trait Database<H: Clone>: Send + Sync { fn with_get(&self, col: ColumnId, key: &[u8], f: &mut dyn FnMut(&[u8])) { self.get(col, key).map(|v| f(&v)); } - - /// Set the value of `key` in `col` to `value`, replacing anything that is there currently. - fn set(&self, col: ColumnId, key: &[u8], value: &[u8]) -> error::Result<()> { - let mut t = Transaction::new(); - t.set(col, key, value); - self.commit(t) - } - /// Remove the value of `key` in `col`. - fn remove(&self, col: ColumnId, key: &[u8]) -> error::Result<()> { - let mut t = Transaction::new(); - t.remove(col, key); - self.commit(t) - } - - /// Retrieve the first preimage previously `store`d for `hash` or `None` if no preimage is - /// currently stored. - fn lookup(&self, hash: &H) -> Option<Vec<u8>>; - - /// Call `f` with the preimage stored for `hash` and return the result, or `None` if no preimage - /// is currently stored. - /// - /// This may be faster than `lookup` since it doesn't allocate. - /// Use `with_lookup` helper function if you need `f` to return a value from `f` - fn with_lookup(&self, hash: &H, f: &mut dyn FnMut(&[u8])) { - self.lookup(hash).map(|v| f(&v)); - } - - /// Store the `preimage` of `hash` into the database, so that it may be looked up later with - /// `Database::lookup`. This may be called multiple times, but `Database::lookup` but subsequent - /// calls will ignore `preimage` and simply increase the number of references on `hash`. - fn store(&self, hash: &H, preimage: &[u8]) -> error::Result<()> { - let mut t = Transaction::new(); - t.store(hash.clone(), preimage); - self.commit(t) - } - - /// Release the preimage of `hash` from the database. An equal number of these to the number of - /// corresponding `store`s must have been given before it is legal for `Database::lookup` to - /// be unable to provide the preimage. - fn release(&self, hash: &H) -> error::Result<()> { - let mut t = Transaction::new(); - t.release(hash.clone()); - self.commit(t) - } } impl<H> std::fmt::Debug for dyn Database<H> { @@ -183,20 +115,13 @@ impl<H> std::fmt::Debug for dyn Database<H> { /// `key` is not currently in the database. /// /// This may be faster than `get` since it doesn't allocate. 
-pub fn with_get<R, H: Clone>(db: &dyn Database<H>, col: ColumnId, key: &[u8], mut f: impl FnMut(&[u8]) -> R) -> Option<R> { +pub fn with_get<R, H: Clone + AsRef<[u8]>>( + db: &dyn Database<H>, + col: ColumnId, + key: &[u8], mut f: impl FnMut(&[u8]) -> R +) -> Option<R> { let mut result: Option<R> = None; let mut adapter = |k: &_| { result = Some(f(k)); }; db.with_get(col, key, &mut adapter); result } - -/// Call `f` with the preimage stored for `hash` and return the result, or `None` if no preimage -/// is currently stored. -/// -/// This may be faster than `lookup` since it doesn't allocate. -pub fn with_lookup<R, H: Clone>(db: &dyn Database<H>, hash: &H, mut f: impl FnMut(&[u8]) -> R) -> Option<R> { - let mut result: Option<R> = None; - let mut adapter = |k: &_| { result = Some(f(k)); }; - db.with_lookup(hash, &mut adapter); - result -} diff --git a/substrate/primitives/database/src/mem.rs b/substrate/primitives/database/src/mem.rs index 41af2e2f235..24ddf033197 100644 --- a/substrate/primitives/database/src/mem.rs +++ b/substrate/primitives/database/src/mem.rs @@ -17,26 +17,41 @@ //! In-memory implementation of `Database` -use std::collections::HashMap; +use std::collections::{HashMap, hash_map::Entry}; use crate::{Database, Change, ColumnId, Transaction, error}; use parking_lot::RwLock; #[derive(Default)] /// This implements `Database` as an in-memory hash map. `commit` is not atomic. -pub struct MemDb<H: Clone + Send + Sync + Eq + PartialEq + Default + std::hash::Hash> - (RwLock<(HashMap<ColumnId, HashMap<Vec<u8>, Vec<u8>>>, HashMap<H, Vec<u8>>)>); +pub struct MemDb(RwLock<HashMap<ColumnId, HashMap<Vec<u8>, (u32, Vec<u8>)>>>); -impl<H> Database<H> for MemDb<H> - where H: Clone + Send + Sync + Eq + PartialEq + Default + std::hash::Hash +impl<H> Database<H> for MemDb + where H: Clone + AsRef<[u8]> { fn commit(&self, transaction: Transaction<H>) -> error::Result<()> { let mut s = self.0.write(); for change in transaction.0.into_iter() { match change { - Change::Set(col, key, value) => { s.0.entry(col).or_default().insert(key, value); }, - Change::Remove(col, key) => { s.0.entry(col).or_default().remove(&key); }, - Change::Store(hash, preimage) => { s.1.insert(hash, preimage); }, - Change::Release(hash) => { s.1.remove(&hash); }, + Change::Set(col, key, value) => { s.entry(col).or_default().insert(key, (1, value)); }, + Change::Remove(col, key) => { s.entry(col).or_default().remove(&key); }, + Change::Store(col, hash, value) => { + s.entry(col).or_default().entry(hash.as_ref().to_vec()) + .and_modify(|(c, _)| *c += 1) + .or_insert_with(|| (1, value)); + }, + Change::Reference(col, hash) => { + if let Entry::Occupied(mut entry) = s.entry(col).or_default().entry(hash.as_ref().to_vec()) { + entry.get_mut().0 += 1; + } + } + Change::Release(col, hash) => { + if let Entry::Occupied(mut entry) = s.entry(col).or_default().entry(hash.as_ref().to_vec()) { + entry.get_mut().0 -= 1; + if entry.get().0 == 0 { + entry.remove(); + } + } + } } } @@ -45,18 +60,11 @@ impl<H> Database<H> for MemDb<H> fn get(&self, col: ColumnId, key: &[u8]) -> Option<Vec<u8>> { let s = self.0.read(); - s.0.get(&col).and_then(|c| c.get(key).cloned()) - } - - fn lookup(&self, hash: &H) -> Option<Vec<u8>> { - let s = self.0.read(); - s.1.get(hash).cloned() + s.get(&col).and_then(|c| c.get(key).map(|(_, v)| v.clone())) } } -impl<H> MemDb<H> - where H: Clone + Send + Sync + Eq + PartialEq + Default + std::hash::Hash -{ +impl MemDb { /// Create a new instance pub fn new() -> Self { MemDb::default() @@ -65,7 +73,7 @@ impl<H> 
MemDb<H> /// Count number of values in a column pub fn count(&self, col: ColumnId) -> usize { let s = self.0.read(); - s.0.get(&col).map(|c| c.len()).unwrap_or(0) + s.get(&col).map(|c| c.len()).unwrap_or(0) } } diff --git a/substrate/primitives/externalities/src/lib.rs b/substrate/primitives/externalities/src/lib.rs index 3ee37f5e31b..1077f41048d 100644 --- a/substrate/primitives/externalities/src/lib.rs +++ b/substrate/primitives/externalities/src/lib.rs @@ -228,6 +228,16 @@ pub trait Externalities: ExtensionStore { /// no transaction is open that can be closed. fn storage_commit_transaction(&mut self) -> Result<(), ()>; + /// Index specified transaction slice and store it. + fn storage_index_transaction(&mut self, _index: u32, _offset: u32) { + unimplemented!("storage_index_transaction"); + } + + /// Renew existing piece of transaction storage. + fn storage_renew_transaction_index(&mut self, _index: u32, _hash: &[u8], _size: u32) { + unimplemented!("storage_renew_transaction_index"); + } + /// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! /// Benchmarking related functionality and shouldn't be used anywhere else! /// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! diff --git a/substrate/primitives/state-machine/src/ext.rs b/substrate/primitives/state-machine/src/ext.rs index 7907cda6fb4..65b7b638a9a 100644 --- a/substrate/primitives/state-machine/src/ext.rs +++ b/substrate/primitives/state-machine/src/ext.rs @@ -18,7 +18,7 @@ //! Concrete externalities implementation. use crate::{ - StorageKey, StorageValue, OverlayedChanges, + StorageKey, StorageValue, OverlayedChanges, IndexOperation, backend::Backend, overlayed_changes::OverlayedExtensions, }; use hash_db::Hasher; @@ -568,6 +568,36 @@ where } } + fn storage_index_transaction(&mut self, index: u32, offset: u32) { + trace!( + target: "state", + "{:04x}: IndexTransaction ({}): [{}..]", + self.id, + index, + offset, + ); + self.overlay.add_transaction_index(IndexOperation::Insert { + extrinsic: index, + offset, + }); + } + + /// Renew existing piece of data storage. + fn storage_renew_transaction_index(&mut self, index: u32, hash: &[u8], size: u32) { + trace!( + target: "state", + "{:04x}: RenewTransactionIndex ({}) {} bytes", + self.id, + HexDisplay::from(&hash), + size, + ); + self.overlay.add_transaction_index(IndexOperation::Renew { + extrinsic: index, + hash: hash.to_vec(), + size + }); + } + #[cfg(not(feature = "std"))] fn storage_changes_root(&mut self, _parent_hash: &[u8]) -> Result<Option<Vec<u8>>, ()> { Ok(None) diff --git a/substrate/primitives/state-machine/src/lib.rs b/substrate/primitives/state-machine/src/lib.rs index 0167633d480..0a664840df8 100644 --- a/substrate/primitives/state-machine/src/lib.rs +++ b/substrate/primitives/state-machine/src/lib.rs @@ -121,6 +121,7 @@ pub use crate::overlayed_changes::{ StorageChanges, StorageTransactionCache, OffchainChangesCollection, OffchainOverlayedChanges, + IndexOperation, }; pub use crate::backend::Backend; pub use crate::trie_backend_essence::{TrieBackendStorage, Storage}; diff --git a/substrate/primitives/state-machine/src/overlayed_changes/mod.rs b/substrate/primitives/state-machine/src/overlayed_changes/mod.rs index 285bf2a73a1..1d3cbb59ba0 100644 --- a/substrate/primitives/state-machine/src/overlayed_changes/mod.rs +++ b/substrate/primitives/state-machine/src/overlayed_changes/mod.rs @@ -103,12 +103,35 @@ pub struct OverlayedChanges { children: Map<StorageKey, (OverlayedChangeSet, ChildInfo)>, /// Offchain related changes. 
offchain: OffchainOverlayedChanges, + /// Transaction index changes. + transaction_index_ops: Vec<IndexOperation>, /// True if extrinsics stats must be collected. collect_extrinsics: bool, /// Collect statistic on this execution. stats: StateMachineStats, } +/// Transaction index operation. +#[derive(Debug, Clone)] +pub enum IndexOperation { + /// Insert transaction into index. + Insert { + /// Extrinsic index in the current block. + extrinsic: u32, + /// Data offset in the extrinsic. + offset: u32, + }, + /// Renew existing transaction storage. + Renew { + /// Extrinsic index in the current block. + extrinsic: u32, + /// Referenced index hash. + hash: Vec<u8>, + /// Expected data size. + size: u32, + } +} + /// A storage changes structure that can be generated by the data collected in [`OverlayedChanges`]. /// /// This contains all the changes to the storage and transactions to apply theses changes to the @@ -137,6 +160,10 @@ pub struct StorageChanges<Transaction, H: Hasher, N: BlockNumber> { /// Phantom data for block number until change trie support no_std. #[cfg(not(feature = "std"))] pub _ph: sp_std::marker::PhantomData<N>, + + /// Changes to the transaction index. + #[cfg(feature = "std")] + pub transaction_index_changes: Vec<IndexOperation>, } #[cfg(feature = "std")] @@ -149,6 +176,7 @@ impl<Transaction, H: Hasher, N: BlockNumber> StorageChanges<Transaction, H, N> { Transaction, H::Out, Option<ChangesTrieTransaction<H, N>>, + Vec<IndexOperation>, ) { ( self.main_storage_changes, @@ -157,6 +185,7 @@ impl<Transaction, H: Hasher, N: BlockNumber> StorageChanges<Transaction, H, N> { self.transaction, self.transaction_storage_root, self.changes_trie_transaction, + self.transaction_index_changes, ) } } @@ -214,6 +243,8 @@ impl<Transaction: Default, H: Hasher, N: BlockNumber> Default for StorageChanges changes_trie_transaction: None, #[cfg(not(feature = "std"))] _ph: Default::default(), + #[cfg(feature = "std")] + transaction_index_changes: Default::default(), } } } @@ -543,6 +574,9 @@ impl OverlayedChanges { let (main_storage_changes, child_storage_changes) = self.drain_committed(); let offchain_storage_changes = self.offchain_drain_committed().collect(); + #[cfg(feature = "std")] + let transaction_index_changes = std::mem::take(&mut self.transaction_index_ops); + Ok(StorageChanges { main_storage_changes: main_storage_changes.collect(), child_storage_changes: child_storage_changes.map(|(sk, it)| (sk, it.0.collect())).collect(), @@ -551,6 +585,8 @@ impl OverlayedChanges { transaction_storage_root, #[cfg(feature = "std")] changes_trie_transaction, + #[cfg(feature = "std")] + transaction_index_changes, #[cfg(not(feature = "std"))] _ph: Default::default(), }) @@ -666,6 +702,11 @@ impl OverlayedChanges { None => self.offchain.remove(STORAGE_PREFIX, key), } } + + /// Add transaction index operation. + pub fn add_transaction_index(&mut self, op: IndexOperation) { + self.transaction_index_ops.push(op) + } } #[cfg(feature = "std")] -- GitLab
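
The reference counting that this patch introduces in `sp_database` can be exercised on its own. The sketch below is illustrative only and is not part of the patch; it assumes the `sp-database` API exactly as defined above (the `Store`/`Reference`/`Release` transaction operations and the in-memory `MemDb` backend), and the column id and fixed 32-byte key are arbitrary example values. It shows that a stored value stays readable until `release` has been called once for the initial `store` and once for every additional `reference`.

    // Illustrative sketch, not part of the patch: reference-counted storage via
    // `sp_database::Transaction` against the in-memory `MemDb` backend.
    use sp_database::{ColumnId, Database, MemDb, Transaction};

    const COL: ColumnId = 0;

    fn main() {
        let mem = MemDb::new();
        // Fix the hash type parameter; any `Clone + AsRef<[u8]>` type works.
        let db: &dyn Database<[u8; 32]> = &mem;
        let hash = [7u8; 32];

        // The first `store` writes the preimage with a reference count of 1.
        let mut tx = Transaction::new();
        tx.store(COL, hash, b"indexed transaction data".to_vec());
        db.commit(tx).unwrap();
        assert!(db.get(COL, &hash[..]).is_some());

        // `reference` bumps the count without rewriting the value (the renewal path).
        let mut tx = Transaction::new();
        tx.reference(COL, hash);
        db.commit(tx).unwrap();

        // A single `release` (e.g. the originating block was pruned) keeps the data...
        let mut tx = Transaction::new();
        tx.release(COL, hash);
        db.commit(tx).unwrap();
        assert!(db.get(COL, &hash[..]).is_some());

        // ...and the second `release` drops the count to zero and removes the preimage.
        let mut tx = Transaction::new();
        tx.release(COL, hash);
        db.commit(tx).unwrap();
        assert!(db.get(COL, &hash[..]).is_none());
    }

The kvdb (RocksDB) adapter shown earlier in the patch implements these operations with an explicit little-endian `u32` counter stored under the value's key suffixed with a zero byte (see `read_counter`).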