Skip to content
lib.rs 82 KiB
Newer Older
// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// Substrate is free software: you can redistribute it and/or modify
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
//! Client backend that uses RocksDB database as storage.
//!
//! # Canonicality vs. Finality
//!
//! Finality indicates that a block will not be reverted, according to the consensus algorithm,
//! while canonicality indicates that the block may be reverted, but we will be unable to do so,
//! having discarded heavy state that will allow a chain reorganization.
//!
//! Finality implies canonicality but not vice-versa.
pub mod light;
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
use std::sync::Arc;
use std::path::PathBuf;
use std::collections::{HashMap, HashSet};
use client_api::ForkBlocks;
use client_api::backend::NewBlockState;
use client_api::backend::{StorageCollection, ChildStorageCollection};
use client_api::blockchain::{well_known_cache_keys, HeaderBackend};
use client_api::error::{Result as ClientResult, Error as ClientError};
use client_api::execution_extensions::ExecutionExtensions;
use codec::{Decode, Encode};
cheme's avatar
cheme committed
use hash_db::{Hasher, Prefix};
use kvdb::{KeyValueDB, DBTransaction};
use trie::{MemoryDB, PrefixedMemoryDB, prefixed_key};
use parking_lot::{Mutex, RwLock};
use primitives::{H256, Blake2Hasher, ChangesTrieConfiguration, convert_hash, traits::CodeExecutor};
use primitives::storage::well_known_keys;
use sr_primitives::{
	generic::{BlockId, DigestItem}, Justification, StorageOverlay, ChildrenStorageOverlay,
use sr_primitives::traits::{
	Block as BlockT, Header as HeaderT, NumberFor, Zero, One, SaturatedConversion
Gavin Wood's avatar
Gavin Wood committed
};
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
use executor::RuntimeInfo;
use state_machine::{
	DBValue, ChangesTrieTransaction, ChangesTrieCacheAction, ChangesTrieBuildCache,
	backend::Backend as StateBackend,
};
use crate::utils::{Meta, db_err, meta_keys, read_db, read_meta};
use client::leaves::{LeafSet, FinalizationDisplaced};
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
use state_db::StateDb;
use header_metadata::{CachedHeaderMetadata, HeaderMetadata, HeaderMetadataCache};
use crate::storage_cache::{CachingState, SharedCache, new_shared_cache};
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
pub use state_db::PruningMode;

#[cfg(feature = "test-helpers")]
use client::in_mem::Backend as InMemoryBackend;

/// Value that `new_client` passes to `Backend::new` as `canonicalization_delay`.
const CANONICALIZATION_DELAY: u64 = 4096;
/// Value used as `min_blocks_to_keep` for the changes tries storage when the
/// configured pruning mode is not archive (archive mode uses `None` instead).
const MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR: u32 = 32768;
/// Default value for storage cache child ratio.
const DEFAULT_CHILD_RATIO: (usize, usize) = (1, 10);

/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
pub type DbState = state_machine::TrieBackend<Arc<dyn state_machine::Storage<Blake2Hasher>>, Blake2Hasher>;
/// Re-export the KVDB trait so that one can pass an implementation of it.
pub use kvdb;

/// A reference tracking state.
///
/// It makes sure that the hash we are using stays pinned in storage
/// until this structure is dropped.
pub struct RefTrackingState<Block: BlockT> {
	/// The wrapped trie-backed state; every `StateBackend` call is forwarded to it.
	state: DbState,
	/// Storage whose `state_db` is asked to unpin `parent_hash` on drop.
	storage: Arc<StorageDb<Block>>,
	/// Hash to unpin when this value is dropped; `None` means nothing is unpinned.
	parent_hash: Option<Block::Hash>,
}

impl<B: BlockT> RefTrackingState<B> {
	/// Bundles a state together with the storage it reads from and the hash
	/// that has to be unpinned once the returned value is dropped.
	fn new(state: DbState, storage: Arc<StorageDb<B>>, parent_hash: Option<B::Hash>) -> RefTrackingState<B> {
		Self { state, storage, parent_hash }
	}
}

impl<B: BlockT> Drop for RefTrackingState<B> {
	/// Unpins `parent_hash` in the state database, if a hash was recorded.
	fn drop(&mut self) {
		if let Some(pinned) = self.parent_hash.as_ref() {
			self.storage.state_db.unpin(pinned);
		}
	}
}

impl<Block: BlockT> std::fmt::Debug for RefTrackingState<Block> {
	/// Prints only the tracked parent hash; the state and storage are omitted.
	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
		f.write_fmt(format_args!("Block {:?}", self.parent_hash))
	}
}

/// `StateBackend` implementation that forwards every call, unchanged, to the
/// wrapped `DbState`. The associated types are taken from `DbState` as well, so
/// this wrapper adds no behavior of its own here; the only thing it adds is the
/// unpin-on-drop logic in its `Drop` implementation.
impl<B: BlockT> StateBackend<Blake2Hasher> for RefTrackingState<B> {
	type Error =  <DbState as StateBackend<Blake2Hasher>>::Error;
	type Transaction = <DbState as StateBackend<Blake2Hasher>>::Transaction;
	type TrieBackendStorage = <DbState as StateBackend<Blake2Hasher>>::TrieBackendStorage;

	/// Forwards to `DbState::storage`.
	fn storage(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
		self.state.storage(key)
	}

	/// Forwards to `DbState::storage_hash`.
	fn storage_hash(&self, key: &[u8]) -> Result<Option<H256>, Self::Error> {
		self.state.storage_hash(key)
	}

	/// Forwards to `DbState::child_storage`.
	fn child_storage(&self, storage_key: &[u8], key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
		self.state.child_storage(storage_key, key)
	}

	/// Forwards to `DbState::exists_storage`.
	fn exists_storage(&self, key: &[u8]) -> Result<bool, Self::Error> {
		self.state.exists_storage(key)
	}

	/// Forwards to `DbState::exists_child_storage`.
	fn exists_child_storage(&self, storage_key: &[u8], key: &[u8]) -> Result<bool, Self::Error> {
		self.state.exists_child_storage(storage_key, key)
	}

	/// Forwards to `DbState::for_keys_with_prefix`.
	fn for_keys_with_prefix<F: FnMut(&[u8])>(&self, prefix: &[u8], f: F) {
		self.state.for_keys_with_prefix(prefix, f)
	}

	/// Forwards to `DbState::for_key_values_with_prefix`.
	fn for_key_values_with_prefix<F: FnMut(&[u8], &[u8])>(&self, prefix: &[u8], f: F) {
		self.state.for_key_values_with_prefix(prefix, f)
	}

	/// Forwards to `DbState::for_keys_in_child_storage`.
	fn for_keys_in_child_storage<F: FnMut(&[u8])>(&self, storage_key: &[u8], f: F) {
		self.state.for_keys_in_child_storage(storage_key, f)
	}

	/// Forwards to `DbState::for_child_keys_with_prefix`.
	fn for_child_keys_with_prefix<F: FnMut(&[u8])>(&self, storage_key: &[u8], prefix: &[u8], f: F) {
		self.state.for_child_keys_with_prefix(storage_key, prefix, f)
	}

	/// Forwards to `DbState::storage_root`.
	fn storage_root<I>(&self, delta: I) -> (H256, Self::Transaction)
		where
			I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)>
	{
		self.state.storage_root(delta)
	}

	/// Forwards to `DbState::child_storage_root`.
	fn child_storage_root<I>(&self, storage_key: &[u8], delta: I) -> (Vec<u8>, bool, Self::Transaction)
		where
			I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)>,
	{
		self.state.child_storage_root(storage_key, delta)
	}

	/// Forwards to `DbState::pairs`.
	fn pairs(&self) -> Vec<(Vec<u8>, Vec<u8>)> {
		self.state.pairs()
	}

	/// Forwards to `DbState::keys`.
	fn keys(&self, prefix: &[u8]) -> Vec<Vec<u8>> {
		self.state.keys(prefix)
	}

	/// Forwards to `DbState::child_keys`.
	fn child_keys(&self, child_key: &[u8], prefix: &[u8]) -> Vec<Vec<u8>> {
		self.state.child_keys(child_key, prefix)
	}

	/// Forwards to `DbState::as_trie_backend`.
	fn as_trie_backend(&mut self) -> Option<&state_machine::TrieBackend<Self::TrieBackendStorage, Blake2Hasher>> {
		self.state.as_trie_backend()
	}
}

Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
/// Database settings.
pub struct DatabaseSettings {
	/// State cache size.
	pub state_cache_size: usize,
	/// Ratio of cache size dedicated to child tries.
	pub state_cache_child_ratio: Option<(usize, usize)>,
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	/// Pruning mode.
	pub pruning: PruningMode,
	/// Where to find the database.
	pub source: DatabaseSettingsSrc,
}

/// Where to find the database.
pub enum DatabaseSettingsSrc {
	/// Load a database from a given path. Recommended for most uses.
	Path {
		/// Path to the database.
		path: PathBuf,
		/// Cache size in bytes. If `None` default is used.
		cache_size: Option<usize>,
	},

	/// Use a custom already-open database.
	Custom(Arc<dyn KeyValueDB>),
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
}

/// Create an instance of db-backed client.
pub fn new_client<E, S, Block, RA>(
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	settings: DatabaseSettings,
	executor: E,
	genesis_storage: S,
	fork_blocks: ForkBlocks<Block>,
	execution_extensions: ExecutionExtensions<Block>,
) -> Result<(
		client::Client<
			Backend<Block>,
			client::LocalCallExecutor<Backend<Block>, E>,
			Block,
			RA,
		>,
		Arc<Backend<Block>>,
	),
	client::error::Error,
		E: CodeExecutor + RuntimeInfo,
	let backend = Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?);
	let executor = client::LocalCallExecutor::new(backend.clone(), executor);
		client::Client::new(backend.clone(), executor, genesis_storage, fork_blocks, execution_extensions)?,
pub(crate) mod columns {
	pub const META: Option<u32> = crate::utils::COLUMN_META;
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	pub const STATE: Option<u32> = Some(1);
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	pub const STATE_META: Option<u32> = Some(2);
	/// maps hashes to lookup keys and numbers to canon hashes.
	pub const KEY_LOOKUP: Option<u32> = Some(3);
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	pub const HEADER: Option<u32> = Some(4);
	pub const BODY: Option<u32> = Some(5);
	pub const JUSTIFICATION: Option<u32> = Some(6);
	pub const CHANGES_TRIE: Option<u32> = Some(7);
	pub const AUX: Option<u32> = Some(8);
	/// Offchain workers local storage
	pub const OFFCHAIN: Option<u32> = Some(9);
Gav Wood's avatar
Gav Wood committed
struct PendingBlock<Block: BlockT> {
	header: Block::Header,
	justification: Option<Justification>,
Gav Wood's avatar
Gav Wood committed
	body: Option<Vec<Block::Extrinsic>>,
	leaf_state: NewBlockState,
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
/// Wrapper around a borrowed key-value database that implements the
/// `state_db::MetaDb` trait required by `state_db`.
struct StateMetaDb<'a>(&'a dyn KeyValueDB);
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed

impl<'a> state_db::MetaDb for StateMetaDb<'a> {
	type Error = io::Error;
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed

	/// Reads the value stored under `key` in the `STATE_META` column and
	/// returns it as an owned byte vector, or `None` when the key is absent.
	fn get_meta(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
		let stored = self.0.get(columns::STATE_META, key)?;
		Ok(stored.map(|bytes| bytes.to_vec()))
	}
}

Gav Wood's avatar
Gav Wood committed
pub struct BlockchainDb<Block: BlockT> {
	db: Arc<dyn KeyValueDB>,
	meta: Arc<RwLock<Meta<NumberFor<Block>, Block::Hash>>>,
	leaves: RwLock<LeafSet<Block::Hash, NumberFor<Block>>>,
	header_metadata_cache: HeaderMetadataCache<Block>,
impl<Block: BlockT> BlockchainDb<Block> {
	fn new(db: Arc<dyn KeyValueDB>) -> ClientResult<Self> {
		let meta = read_meta::<Block>(&*db, columns::META, columns::HEADER)?;
		let leaves = LeafSet::read_from_db(&*db, columns::META, meta_keys::LEAF_PREFIX)?;
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		Ok(BlockchainDb {
			db,
			leaves: RwLock::new(leaves),
			meta: Arc::new(RwLock::new(meta)),
			header_metadata_cache: HeaderMetadataCache::default(),
	fn update_meta(
		&self,
		hash: Block::Hash,
		number: <Block::Header as HeaderT>::Number,
		is_best: bool,
		is_finalized: bool
	) {
		let mut meta = self.meta.write();
		if number.is_zero() {
			meta.genesis_hash = hash;
			meta.finalized_hash = hash;
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		if is_best {
			meta.best_number = number;
			meta.best_hash = hash;
		}

		if is_finalized {
			meta.finalized_number = number;
			meta.finalized_hash = hash;
		}
impl<Block: BlockT> client::blockchain::HeaderBackend<Block> for BlockchainDb<Block> {
	fn header(&self, id: BlockId<Block>) -> ClientResult<Option<Block::Header>> {
		utils::read_header(&*self.db, columns::KEY_LOOKUP, columns::HEADER, id)
	fn info(&self) -> client::blockchain::Info<Block> {
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		let meta = self.meta.read();
		client::blockchain::Info {
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
			best_hash: meta.best_hash,
			best_number: meta.best_number,
			genesis_hash: meta.genesis_hash,
			finalized_hash: meta.finalized_hash,
			finalized_number: meta.finalized_number,
	fn status(&self, id: BlockId<Block>) -> ClientResult<client::blockchain::BlockStatus> {
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		let exists = match id {
			BlockId::Hash(_) => read_db(
				&*self.db,
				columns::KEY_LOOKUP,
				columns::HEADER,
				id
			)?.is_some(),
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
			BlockId::Number(n) => n <= self.meta.read().best_number,
		};
		match exists {
			true => Ok(client::blockchain::BlockStatus::InChain),
			false => Ok(client::blockchain::BlockStatus::Unknown),
		}
	}

	fn number(&self, hash: Block::Hash) -> ClientResult<Option<NumberFor<Block>>> {
		Ok(self.header_metadata(hash).ok().map(|header_metadata| header_metadata.number))
	fn hash(&self, number: NumberFor<Block>) -> ClientResult<Option<Block::Hash>> {
		self.header(BlockId::Number(number)).and_then(|maybe_header| match maybe_header {
			Some(header) => Ok(Some(header.hash().clone())),
			None => Ok(None),
		})
impl<Block: BlockT> client::blockchain::Backend<Block> for BlockchainDb<Block> {
	fn body(&self, id: BlockId<Block>) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
		match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)? {
			Some(body) => match Decode::decode(&mut &body[..]) {
				Ok(body) => Ok(Some(body)),
				Err(err) => return Err(client::error::Error::Backend(
					format!("Error decoding body: {}", err)
				)),
	fn justification(&self, id: BlockId<Block>) -> ClientResult<Option<Justification>> {
		match read_db(&*self.db, columns::KEY_LOOKUP, columns::JUSTIFICATION, id)? {
			Some(justification) => match Decode::decode(&mut &justification[..]) {
				Ok(justification) => Ok(Some(justification)),
				Err(err) => return Err(client::error::Error::Backend(
					format!("Error decoding justification: {}", err)
				)),
	/// Returns the finalized hash currently recorded in the cached metadata.
	fn last_finalized(&self) -> ClientResult<Block::Hash> {
		let meta = self.meta.read();
		Ok(meta.finalized_hash.clone())
	}

	fn cache(&self) -> Option<Arc<dyn client::blockchain::Cache<Block>>> {
	/// Returns the hashes held by the in-memory leaf set, read under its read lock.
	fn leaves(&self) -> ClientResult<Vec<Block::Hash>> {
		Ok(self.leaves.read().hashes())
	}
	/// Reads the children recorded for `parent_hash` through `children::read_children`,
	/// looking in the `META` column under `meta_keys::CHILDREN_PREFIX`.
	fn children(&self, parent_hash: Block::Hash) -> ClientResult<Vec<Block::Hash>> {
		children::read_children(&*self.db, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash)
	}
impl<Block: BlockT> client::blockchain::ProvideCache<Block> for BlockchainDb<Block> {
	fn cache(&self) -> Option<Arc<dyn client::blockchain::Cache<Block>>> {
/// Header metadata access for the database-backed blockchain, served through
/// `header_metadata_cache` and falling back to the header stored in the database.
impl<Block: BlockT> HeaderMetadata<Block> for BlockchainDb<Block> {
	type Error = client::error::Error;

	/// Returns the cached metadata for `hash`.
	///
	/// On a cache miss the header is read from the database, converted into
	/// metadata, inserted into the cache and returned.
	///
	/// # Errors
	///
	/// Propagates any error from reading the header, and returns
	/// `ClientError::UnknownBlock` when the database has no header for `hash`.
	fn header_metadata(&self, hash: Block::Hash) -> Result<CachedHeaderMetadata<Block>, Self::Error> {
		self.header_metadata_cache.header_metadata(hash).or_else(|_| {
			self.header(BlockId::hash(hash))?.map(|header| {
				let header_metadata = CachedHeaderMetadata::from(&header);
				self.header_metadata_cache.insert_header_metadata(
					header_metadata.hash,
					header_metadata.clone(),
				);
				header_metadata
			// Build the error lazily: formatting the message allocates, and this
			// path runs on every cache miss even when the header is found.
			}).ok_or_else(|| ClientError::UnknownBlock(format!("header not found in db: {}", hash)))
		})
	}

	/// Stores `metadata` in the cache under `hash`.
	fn insert_header_metadata(&self, hash: Block::Hash, metadata: CachedHeaderMetadata<Block>) {
		self.header_metadata_cache.insert_header_metadata(hash, metadata)
	}

	/// Removes the cache entry for `hash`.
	fn remove_header_metadata(&self, hash: Block::Hash) {
		self.header_metadata_cache.remove_header_metadata(hash);
	}
}

pub struct BlockImportOperation<Block: BlockT, H: Hasher> {
	old_state: CachingState<Blake2Hasher, RefTrackingState<Block>, Block>,
	db_updates: PrefixedMemoryDB<H>,
	storage_updates: StorageCollection,
	child_storage_updates: ChildStorageCollection,
	changes_trie_cache_update: Option<ChangesTrieCacheAction<H::Out, NumberFor<Block>>>,
Gav Wood's avatar
Gav Wood committed
	pending_block: Option<PendingBlock<Block>>,
	aux_ops: Vec<(Vec<u8>, Option<Vec<u8>>)>,
	finalized_blocks: Vec<(BlockId<Block>, Option<Justification>)>,
	set_head: Option<BlockId<Block>>,
}

impl<Block: BlockT, H: Hasher> BlockImportOperation<Block, H> {
	/// Moves all queued auxiliary operations into `transaction`, leaving
	/// `aux_ops` empty: `Some(value)` becomes a write to the `AUX` column and
	/// `None` becomes a deletion of the key from it.
	fn apply_aux(&mut self, transaction: &mut DBTransaction) {
		self.aux_ops.drain(..).for_each(|(aux_key, aux_value)| {
			if let Some(value) = aux_value {
				transaction.put_vec(columns::AUX, &aux_key, value);
			} else {
				transaction.delete(columns::AUX, &aux_key);
			}
		});
	}
impl<Block> client_api::backend::BlockImportOperation<Block, Blake2Hasher>
	for BlockImportOperation<Block, Blake2Hasher> where Block: BlockT<Hash=H256>,
	type State = CachingState<Blake2Hasher, RefTrackingState<Block>, Block>;
	fn state(&self) -> ClientResult<Option<&Self::State>> {
		Ok(Some(&self.old_state))
	fn set_block_data(
		&mut self,
		header: Block::Header,
		body: Option<Vec<Block::Extrinsic>>,
		justification: Option<Justification>,
		leaf_state: NewBlockState,
	) -> ClientResult<()> {
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		assert!(self.pending_block.is_none(), "Only one block per operation is allowed");
		self.pending_block = Some(PendingBlock {
			header,
			body,
			justification,
	fn update_cache(&mut self, _cache: HashMap<well_known_cache_keys::Id, Vec<u8>>) {
		// Currently cache isn't implemented on full nodes.
	fn update_db_storage(&mut self, update: PrefixedMemoryDB<Blake2Hasher>) -> ClientResult<()> {
		self.db_updates = update;
	fn reset_storage(
		&mut self,
		top: StorageOverlay,
		children: ChildrenStorageOverlay
	) -> ClientResult<H256> {

		if top.iter().any(|(k, _)| well_known_keys::is_child_storage_key(k)) {
			return Err(client::error::Error::GenesisInvalid.into());
cheme's avatar
cheme committed
		for child_key in children.keys() {
			if !well_known_keys::is_child_storage_key(&child_key) {
				return Err(client::error::Error::GenesisInvalid.into());
cheme's avatar
cheme committed
		let child_delta = children.into_iter()
			.map(|(storage_key, child_overlay)|
				(storage_key, child_overlay.into_iter().map(|(k, v)| (k, Some(v)))));

		let (root, transaction) = self.old_state.full_storage_root(
			top.into_iter().map(|(k, v)| (k, Some(v))),
			child_delta
		);
		self.db_updates = transaction;
		self.commit_state = true;
	fn update_changes_trie(
		&mut self,
		update: ChangesTrieTransaction<Blake2Hasher, NumberFor<Block>>,
	) -> ClientResult<()> {
		self.changes_trie_updates = update.0;
		self.changes_trie_cache_update = Some(update.1);
	fn insert_aux<I>(&mut self, ops: I) -> ClientResult<()>
		where I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)>
	{
		self.aux_ops.append(&mut ops.into_iter().collect());
	fn update_storage(
		&mut self,
		update: StorageCollection,
		child_update: ChildStorageCollection,
	) -> ClientResult<()> {
		self.storage_updates = update;
		self.child_storage_updates = child_update;
	/// Queues `block`, together with its optional justification, in
	/// `finalized_blocks`. Several blocks may be queued on one operation;
	/// this method itself never returns an error.
	fn mark_finalized(&mut self, block: BlockId<Block>, justification: Option<Justification>) -> ClientResult<()> {
		self.finalized_blocks.push((block, justification));
		Ok(())
	}
	/// Records `block` as the head to be set by this operation.
	///
	/// # Panics
	///
	/// Panics if a head has already been recorded on this operation.
	fn mark_head(&mut self, block: BlockId<Block>) -> ClientResult<()> {
		if self.set_head.is_some() {
			panic!("Only one set head per operation is allowed");
		}
		self.set_head = Some(block);
		Ok(())
	}
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
struct StorageDb<Block: BlockT> {
	pub db: Arc<dyn KeyValueDB>,
	pub state_db: StateDb<Block::Hash, Vec<u8>>,
impl<Block: BlockT> state_machine::Storage<Blake2Hasher> for StorageDb<Block> {
cheme's avatar
cheme committed
	fn get(&self, key: &H256, prefix: Prefix) -> Result<Option<DBValue>, String> {
		let key = prefixed_key::<Blake2Hasher>(key, prefix);
		self.state_db.get(&key, self).map(|r| r.map(|v| DBValue::from_slice(&v)))
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
			.map_err(|e| format!("Database backend error: {:?}", e))
	}
}

impl<Block: BlockT> state_db::NodeDb for StorageDb<Block> {
	type Error = io::Error;
	type Key = [u8];
	fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
		self.db.get(columns::STATE, key).map(|r| r.map(|v| v.to_vec()))
/// Storage stand-in that carries only a root hash (see `DbGenesisStorage::new`).
/// NOTE(review): presumably used as the state storage before genesis is committed — confirm.
struct DbGenesisStorage(pub H256);

impl DbGenesisStorage {
	/// Creates the storage with a root obtained by constructing a `TrieDBMut`
	/// over a freshly created, empty `MemoryDB`.
	pub fn new() -> Self {
		let mut root = H256::default();
		let mut mdb = MemoryDB::<Blake2Hasher>::default();
		// The trie is built only for its effect on `root` and is discarded at the end
		// of this statement. NOTE(review): presumably dropping it commits the empty
		// trie and writes the empty-trie root into `root` — confirm against `TrieDBMut`.
		state_machine::TrieDBMut::<Blake2Hasher>::new(&mut mdb, &mut root);
		DbGenesisStorage(root)
	}
}

impl state_machine::Storage<Blake2Hasher> for DbGenesisStorage {
cheme's avatar
cheme committed
	fn get(&self, _key: &H256, _prefix: Prefix) -> Result<Option<DBValue>, String> {
/// A database wrapper for changes tries.
pub struct DbChangesTrieStorage<Block: BlockT> {
	db: Arc<dyn KeyValueDB>,
	meta: Arc<RwLock<Meta<NumberFor<Block>, Block::Hash>>>,
	cache: RwLock<ChangesTrieBuildCache<Block::Hash, NumberFor<Block>>>,
impl<Block: BlockT<Hash=H256>> DbChangesTrieStorage<Block> {
	/// Writes every node drained from `changes_trie` into the `CHANGES_TRIE`
	/// column of `tx`, keyed by the node's key.
	pub fn commit(&self, tx: &mut DBTransaction, mut changes_trie: MemoryDB<Blake2Hasher>) {
		changes_trie.drain().into_iter().for_each(|(node_key, (node_value, _))| {
			tx.put(columns::CHANGES_TRIE, &node_key[..], &node_value);
		});
	}

	/// Commit changes into changes trie build cache.
	///
	/// Takes the cache's write lock and applies `cache_update` to it via `perform`.
	pub fn commit_cache(&self, cache_update: ChangesTrieCacheAction<Block::Hash, NumberFor<Block>>) {
		self.cache.write().perform(cache_update);
	}

	/// Prune obsolete changes tries.
	pub fn prune(
		&self,
		config: &ChangesTrieConfiguration,
		tx: &mut DBTransaction,
		block_hash: Block::Hash,
		block_num: NumberFor<Block>,
	) {
		// never prune on archive nodes
		let min_blocks_to_keep = match self.min_blocks_to_keep {
			Some(min_blocks_to_keep) => min_blocks_to_keep,
			None => return,
		};

		state_machine::prune_changes_tries(
			&state_machine::ChangesTrieAnchorBlockId {
				hash: convert_hash(&block_hash),
			|node| tx.delete(columns::CHANGES_TRIE, node.as_ref()));
	}
}

impl<Block> client_api::backend::PrunableStateChangesTrieStorage<Block, Blake2Hasher>
	for DbChangesTrieStorage<Block>
where
	Block: BlockT<Hash=H256>,
{
	fn oldest_changes_trie_block(
		&self,
		config: &ChangesTrieConfiguration,
		best_finalized_block: NumberFor<Block>,
	) -> NumberFor<Block> {
		match self.min_blocks_to_keep {
			Some(min_blocks_to_keep) => state_machine::oldest_non_pruned_changes_trie(
				config,
impl<Block> state_machine::ChangesTrieRootsStorage<Blake2Hasher, NumberFor<Block>>
	for DbChangesTrieStorage<Block>
where
	Block: BlockT<Hash=H256>,
{
	/// Builds an anchor block id (hash plus number) for the block with the given `hash`.
	///
	/// The number is taken from the header stored in the database. A failed
	/// read is returned as its string form; a missing header yields an
	/// "Unknown header" error.
	fn build_anchor(
		&self,
		hash: H256,
	) -> Result<state_machine::ChangesTrieAnchorBlockId<H256, NumberFor<Block>>, String> {
		let maybe_header = utils::read_header::<Block>(
			&*self.db,
			columns::KEY_LOOKUP,
			columns::HEADER,
			BlockId::Hash(hash),
		).map_err(|e| e.to_string())?;

		match maybe_header {
			Some(header) => Ok(state_machine::ChangesTrieAnchorBlockId {
				hash,
				number: *header.number(),
			}),
			None => Err(format!("Unknown header: {}", hash)),
		}
	}

	fn root(
		&self,
		anchor: &state_machine::ChangesTrieAnchorBlockId<H256, NumberFor<Block>>,
		block: NumberFor<Block>,
	) -> Result<Option<H256>, String> {
		// check API requirement: we can't get NEXT block(s) based on anchor
		if block > anchor.number {
			return Err(format!("Can't get changes trie root at {} using anchor at {}", block, anchor.number));
		}

		// we need to get hash of the block to resolve changes trie root
		let block_id = if block <= self.meta.read().finalized_number {
			// if block is finalized, we could just read canonical hash
		} else {
			// the block is not finalized
			let mut current_num = anchor.number;
			let mut current_hash: Block::Hash = convert_hash(&anchor.hash);
			let maybe_anchor_header: Block::Header = utils::require_header::<Block>(
				&*self.db, columns::KEY_LOOKUP, columns::HEADER, BlockId::Number(current_num)
			).map_err(|e| e.to_string())?;
			if maybe_anchor_header.hash() == current_hash {
				// if anchor is canonicalized, then the block is also canonicalized
			} else {
				// else (block is not finalized + anchor is not canonicalized):
				// => we should find the required block hash by traversing
				// back from the anchor to the block with given number
				while current_num != block {
					let current_header: Block::Header = utils::require_header::<Block>(
						&*self.db, columns::KEY_LOOKUP, columns::HEADER, BlockId::Hash(current_hash)
					).map_err(|e| e.to_string())?;

					current_hash = *current_header.parent_hash();
		Ok(utils::require_header::<Block>(&*self.db, columns::KEY_LOOKUP, columns::HEADER, block_id)
			.map_err(|e| e.to_string())?
			.digest().log(DigestItem::as_changes_trie_root)
			.map(|root| H256::from_slice(root.as_ref())))
impl<Block> state_machine::ChangesTrieStorage<Blake2Hasher, NumberFor<Block>>
	for DbChangesTrieStorage<Block>
where
	Block: BlockT<Hash=H256>,
{
	/// Returns `self` viewed as a `ChangesTrieRootsStorage` trait object.
	fn as_roots_storage(&self) -> &dyn state_machine::ChangesTrieRootsStorage<Blake2Hasher, NumberFor<Block>> {
		self
	}

	/// Forwards `root` and `functor` to the build cache's `with_changed_keys`
	/// under a read lock and returns its result.
	/// NOTE(review): presumably `true` means the cache had an entry for `root` and
	/// `functor` was called with it — confirm against `ChangesTrieBuildCache`.
	fn with_cached_changed_keys(
		&self,
		root: &H256,
		functor: &mut dyn FnMut(&HashMap<Option<Vec<u8>>, HashSet<Vec<u8>>>),
	) -> bool {
		self.cache.read().with_changed_keys(root, functor)
	}

cheme's avatar
cheme committed
	/// Looks up `key` in the `CHANGES_TRIE` column; `_prefix` is not used.
	/// A database error is returned as its display string.
	fn get(&self, key: &H256, _prefix: Prefix) -> Result<Option<DBValue>, String> {
		let lookup = self.db.get(columns::CHANGES_TRIE, &key[..]);
		lookup.map_err(|err| err.to_string())
	}
}
/// Disk backend. Keeps data in a key-value store. In archive mode, trie nodes are kept from all blocks.
/// Otherwise, trie nodes are kept only from some recent blocks.
Gav Wood's avatar
Gav Wood committed
pub struct Backend<Block: BlockT> {
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	storage: Arc<StorageDb<Block>>,
	offchain_storage: offchain::LocalStorage,
	changes_tries_storage: DbChangesTrieStorage<Block>,
	/// None<*> means that the value hasn't been cached yet. Some(*) means that the value (either None or
	/// Some(*)) has been cached and is valid.
	changes_trie_config: Mutex<Option<Option<ChangesTrieConfiguration>>>,
Gav Wood's avatar
Gav Wood committed
	blockchain: BlockchainDb<Block>,
	canonicalization_delay: u64,
	shared_cache: SharedCache<Block, Blake2Hasher>,
	import_lock: Mutex<()>,
impl<Block: BlockT<Hash=H256>> Backend<Block> {
	/// Create a new instance of database backend.
	///
	/// The pruning window is how old a block must be before the state is pruned.
	pub fn new(config: DatabaseSettings, canonicalization_delay: u64) -> ClientResult<Self> {
		let db = crate::utils::open_database(&config, columns::META, "full")?;
		Self::from_kvdb(db as Arc<_>, canonicalization_delay, &config)
	/// Create new memory-backed client backend for tests.
	#[cfg(any(test, feature = "test-helpers"))]
	pub fn new_test(keep_blocks: u32, canonicalization_delay: u64) -> Self {
		let db = Arc::new(kvdb_memorydb::create(crate::utils::NUM_COLUMNS));
		let db_setting = DatabaseSettings {
			state_cache_size: 16777216,
			state_cache_child_ratio: Some((50, 100)),
			pruning: PruningMode::keep_blocks(keep_blocks),
			source: DatabaseSettingsSrc::Custom(db),

		Self::new(db_setting, canonicalization_delay).expect("failed to create test-db")
	fn from_kvdb(
		db: Arc<dyn KeyValueDB>,
		canonicalization_delay: u64,
	) -> ClientResult<Self> {
		let is_archive_pruning = config.pruning.is_archive();
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		let blockchain = BlockchainDb::new(db.clone())?;
		let meta = blockchain.meta.clone();
		let map_e = |e: state_db::Error<io::Error>| ::client::error::Error::from(format!("State database error: {:?}", e));
		let state_db: StateDb<_, _> = StateDb::new(config.pruning.clone(), &StateMetaDb(&*db)).map_err(map_e)?;
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		let storage_db = StorageDb {
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
			state_db,
		};
		let offchain_storage = offchain::LocalStorage::new(db.clone());
		let changes_tries_storage = DbChangesTrieStorage {
			min_blocks_to_keep: if is_archive_pruning { None } else { Some(MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR) },
			cache: RwLock::new(ChangesTrieBuildCache::new()),
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed

		Ok(Backend {
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
			storage: Arc::new(storage_db),
			changes_tries_storage,
			changes_trie_config: Mutex::new(None),
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
			blockchain,
			shared_cache: new_shared_cache(
				config.state_cache_size,
				config.state_cache_child_ratio.unwrap_or(DEFAULT_CHILD_RATIO),
			),
			import_lock: Default::default(),
			is_archive: is_archive_pruning,
	/// Returns an in-memory backend that contains the same set of blocks as `self`.
	///
	/// Every header found in the `HEADER` column is replayed into a fresh
	/// in-memory backend in ascending block-number order, together with its
	/// body, justification and full state. The block recorded as finalized in
	/// this backend is then finalized in the copy.
	///
	/// # Panics
	///
	/// Panics if a header cannot be decoded or if any read from this backend
	/// or write to the in-memory backend fails.
	#[cfg(feature = "test-helpers")]
	pub fn as_in_memory(&self) -> InMemoryBackend<Block, Blake2Hasher> {
		use client_api::backend::{Backend as ClientBackend, BlockImportOperation};
		use client::blockchain::Backend as BlockchainBackend;

		let inmem = InMemoryBackend::<Block, Blake2Hasher>::new();

		// gather all stored headers, kept ordered by number
		// (several headers may share the same number)
		let mut headers: Vec<(NumberFor<Block>, Block::Hash, Block::Header)> = Vec::new();
		for (_, raw_header) in self.blockchain.db.iter(columns::HEADER) {
			let header = Block::Header::decode(&mut &raw_header[..]).unwrap();
			let hash = header.hash();
			let number = *header.number();
			// a hit and a miss both yield a valid insertion index
			let pos = match headers.binary_search_by(|item| item.0.cmp(&number)) {
				Ok(pos) | Err(pos) => pos,
			};
			headers.insert(pos, (number, hash, header));
		}

		// replay headers + bodies + justifications + state into the copy
		let info = self.blockchain.info();
		for (number, hash, header) in headers {
			let id = BlockId::Hash(hash);
			let justification = self.blockchain.justification(id).unwrap();
			let body = self.blockchain.body(id).unwrap();
			let state = self.state_at(id).unwrap().pairs();

			let new_block_state = if number.is_zero() {
				NewBlockState::Final
			} else if hash == info.best_hash {
				NewBlockState::Best
			} else {
				NewBlockState::Normal
			};

			let mut op = inmem.begin_operation().unwrap();
			op.set_block_data(header, body, justification, new_block_state).unwrap();
			op.update_db_storage(state.into_iter().map(|(k, v)| (None, k, Some(v))).collect()).unwrap();
			inmem.commit_operation(op).unwrap();
		}

		// finally, finalize in the copy the block that is finalized here
		inmem.finalize_block(BlockId::Hash(info.finalized_hash), None).unwrap();

		inmem
	}

	/// Returns the total number of blocks (headers) in the block DB.
	#[cfg(feature = "test-helpers")]
	pub fn blocks_count(&self) -> u64 {
		let stored_headers = self.blockchain.db.iter(columns::HEADER);
		stored_headers.count() as u64
	}

	/// Returns the changes trie configuration, reading it from the state at
	/// `block` on first use and from the cache afterwards.
	///
	/// The configuration is set up once (at genesis) and cannot change, so the
	/// first successfully read value, including "no configuration", is cached
	/// and reused for every later call regardless of `block`. A stored value
	/// that fails to decode is treated as "no configuration".
	fn changes_trie_config(&self, block: Block::Hash) -> ClientResult<Option<ChangesTrieConfiguration>> {
		use client_api::backend::Backend;

		let mut cache_slot = self.changes_trie_config.lock();
		if let Some(cached) = cache_slot.clone() {
			return Ok(cached);
		}

		// nothing cached yet: read from state while still holding the lock
		let loaded = self
			.state_at(BlockId::Hash(block))?
			.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
			.and_then(|v| Decode::decode(&mut &*v).ok());
		*cache_slot = Some(loaded.clone());
		Ok(loaded)
	}

	/// Handle setting head within a transaction. `route_to` should be the last
	/// block that existed in the database. `best_to` should be the best block
	/// to be set.
	///
	/// In the case where the new best block is a block to be imported, `route_to`
	/// should be the parent of `best_to`. In the case where we set an existing block
	/// to be best, `route_to` should equal to `best_to`.
	fn set_head_with_transaction(
		&self,
		transaction: &mut DBTransaction,
		route_to: Block::Hash,
		best_to: (NumberFor<Block>, Block::Hash),
	) -> ClientResult<(Vec<Block::Hash>, Vec<Block::Hash>)> {
		let mut enacted = Vec::default();
		let mut retracted = Vec::default();

		let meta = self.blockchain.meta.read();

		// cannot find tree route with empty DB.
		if meta.best_hash != Default::default() {
			let tree_route = header_metadata::tree_route(
				&self.blockchain,
				meta.best_hash,
				route_to,
			)?;

			// uncanonicalize: check safety violations and ensure the numbers no longer
			// point to these block hashes in the key mapping.
			for r in tree_route.retracted() {
				if r.hash == meta.finalized_hash {
					warn!(
						"Potential safety failure: reverting finalized block {:?}",
						(&r.number, &r.hash)
					);

					return Err(::client::error::Error::NotInFinalizedChain.into());
				}

				retracted.push(r.hash.clone());
				utils::remove_number_to_key_mapping(
					transaction,
					columns::KEY_LOOKUP,
					r.number
			}

			// canonicalize: set the number lookup to map to this block's hash.
			for e in tree_route.enacted() {
				enacted.push(e.hash.clone());
				utils::insert_number_to_key_mapping(
					transaction,
					columns::KEY_LOOKUP,
					e.number,
					e.hash
		let lookup_key = utils::number_and_hash_to_lookup_key(best_to.0, &best_to.1)?;
		transaction.put(columns::META, meta_keys::BEST_BLOCK, &lookup_key);
		utils::insert_number_to_key_mapping(
			transaction,
			columns::KEY_LOOKUP,
			best_to.0,
			best_to.1,

		Ok((enacted, retracted))
	}

	fn ensure_sequential_finalization(
		&self,
		header: &Block::Header,
		last_finalized: Option<Block::Hash>,