// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see .
//! Client backend that uses RocksDB database as storage.
//!
//! # Canonicality vs. Finality
//!
//! Finality indicates that a block will not be reverted, according to the consensus algorithm,
//! while canonicality indicates that the block may be reverted, but we will be unable to do so,
//! having discarded heavy state that will allow a chain reorganization.
//!
//! Finality implies canonicality but not vice-versa.
#![warn(missing_docs)]
pub mod light;
pub mod offchain;
mod children;
mod cache;
mod storage_cache;
mod utils;
use std::sync::Arc;
use std::path::PathBuf;
use std::io;
use std::collections::{HashMap, HashSet};
use client_api::backend::NewBlockState;
use client_api::blockchain::{well_known_cache_keys, HeaderBackend};
use client_api::{ForkBlocks, ExecutionStrategies};
use client_api::backend::{StorageCollection, ChildStorageCollection};
use client_api::error::{Result as ClientResult, Error as ClientError};
use codec::{Decode, Encode};
use hash_db::{Hasher, Prefix};
use kvdb::{KeyValueDB, DBTransaction};
use trie::{MemoryDB, PrefixedMemoryDB, prefixed_key};
use parking_lot::{Mutex, RwLock};
use primitives::{H256, Blake2Hasher, ChangesTrieConfiguration, convert_hash, traits::CodeExecutor};
use primitives::storage::well_known_keys;
use sr_primitives::{
generic::{BlockId, DigestItem}, Justification, StorageOverlay, ChildrenStorageOverlay,
BuildStorage,
};
use sr_primitives::traits::{
Block as BlockT, Header as HeaderT, NumberFor, Zero, One, SaturatedConversion
};
use executor::RuntimeInfo;
use state_machine::{
DBValue, ChangesTrieTransaction, ChangesTrieCacheAction, ChangesTrieBuildCache,
backend::Backend as StateBackend,
};
use crate::utils::{Meta, db_err, meta_keys, read_db, read_meta};
use client::leaves::{LeafSet, FinalizationDisplaced};
use state_db::StateDb;
use header_metadata::{CachedHeaderMetadata, HeaderMetadata, HeaderMetadataCache};
use crate::storage_cache::{CachingState, SharedCache, new_shared_cache};
use log::{trace, debug, warn};
pub use state_db::PruningMode;
#[cfg(feature = "test-helpers")]
use client::in_mem::Backend as InMemoryBackend;
const CANONICALIZATION_DELAY: u64 = 4096;
const MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR: u32 = 32768;
/// Default value for storage cache child ratio.
const DEFAULT_CHILD_RATIO: (usize, usize) = (1, 10);
/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
pub type DbState = state_machine::TrieBackend>, Blake2Hasher>;
/// Re-export the KVDB trait so that one can pass an implementation of it.
pub use kvdb;
/// A reference tracking state.
///
/// It makes sure that the hash we are using stays pinned in storage
/// until this structure is dropped.
pub struct RefTrackingState {
state: DbState,
storage: Arc>,
parent_hash: Option,
}
impl RefTrackingState {
fn new(state: DbState, storage: Arc>, parent_hash: Option) -> RefTrackingState {
RefTrackingState {
state,
parent_hash,
storage,
}
}
}
impl Drop for RefTrackingState {
fn drop(&mut self) {
if let Some(hash) = &self.parent_hash {
self.storage.state_db.unpin(hash);
}
}
}
impl std::fmt::Debug for RefTrackingState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Block {:?}", self.parent_hash)
}
}
impl StateBackend for RefTrackingState {
type Error = >::Error;
type Transaction = >::Transaction;
type TrieBackendStorage = >::TrieBackendStorage;
fn storage(&self, key: &[u8]) -> Result>, Self::Error> {
self.state.storage(key)
}
fn storage_hash(&self, key: &[u8]) -> Result , Self::Error> {
self.state.storage_hash(key)
}
fn child_storage(&self, storage_key: &[u8], key: &[u8]) -> Result >, Self::Error> {
self.state.child_storage(storage_key, key)
}
fn exists_storage(&self, key: &[u8]) -> Result {
self.state.exists_storage(key)
}
fn exists_child_storage(&self, storage_key: &[u8], key: &[u8]) -> Result {
self.state.exists_child_storage(storage_key, key)
}
fn for_keys_with_prefix(&self, prefix: &[u8], f: F) {
self.state.for_keys_with_prefix(prefix, f)
}
fn for_key_values_with_prefix(&self, prefix: &[u8], f: F) {
self.state.for_key_values_with_prefix(prefix, f)
}
fn for_keys_in_child_storage(&self, storage_key: &[u8], f: F) {
self.state.for_keys_in_child_storage(storage_key, f)
}
fn for_child_keys_with_prefix(&self, storage_key: &[u8], prefix: &[u8], f: F) {
self.state.for_child_keys_with_prefix(storage_key, prefix, f)
}
fn storage_root(&self, delta: I) -> (H256, Self::Transaction)
where
I: IntoIterator- , Option
>)>
{
self.state.storage_root(delta)
}
fn child_storage_root(&self, storage_key: &[u8], delta: I) -> (Vec, bool, Self::Transaction)
where
I: IntoIterator- , Option
>)>,
{
self.state.child_storage_root(storage_key, delta)
}
fn pairs(&self) -> Vec<(Vec, Vec)> {
self.state.pairs()
}
fn keys(&self, prefix: &[u8]) -> Vec> {
self.state.keys(prefix)
}
fn child_keys(&self, child_key: &[u8], prefix: &[u8]) -> Vec> {
self.state.child_keys(child_key, prefix)
}
fn as_trie_backend(&mut self) -> Option<&state_machine::TrieBackend> {
self.state.as_trie_backend()
}
}
/// Database settings.
pub struct DatabaseSettings {
/// State cache size.
pub state_cache_size: usize,
/// Ratio of cache size dedicated to child tries.
pub state_cache_child_ratio: Option<(usize, usize)>,
/// Pruning mode.
pub pruning: PruningMode,
/// Where to find the database.
pub source: DatabaseSettingsSrc,
}
/// Where to find the database..
pub enum DatabaseSettingsSrc {
/// Load a database from a given path. Recommended for most uses.
Path {
/// Path to the database.
path: PathBuf,
/// Cache size in bytes. If `None` default is used.
cache_size: Option,
},
/// Use a custom already-open database.
Custom(Arc),
}
/// Create an instance of db-backed client.
pub fn new_client(
settings: DatabaseSettings,
executor: E,
genesis_storage: S,
fork_blocks: ForkBlocks,
execution_strategies: ExecutionStrategies,
keystore: Option,
) -> Result<(
client::Client<
Backend,
client::LocalCallExecutor, E>,
Block,
RA,
>,
Arc>,
),
client::error::Error,
>
where
Block: BlockT,
E: CodeExecutor + RuntimeInfo,
S: BuildStorage,
{
let backend = Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?);
let executor = client::LocalCallExecutor::new(backend.clone(), executor, keystore);
Ok((
client::Client::new(backend.clone(), executor, genesis_storage, fork_blocks, execution_strategies)?,
backend,
))
}
pub(crate) mod columns {
pub const META: Option = crate::utils::COLUMN_META;
pub const STATE: Option = Some(1);
pub const STATE_META: Option = Some(2);
/// maps hashes to lookup keys and numbers to canon hashes.
pub const KEY_LOOKUP: Option = Some(3);
pub const HEADER: Option = Some(4);
pub const BODY: Option = Some(5);
pub const JUSTIFICATION: Option = Some(6);
pub const CHANGES_TRIE: Option = Some(7);
pub const AUX: Option = Some(8);
/// Offchain workers local storage
pub const OFFCHAIN: Option = Some(9);
}
struct PendingBlock {
header: Block::Header,
justification: Option,
body: Option>,
leaf_state: NewBlockState,
}
// wrapper that implements trait required for state_db
struct StateMetaDb<'a>(&'a dyn KeyValueDB);
impl<'a> state_db::MetaDb for StateMetaDb<'a> {
type Error = io::Error;
fn get_meta(&self, key: &[u8]) -> Result>, Self::Error> {
self.0.get(columns::STATE_META, key).map(|r| r.map(|v| v.to_vec()))
}
}
/// Block database
pub struct BlockchainDb {
db: Arc,
meta: Arc, Block::Hash>>>,
leaves: RwLock>>,
header_metadata_cache: HeaderMetadataCache,
}
impl BlockchainDb {
fn new(db: Arc) -> ClientResult {
let meta = read_meta::(&*db, columns::META, columns::HEADER)?;
let leaves = LeafSet::read_from_db(&*db, columns::META, meta_keys::LEAF_PREFIX)?;
Ok(BlockchainDb {
db,
leaves: RwLock::new(leaves),
meta: Arc::new(RwLock::new(meta)),
header_metadata_cache: HeaderMetadataCache::default(),
})
}
fn update_meta(
&self,
hash: Block::Hash,
number: ::Number,
is_best: bool,
is_finalized: bool
) {
let mut meta = self.meta.write();
if number.is_zero() {
meta.genesis_hash = hash;
meta.finalized_hash = hash;
}
if is_best {
meta.best_number = number;
meta.best_hash = hash;
}
if is_finalized {
meta.finalized_number = number;
meta.finalized_hash = hash;
}
}
}
impl client::blockchain::HeaderBackend for BlockchainDb {
fn header(&self, id: BlockId) -> ClientResult> {
utils::read_header(&*self.db, columns::KEY_LOOKUP, columns::HEADER, id)
}
fn info(&self) -> client::blockchain::Info {
let meta = self.meta.read();
client::blockchain::Info {
best_hash: meta.best_hash,
best_number: meta.best_number,
genesis_hash: meta.genesis_hash,
finalized_hash: meta.finalized_hash,
finalized_number: meta.finalized_number,
}
}
fn status(&self, id: BlockId) -> ClientResult {
let exists = match id {
BlockId::Hash(_) => read_db(
&*self.db,
columns::KEY_LOOKUP,
columns::HEADER,
id
)?.is_some(),
BlockId::Number(n) => n <= self.meta.read().best_number,
};
match exists {
true => Ok(client::blockchain::BlockStatus::InChain),
false => Ok(client::blockchain::BlockStatus::Unknown),
}
}
fn number(&self, hash: Block::Hash) -> ClientResult>> {
Ok(self.header_metadata(hash).ok().map(|header_metadata| header_metadata.number))
}
fn hash(&self, number: NumberFor) -> ClientResult> {
self.header(BlockId::Number(number)).and_then(|maybe_header| match maybe_header {
Some(header) => Ok(Some(header.hash().clone())),
None => Ok(None),
})
}
}
impl client::blockchain::Backend for BlockchainDb {
fn body(&self, id: BlockId) -> ClientResult>> {
match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)? {
Some(body) => match Decode::decode(&mut &body[..]) {
Ok(body) => Ok(Some(body)),
Err(err) => return Err(client::error::Error::Backend(
format!("Error decoding body: {}", err)
)),
}
None => Ok(None),
}
}
fn justification(&self, id: BlockId) -> ClientResult> {
match read_db(&*self.db, columns::KEY_LOOKUP, columns::JUSTIFICATION, id)? {
Some(justification) => match Decode::decode(&mut &justification[..]) {
Ok(justification) => Ok(Some(justification)),
Err(err) => return Err(client::error::Error::Backend(
format!("Error decoding justification: {}", err)
)),
}
None => Ok(None),
}
}
fn last_finalized(&self) -> ClientResult {
Ok(self.meta.read().finalized_hash.clone())
}
fn cache(&self) -> Option>> {
None
}
fn leaves(&self) -> ClientResult> {
Ok(self.leaves.read().hashes())
}
fn children(&self, parent_hash: Block::Hash) -> ClientResult> {
children::read_children(&*self.db, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash)
}
}
impl client::blockchain::ProvideCache for BlockchainDb {
fn cache(&self) -> Option>> {
None
}
}
impl HeaderMetadata for BlockchainDb {
type Error = client::error::Error;
fn header_metadata(&self, hash: Block::Hash) -> Result, Self::Error> {
self.header_metadata_cache.header_metadata(hash).or_else(|_| {
self.header(BlockId::hash(hash))?.map(|header| {
let header_metadata = CachedHeaderMetadata::from(&header);
self.header_metadata_cache.insert_header_metadata(
header_metadata.hash,
header_metadata.clone(),
);
header_metadata
}).ok_or(ClientError::UnknownBlock(format!("header not found in db: {}", hash)))
})
}
fn insert_header_metadata(&self, hash: Block::Hash, metadata: CachedHeaderMetadata) {
self.header_metadata_cache.insert_header_metadata(hash, metadata)
}
fn remove_header_metadata(&self, hash: Block::Hash) {
self.header_metadata_cache.remove_header_metadata(hash);
}
}
/// Database transaction
pub struct BlockImportOperation {
old_state: CachingState, Block>,
db_updates: PrefixedMemoryDB,
storage_updates: StorageCollection,
child_storage_updates: ChildStorageCollection,
changes_trie_updates: MemoryDB,
changes_trie_cache_update: Option>>,
pending_block: Option>,
aux_ops: Vec<(Vec, Option>)>,
finalized_blocks: Vec<(BlockId, Option)>,
set_head: Option>,
commit_state: bool,
}
impl BlockImportOperation {
fn apply_aux(&mut self, transaction: &mut DBTransaction) {
for (key, maybe_val) in self.aux_ops.drain(..) {
match maybe_val {
Some(val) => transaction.put_vec(columns::AUX, &key, val),
None => transaction.delete(columns::AUX, &key),
}
}
}
}
impl client_api::backend::BlockImportOperation
for BlockImportOperation where Block: BlockT,
{
type State = CachingState, Block>;
fn state(&self) -> ClientResult> {
Ok(Some(&self.old_state))
}
fn set_block_data(
&mut self,
header: Block::Header,
body: Option>,
justification: Option,
leaf_state: NewBlockState,
) -> ClientResult<()> {
assert!(self.pending_block.is_none(), "Only one block per operation is allowed");
self.pending_block = Some(PendingBlock {
header,
body,
justification,
leaf_state,
});
Ok(())
}
fn update_cache(&mut self, _cache: HashMap>) {
// Currently cache isn't implemented on full nodes.
}
fn update_db_storage(&mut self, update: PrefixedMemoryDB) -> ClientResult<()> {
self.db_updates = update;
Ok(())
}
fn reset_storage(
&mut self,
top: StorageOverlay,
children: ChildrenStorageOverlay
) -> ClientResult {
if top.iter().any(|(k, _)| well_known_keys::is_child_storage_key(k)) {
return Err(client::error::Error::GenesisInvalid.into());
}
for child_key in children.keys() {
if !well_known_keys::is_child_storage_key(&child_key) {
return Err(client::error::Error::GenesisInvalid.into());
}
}
let child_delta = children.into_iter()
.map(|(storage_key, child_overlay)|
(storage_key, child_overlay.into_iter().map(|(k, v)| (k, Some(v)))));
let (root, transaction) = self.old_state.full_storage_root(
top.into_iter().map(|(k, v)| (k, Some(v))),
child_delta
);
self.db_updates = transaction;
self.commit_state = true;
Ok(root)
}
fn update_changes_trie(
&mut self,
update: ChangesTrieTransaction>,
) -> ClientResult<()> {
self.changes_trie_updates = update.0;
self.changes_trie_cache_update = Some(update.1);
Ok(())
}
fn insert_aux(&mut self, ops: I) -> ClientResult<()>
where I: IntoIterator- , Option
>)>
{
self.aux_ops.append(&mut ops.into_iter().collect());
Ok(())
}
fn update_storage(
&mut self,
update: StorageCollection,
child_update: ChildStorageCollection,
) -> ClientResult<()> {
self.storage_updates = update;
self.child_storage_updates = child_update;
Ok(())
}
fn mark_finalized(&mut self, block: BlockId, justification: Option) -> ClientResult<()> {
self.finalized_blocks.push((block, justification));
Ok(())
}
fn mark_head(&mut self, block: BlockId) -> ClientResult<()> {
assert!(self.set_head.is_none(), "Only one set head per operation is allowed");
self.set_head = Some(block);
Ok(())
}
}
struct StorageDb {
pub db: Arc,
pub state_db: StateDb>,
}
impl state_machine::Storage for StorageDb {
fn get(&self, key: &H256, prefix: Prefix) -> Result, String> {
let key = prefixed_key::(key, prefix);
self.state_db.get(&key, self).map(|r| r.map(|v| DBValue::from_slice(&v)))
.map_err(|e| format!("Database backend error: {:?}", e))
}
}
impl state_db::NodeDb for StorageDb {
type Error = io::Error;
type Key = [u8];
fn get(&self, key: &[u8]) -> Result>, Self::Error> {
self.db.get(columns::STATE, key).map(|r| r.map(|v| v.to_vec()))
}
}
struct DbGenesisStorage(pub H256);
impl DbGenesisStorage {
pub fn new() -> Self {
let mut root = H256::default();
let mut mdb = MemoryDB::::default();
state_machine::TrieDBMut::::new(&mut mdb, &mut root);
DbGenesisStorage(root)
}
}
impl state_machine::Storage for DbGenesisStorage {
fn get(&self, _key: &H256, _prefix: Prefix) -> Result, String> {
Ok(None)
}
}
/// A database wrapper for changes tries.
pub struct DbChangesTrieStorage {
db: Arc,
meta: Arc, Block::Hash>>>,
min_blocks_to_keep: Option,
cache: RwLock>>,
_phantom: ::std::marker::PhantomData,
}
impl> DbChangesTrieStorage {
/// Commit new changes trie.
pub fn commit(&self, tx: &mut DBTransaction, mut changes_trie: MemoryDB) {
for (key, (val, _)) in changes_trie.drain() {
tx.put(columns::CHANGES_TRIE, &key[..], &val);
}
}
/// Commit changes into changes trie build cache.
pub fn commit_cache(&self, cache_update: ChangesTrieCacheAction>) {
self.cache.write().perform(cache_update);
}
/// Prune obsolete changes tries.
pub fn prune(
&self,
config: &ChangesTrieConfiguration,
tx: &mut DBTransaction,
block_hash: Block::Hash,
block_num: NumberFor,
) {
// never prune on archive nodes
let min_blocks_to_keep = match self.min_blocks_to_keep {
Some(min_blocks_to_keep) => min_blocks_to_keep,
None => return,
};
state_machine::prune_changes_tries(
config,
&*self,
min_blocks_to_keep.into(),
&state_machine::ChangesTrieAnchorBlockId {
hash: convert_hash(&block_hash),
number: block_num,
},
|node| tx.delete(columns::CHANGES_TRIE, node.as_ref()));
}
}
impl client_api::backend::PrunableStateChangesTrieStorage
for DbChangesTrieStorage
where
Block: BlockT,
{
fn oldest_changes_trie_block(
&self,
config: &ChangesTrieConfiguration,
best_finalized_block: NumberFor,
) -> NumberFor {
match self.min_blocks_to_keep {
Some(min_blocks_to_keep) => state_machine::oldest_non_pruned_changes_trie(
config,
min_blocks_to_keep.into(),
best_finalized_block,
),
None => One::one(),
}
}
}
impl state_machine::ChangesTrieRootsStorage>
for DbChangesTrieStorage
where
Block: BlockT,
{
fn build_anchor(
&self,
hash: H256,
) -> Result>, String> {
utils::read_header::(&*self.db, columns::KEY_LOOKUP, columns::HEADER, BlockId::Hash(hash))
.map_err(|e| e.to_string())
.and_then(|maybe_header| maybe_header.map(|header|
state_machine::ChangesTrieAnchorBlockId {
hash,
number: *header.number(),
}
).ok_or_else(|| format!("Unknown header: {}", hash)))
}
fn root(
&self,
anchor: &state_machine::ChangesTrieAnchorBlockId>,
block: NumberFor,
) -> Result, String> {
// check API requirement: we can't get NEXT block(s) based on anchor
if block > anchor.number {
return Err(format!("Can't get changes trie root at {} using anchor at {}", block, anchor.number));
}
// we need to get hash of the block to resolve changes trie root
let block_id = if block <= self.meta.read().finalized_number {
// if block is finalized, we could just read canonical hash
BlockId::Number(block)
} else {
// the block is not finalized
let mut current_num = anchor.number;
let mut current_hash: Block::Hash = convert_hash(&anchor.hash);
let maybe_anchor_header: Block::Header = utils::require_header::(
&*self.db, columns::KEY_LOOKUP, columns::HEADER, BlockId::Number(current_num)
).map_err(|e| e.to_string())?;
if maybe_anchor_header.hash() == current_hash {
// if anchor is canonicalized, then the block is also canonicalized
BlockId::Number(block)
} else {
// else (block is not finalized + anchor is not canonicalized):
// => we should find the required block hash by traversing
// back from the anchor to the block with given number
while current_num != block {
let current_header: Block::Header = utils::require_header::(
&*self.db, columns::KEY_LOOKUP, columns::HEADER, BlockId::Hash(current_hash)
).map_err(|e| e.to_string())?;
current_hash = *current_header.parent_hash();
current_num = current_num - One::one();
}
BlockId::Hash(current_hash)
}
};
Ok(utils::require_header::(&*self.db, columns::KEY_LOOKUP, columns::HEADER, block_id)
.map_err(|e| e.to_string())?
.digest().log(DigestItem::as_changes_trie_root)
.map(|root| H256::from_slice(root.as_ref())))
}
}
impl state_machine::ChangesTrieStorage>
for DbChangesTrieStorage
where
Block: BlockT,
{
fn as_roots_storage(&self) -> &dyn state_machine::ChangesTrieRootsStorage> {
self
}
fn with_cached_changed_keys(
&self,
root: &H256,
functor: &mut dyn FnMut(&HashMap>, HashSet>>),
) -> bool {
self.cache.read().with_changed_keys(root, functor)
}
fn get(&self, key: &H256, _prefix: Prefix) -> Result, String> {
self.db.get(columns::CHANGES_TRIE, &key[..])
.map_err(|err| format!("{}", err))
}
}
/// Disk backend. Keeps data in a key-value store. In archive mode, trie nodes are kept from all blocks.
/// Otherwise, trie nodes are kept only from some recent blocks.
pub struct Backend {
storage: Arc>,
offchain_storage: offchain::LocalStorage,
changes_tries_storage: DbChangesTrieStorage,
/// None<*> means that the value hasn't been cached yet. Some(*) means that the value (either None or
/// Some(*)) has been cached and is valid.
changes_trie_config: Mutex>>,
blockchain: BlockchainDb,
canonicalization_delay: u64,
shared_cache: SharedCache,
import_lock: Mutex<()>,
is_archive: bool,
}
impl> Backend {
/// Create a new instance of database backend.
///
/// The pruning window is how old a block must be before the state is pruned.
pub fn new(config: DatabaseSettings, canonicalization_delay: u64) -> ClientResult {
let db = crate::utils::open_database(&config, columns::META, "full")?;
Self::from_kvdb(db as Arc<_>, canonicalization_delay, &config)
}
/// Create new memory-backed client backend for tests.
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_test(keep_blocks: u32, canonicalization_delay: u64) -> Self {
let db = Arc::new(kvdb_memorydb::create(crate::utils::NUM_COLUMNS));
let db_setting = DatabaseSettings {
state_cache_size: 16777216,
state_cache_child_ratio: Some((50, 100)),
pruning: PruningMode::keep_blocks(keep_blocks),
source: DatabaseSettingsSrc::Custom(db),
};
Self::new(db_setting, canonicalization_delay).expect("failed to create test-db")
}
fn from_kvdb(
db: Arc,
canonicalization_delay: u64,
config: &DatabaseSettings
) -> ClientResult {
let is_archive_pruning = config.pruning.is_archive();
let blockchain = BlockchainDb::new(db.clone())?;
let meta = blockchain.meta.clone();
let map_e = |e: state_db::Error| ::client::error::Error::from(format!("State database error: {:?}", e));
let state_db: StateDb<_, _> = StateDb::new(config.pruning.clone(), &StateMetaDb(&*db)).map_err(map_e)?;
let storage_db = StorageDb {
db: db.clone(),
state_db,
};
let offchain_storage = offchain::LocalStorage::new(db.clone());
let changes_tries_storage = DbChangesTrieStorage {
db,
meta,
min_blocks_to_keep: if is_archive_pruning { None } else { Some(MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR) },
cache: RwLock::new(ChangesTrieBuildCache::new()),
_phantom: Default::default(),
};
Ok(Backend {
storage: Arc::new(storage_db),
offchain_storage,
changes_tries_storage,
changes_trie_config: Mutex::new(None),
blockchain,
canonicalization_delay,
shared_cache: new_shared_cache(
config.state_cache_size,
config.state_cache_child_ratio.unwrap_or(DEFAULT_CHILD_RATIO),
),
import_lock: Default::default(),
is_archive: is_archive_pruning,
})
}
/// Returns in-memory blockchain that contains the same set of blocks that the self.
#[cfg(feature = "test-helpers")]
pub fn as_in_memory(&self) -> InMemoryBackend {
use client_api::backend::{Backend as ClientBackend, BlockImportOperation};
use client::blockchain::Backend as BlockchainBackend;
let inmem = InMemoryBackend::::new();
// get all headers hashes && sort them by number (could be duplicate)
let mut headers: Vec<(NumberFor, Block::Hash, Block::Header)> = Vec::new();
for (_, header) in self.blockchain.db.iter(columns::HEADER) {
let header = Block::Header::decode(&mut &header[..]).unwrap();
let hash = header.hash();
let number = *header.number();
let pos = headers.binary_search_by(|item| item.0.cmp(&number));
match pos {
Ok(pos) => headers.insert(pos, (number, hash, header)),
Err(pos) => headers.insert(pos, (number, hash, header)),
}
}
// insert all other headers + bodies + justifications
let info = self.blockchain.info();
for (number, hash, header) in headers {
let id = BlockId::Hash(hash);
let justification = self.blockchain.justification(id).unwrap();
let body = self.blockchain.body(id).unwrap();
let state = self.state_at(id).unwrap().pairs();
let new_block_state = if number.is_zero() {
NewBlockState::Final
} else if hash == info.best_hash {
NewBlockState::Best
} else {
NewBlockState::Normal
};
let mut op = inmem.begin_operation().unwrap();
op.set_block_data(header, body, justification, new_block_state).unwrap();
op.update_db_storage(state.into_iter().map(|(k, v)| (None, k, Some(v))).collect()).unwrap();
inmem.commit_operation(op).unwrap();
}
// and now finalize the best block we have
inmem.finalize_block(BlockId::Hash(info.finalized_hash), None).unwrap();
inmem
}
/// Returns total numbet of blocks (headers) in the block DB.
#[cfg(feature = "test-helpers")]
pub fn blocks_count(&self) -> u64 {
self.blockchain.db.iter(columns::HEADER).count() as u64
}
/// Read (from storage or cache) changes trie config.
///
/// Currently changes tries configuration is set up once (at genesis) and could not
/// be changed. Thus, we'll actually read value once and then just use cached value.
fn changes_trie_config(&self, block: Block::Hash) -> ClientResult> {
let mut cached_changes_trie_config = self.changes_trie_config.lock();
match cached_changes_trie_config.clone() {
Some(cached_changes_trie_config) => Ok(cached_changes_trie_config),
None => {
use client_api::backend::Backend;
let changes_trie_config = self
.state_at(BlockId::Hash(block))?
.storage(well_known_keys::CHANGES_TRIE_CONFIG)?
.and_then(|v| Decode::decode(&mut &*v).ok());
*cached_changes_trie_config = Some(changes_trie_config.clone());
Ok(changes_trie_config)
},
}
}
/// Handle setting head within a transaction. `route_to` should be the last
/// block that existed in the database. `best_to` should be the best block
/// to be set.
///
/// In the case where the new best block is a block to be imported, `route_to`
/// should be the parent of `best_to`. In the case where we set an existing block
/// to be best, `route_to` should equal to `best_to`.
fn set_head_with_transaction(
&self,
transaction: &mut DBTransaction,
route_to: Block::Hash,
best_to: (NumberFor, Block::Hash),
) -> ClientResult<(Vec, Vec)> {
let mut enacted = Vec::default();
let mut retracted = Vec::default();
let meta = self.blockchain.meta.read();
// cannot find tree route with empty DB.
if meta.best_hash != Default::default() {
let tree_route = header_metadata::tree_route(
&self.blockchain,
meta.best_hash,
route_to,
)?;
// uncanonicalize: check safety violations and ensure the numbers no longer
// point to these block hashes in the key mapping.
for r in tree_route.retracted() {
if r.hash == meta.finalized_hash {
warn!(
"Potential safety failure: reverting finalized block {:?}",
(&r.number, &r.hash)
);
return Err(::client::error::Error::NotInFinalizedChain.into());
}
retracted.push(r.hash.clone());
utils::remove_number_to_key_mapping(
transaction,
columns::KEY_LOOKUP,
r.number
)?;
}
// canonicalize: set the number lookup to map to this block's hash.
for e in tree_route.enacted() {
enacted.push(e.hash.clone());
utils::insert_number_to_key_mapping(
transaction,
columns::KEY_LOOKUP,
e.number,
e.hash
)?;
}
}
let lookup_key = utils::number_and_hash_to_lookup_key(best_to.0, &best_to.1)?;
transaction.put(columns::META, meta_keys::BEST_BLOCK, &lookup_key);
utils::insert_number_to_key_mapping(
transaction,
columns::KEY_LOOKUP,
best_to.0,
best_to.1,
)?;
Ok((enacted, retracted))
}
fn ensure_sequential_finalization(
&self,
header: &Block::Header,
last_finalized: Option,
) -> ClientResult<()> {
let last_finalized = last_finalized.unwrap_or_else(|| self.blockchain.meta.read().finalized_hash);
if *header.parent_hash() != last_finalized {
return Err(::client::error::Error::NonSequentialFinalization(
format!("Last finalized {:?} not parent of {:?}", last_finalized, header.hash()),
).into());
}
Ok(())
}
fn finalize_block_with_transaction(
&self,
transaction: &mut DBTransaction,
hash: &Block::Hash,
header: &Block::Header,
last_finalized: Option,
justification: Option,
finalization_displaced: &mut Option>>,
) -> ClientResult<(Block::Hash, ::Number, bool, bool)> {
// TODO: ensure best chain contains this block.
let number = *header.number();
self.ensure_sequential_finalization(header, last_finalized)?;
self.note_finalized(
transaction,
header,
*hash,
finalization_displaced,
)?;
if let Some(justification) = justification {
transaction.put(
columns::JUSTIFICATION,
&utils::number_and_hash_to_lookup_key(number, hash)?,
&justification.encode(),
);
}
Ok((*hash, number, false, true))
}
// performs forced canonicaliziation with a delay after importing a non-finalized block.
fn force_delayed_canonicalize(
&self,
transaction: &mut DBTransaction,
hash: Block::Hash,
number: NumberFor,
)
-> ClientResult<()>
{
let number_u64 = number.saturated_into::();
if number_u64 > self.canonicalization_delay {
let new_canonical = number_u64 - self.canonicalization_delay;
if new_canonical <= self.storage.state_db.best_canonical().unwrap_or(0) {
return Ok(())
}
let hash = if new_canonical == number_u64 {
hash
} else {
::client::blockchain::HeaderBackend::hash(&self.blockchain, new_canonical.saturated_into())?
.expect("existence of block with number `new_canonical` \
implies existence of blocks with all numbers before it; qed")
};
trace!(target: "db", "Canonicalize block #{} ({:?})", new_canonical, hash);
let commit = self.storage.state_db.canonicalize_block(&hash)
.map_err(|e: state_db::Error| client::error::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(transaction, commit);
};
Ok(())
}
fn try_commit_operation(&self, mut operation: BlockImportOperation)
-> ClientResult<()>
{
let mut transaction = DBTransaction::new();
let mut finalization_displaced_leaves = None;
operation.apply_aux(&mut transaction);
let mut meta_updates = Vec::with_capacity(operation.finalized_blocks.len());
let mut last_finalized_hash = self.blockchain.meta.read().finalized_hash;
for (block, justification) in operation.finalized_blocks {
let block_hash = self.blockchain.expect_block_hash_from_id(&block)?;
let block_header = self.blockchain.expect_header(BlockId::Hash(block_hash))?;
meta_updates.push(self.finalize_block_with_transaction(
&mut transaction,
&block_hash,
&block_header,
Some(last_finalized_hash),
justification,
&mut finalization_displaced_leaves,
)?);
last_finalized_hash = block_hash;
}
let imported = if let Some(pending_block) = operation.pending_block {
let hash = pending_block.header.hash();
let parent_hash = *pending_block.header.parent_hash();
let number = pending_block.header.number().clone();
// blocks are keyed by number + hash.
let lookup_key = utils::number_and_hash_to_lookup_key(number, hash)?;
let (enacted, retracted) = if pending_block.leaf_state.is_best() {
self.set_head_with_transaction(&mut transaction, parent_hash, (number, hash))?
} else {
(Default::default(), Default::default())
};
utils::insert_hash_to_key_mapping(
&mut transaction,
columns::KEY_LOOKUP,
number,
hash,
)?;
let header_metadata = CachedHeaderMetadata::from(&pending_block.header);
self.blockchain.insert_header_metadata(
header_metadata.hash,
header_metadata,
);
transaction.put(columns::HEADER, &lookup_key, &pending_block.header.encode());
if let Some(body) = &pending_block.body {
transaction.put(columns::BODY, &lookup_key, &body.encode());
}
if let Some(justification) = pending_block.justification {
transaction.put(columns::JUSTIFICATION, &lookup_key, &justification.encode());
}
if number.is_zero() {
transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &lookup_key);
transaction.put(columns::META, meta_keys::GENESIS_HASH, hash.as_ref());
}
let finalized = if operation.commit_state {
let mut changeset: state_db::ChangeSet> = state_db::ChangeSet::default();
for (key, (val, rc)) in operation.db_updates.drain() {
if rc > 0 {
changeset.inserted.push((key, val.to_vec()));
} else if rc < 0 {
changeset.deleted.push(key);
}
}
let number_u64 = number.saturated_into::();
let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset)
.map_err(|e: state_db::Error| client::error::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(&mut transaction, commit);
// Check if need to finalize. Genesis is always finalized instantly.
let finalized = number_u64 == 0 || pending_block.leaf_state.is_final();
finalized
} else {
false
};
let header = &pending_block.header;
let is_best = pending_block.leaf_state.is_best();
let changes_trie_updates = operation.changes_trie_updates;
self.changes_tries_storage.commit(&mut transaction, changes_trie_updates);
let cache = operation.old_state.release(); // release state reference so that it can be finalized
if finalized {
// TODO: ensure best chain contains this block.
self.ensure_sequential_finalization(header, Some(last_finalized_hash))?;
self.note_finalized(
&mut transaction,
header,
hash,
&mut finalization_displaced_leaves,
)?;
} else {
// canonicalize blocks which are old enough, regardless of finality.
self.force_delayed_canonicalize(&mut transaction, hash, *header.number())?
}
debug!(target: "db", "DB Commit {:?} ({}), best = {}", hash, number, is_best);
let displaced_leaf = {
let mut leaves = self.blockchain.leaves.write();
let displaced_leaf = leaves.import(hash, number, parent_hash);
leaves.prepare_transaction(&mut transaction, columns::META, meta_keys::LEAF_PREFIX);
displaced_leaf
};
let mut children = children::read_children(&*self.storage.db, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash)?;
children.push(hash);
children::write_children(&mut transaction, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash, children);
meta_updates.push((hash, number, pending_block.leaf_state.is_best(), finalized));
Some((number, hash, enacted, retracted, displaced_leaf, is_best, cache))
} else {
None
};
let cache_update = if let Some(set_head) = operation.set_head {
if let Some(header) = ::client::blockchain::HeaderBackend::header(&self.blockchain, set_head)? {
let number = header.number();
let hash = header.hash();
let (enacted, retracted) = self.set_head_with_transaction(
&mut transaction,
hash.clone(),
(number.clone(), hash.clone())
)?;
meta_updates.push((hash, *number, true, false));
Some((enacted, retracted))
} else {
return Err(client::error::Error::UnknownBlock(format!("Cannot set head {:?}", set_head)))
}
} else {
None
};
let write_result = self.storage.db.write(transaction).map_err(db_err);
if let Some(changes_trie_cache_update) = operation.changes_trie_cache_update {
self.changes_tries_storage.commit_cache(changes_trie_cache_update);
}
if let Some((number, hash, enacted, retracted, displaced_leaf, is_best, mut cache)) = imported {
if let Err(e) = write_result {
let mut leaves = self.blockchain.leaves.write();
let mut undo = leaves.undo();
if let Some(displaced_leaf) = displaced_leaf {
undo.undo_import(displaced_leaf);
}
if let Some(finalization_displaced) = finalization_displaced_leaves {
undo.undo_finalization(finalization_displaced);
}
return Err(e)
}
cache.sync_cache(
&enacted,
&retracted,
operation.storage_updates,
operation.child_storage_updates,
Some(hash),
Some(number),
|| is_best,
);
}
if let Some((enacted, retracted)) = cache_update {
self.shared_cache.lock().sync(&enacted, &retracted);
}
for (hash, number, is_best, is_finalized) in meta_updates {
self.blockchain.update_meta(hash, number, is_best, is_finalized);
}
Ok(())
}
// write stuff to a transaction after a new block is finalized.
// this canonicalizes finalized blocks. Fails if called with a block which
// was not a child of the last finalized block.
fn note_finalized(
&self,
transaction: &mut DBTransaction,
f_header: &Block::Header,
f_hash: Block::Hash,
displaced: &mut Option>>
) -> ClientResult<()> where
Block: BlockT,
{
let f_num = f_header.number().clone();
if self.storage.state_db.best_canonical().map(|c| f_num.saturated_into::() > c).unwrap_or(true) {
let parent_hash = f_header.parent_hash().clone();
let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone())?;
transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &lookup_key);
let commit = self.storage.state_db.canonicalize_block(&f_hash)
.map_err(|e: state_db::Error| client::error::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(transaction, commit);
let changes_trie_config = self.changes_trie_config(parent_hash)?;
if let Some(changes_trie_config) = changes_trie_config {
self.changes_tries_storage.prune(&changes_trie_config, transaction, f_hash, f_num);
}
}
let new_displaced = self.blockchain.leaves.write().finalize_height(f_num);
match displaced {
x @ &mut None => *x = Some(new_displaced),
&mut Some(ref mut displaced) => displaced.merge(new_displaced),
}
Ok(())
}
}
fn apply_state_commit(transaction: &mut DBTransaction, commit: state_db::CommitSet>) {
for (key, val) in commit.data.inserted.into_iter() {
transaction.put(columns::STATE, &key[..], &val);
}
for key in commit.data.deleted.into_iter() {
transaction.delete(columns::STATE, &key[..]);
}
for (key, val) in commit.meta.inserted.into_iter() {
transaction.put(columns::STATE_META, &key[..], &val);
}
for key in commit.meta.deleted.into_iter() {
transaction.delete(columns::STATE_META, &key[..]);
}
}
impl client_api::backend::AuxStore for Backend where Block: BlockT {
fn insert_aux<
'a,
'b: 'a,
'c: 'a,
I: IntoIterator- ,
D: IntoIterator
- ,
>(&self, insert: I, delete: D) -> ClientResult<()> {
let mut transaction = DBTransaction::new();
for (k, v) in insert {
transaction.put(columns::AUX, k, v);
}
for k in delete {
transaction.delete(columns::AUX, k);
}
self.storage.db.write(transaction).map_err(db_err)?;
Ok(())
}
fn get_aux(&self, key: &[u8]) -> ClientResult
>> {
Ok(self.storage.db.get(columns::AUX, key).map(|r| r.map(|v| v.to_vec())).map_err(db_err)?)
}
}
impl client_api::backend::Backend for Backend where Block: BlockT {
type BlockImportOperation = BlockImportOperation;
type Blockchain = BlockchainDb;
type State = CachingState, Block>;
type ChangesTrieStorage = DbChangesTrieStorage;
type OffchainStorage = offchain::LocalStorage;
fn begin_operation(&self) -> ClientResult {
let old_state = self.state_at(BlockId::Hash(Default::default()))?;
Ok(BlockImportOperation {
pending_block: None,
old_state,
db_updates: PrefixedMemoryDB::default(),
storage_updates: Default::default(),
child_storage_updates: Default::default(),
changes_trie_updates: MemoryDB::default(),
changes_trie_cache_update: None,
aux_ops: Vec::new(),
finalized_blocks: Vec::new(),
set_head: None,
commit_state: false,
})
}
fn begin_state_operation(
&self,
operation: &mut Self::BlockImportOperation,
block: BlockId,
) -> ClientResult<()> {
operation.old_state = self.state_at(block)?;
operation.commit_state = true;
Ok(())
}
fn commit_operation(&self, operation: Self::BlockImportOperation)
-> ClientResult<()>
{
match self.try_commit_operation(operation) {
Ok(_) => {
self.storage.state_db.apply_pending();
Ok(())
},
e @ Err(_) => {
self.storage.state_db.revert_pending();
e
}
}
}
fn finalize_block(&self, block: BlockId, justification: Option)
-> ClientResult<()>
{
let mut transaction = DBTransaction::new();
let hash = self.blockchain.expect_block_hash_from_id(&block)?;
let header = self.blockchain.expect_header(block)?;
let mut displaced = None;
let commit = |displaced| {
let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction(
&mut transaction,
&hash,
&header,
None,
justification,
displaced,
)?;
self.storage.db.write(transaction).map_err(db_err)?;
self.blockchain.update_meta(hash, number, is_best, is_finalized);
Ok(())
};
match commit(&mut displaced) {
Ok(()) => self.storage.state_db.apply_pending(),
e @ Err(_) => {
self.storage.state_db.revert_pending();
if let Some(displaced) = displaced {
self.blockchain.leaves.write().undo().undo_finalization(displaced);
}
return e;
}
}
Ok(())
}
fn changes_trie_storage(&self) -> Option<&Self::ChangesTrieStorage> {
Some(&self.changes_tries_storage)
}
fn offchain_storage(&self) -> Option {
Some(self.offchain_storage.clone())
}
fn revert(&self, n: NumberFor) -> ClientResult> {
let mut best = self.blockchain.info().best_number;
let finalized = self.blockchain.info().finalized_number;
let revertible = best - finalized;
let n = if revertible < n { revertible } else { n };
for c in 0 .. n.saturated_into::() {
if best.is_zero() {
return Ok(c.saturated_into::>())
}
let mut transaction = DBTransaction::new();
match self.storage.state_db.revert_one() {
Some(commit) => {
apply_state_commit(&mut transaction, commit);
let removed = self.blockchain.header(BlockId::Number(best))?.ok_or_else(
|| client::error::Error::UnknownBlock(
format!("Error reverting to {}. Block hash not found.", best)))?;
best -= One::one(); // prev block
let hash = self.blockchain.hash(best)?.ok_or_else(
|| client::error::Error::UnknownBlock(
format!("Error reverting to {}. Block hash not found.", best)))?;
let key = utils::number_and_hash_to_lookup_key(best.clone(), &hash)?;
transaction.put(columns::META, meta_keys::BEST_BLOCK, &key);
transaction.delete(columns::KEY_LOOKUP, removed.hash().as_ref());
children::remove_children(&mut transaction, columns::META, meta_keys::CHILDREN_PREFIX, hash);
self.storage.db.write(transaction).map_err(db_err)?;
self.blockchain.update_meta(hash, best, true, false);
self.blockchain.leaves.write().revert(removed.hash().clone(), removed.number().clone(), removed.parent_hash().clone());
}
None => return Ok(c.saturated_into::>())
}
}
Ok(n)
}
fn blockchain(&self) -> &BlockchainDb {
&self.blockchain
}
fn used_state_cache_size(&self) -> Option {
let used = (*&self.shared_cache).lock().used_storage_cache_size();
Some(used)
}
fn state_at(&self, block: BlockId) -> ClientResult {
use client::blockchain::HeaderBackend as BcHeaderBackend;
// special case for genesis initialization
match block {
BlockId::Hash(h) if h == Default::default() => {
let genesis_storage = DbGenesisStorage::new();
let root = genesis_storage.0.clone();
let db_state = DbState::new(Arc::new(genesis_storage), root);
let state = RefTrackingState::new(db_state, self.storage.clone(), None);
return Ok(CachingState::new(state, self.shared_cache.clone(), None));
},
_ => {}
}
match self.blockchain.header(block) {
Ok(Some(ref hdr)) => {
let hash = hdr.hash();
if !self.have_state_at(&hash, *hdr.number()) {
return Err(client::error::Error::UnknownBlock(format!("State already discarded for {:?}", block)))
}
if let Ok(()) = self.storage.state_db.pin(&hash) {
let root = H256::from_slice(hdr.state_root().as_ref());
let db_state = DbState::new(self.storage.clone(), root);
let state = RefTrackingState::new(db_state, self.storage.clone(), Some(hash.clone()));
Ok(CachingState::new(state, self.shared_cache.clone(), Some(hash)))
} else {
Err(client::error::Error::UnknownBlock(format!("State already discarded for {:?}", block)))
}
},
Ok(None) => Err(client::error::Error::UnknownBlock(format!("Unknown state for block {:?}", block))),
Err(e) => Err(e),
}
}
fn have_state_at(&self, hash: &Block::Hash, number: NumberFor) -> bool {
if self.is_archive {
match self.blockchain.header(BlockId::Hash(hash.clone())) {
Ok(Some(header)) => {
state_machine::Storage::get(self.storage.as_ref(), &header.state_root(), (&[], None)).unwrap_or(None).is_some()
},
_ => false,
}
} else {
!self.storage.state_db.is_pruned(hash, number.saturated_into::())
}
}
fn destroy_state(&self, state: Self::State) -> ClientResult<()> {
if let Some(hash) = state.cache.parent_hash.clone() {
let is_best = || self.blockchain.meta.read().best_hash == hash;
state.release().sync_cache(&[], &[], vec![], vec![], None, None, is_best);
}
Ok(())
}
fn get_import_lock(&self) -> &Mutex<()> {
&self.import_lock
}
}
impl client_api::backend::LocalBackend for Backend