Newer
Older
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Client backend that uses RocksDB database as storage. State is still kept in memory.
extern crate substrate_client as client;
extern crate kvdb_rocksdb;
extern crate kvdb;
extern crate hashdb;
extern crate memorydb;
extern crate parking_lot;
extern crate substrate_state_machine as state_machine;
extern crate substrate_primitives as primitives;
extern crate substrate_runtime_support as runtime_support;
extern crate substrate_runtime_primitives as runtime_primitives;
#[macro_use]
extern crate log;
#[cfg(test)]
extern crate kvdb_memorydb;
use std::sync::Arc;
use std::path::PathBuf;
use kvdb::{KeyValueDB, DBTransaction};
use memorydb::MemoryDB;
use parking_lot::RwLock;
use runtime_primitives::generic::BlockId;
use runtime_primitives::bft::Justification;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, Hashing, HashingFor, Zero};
use state_machine::backend::Backend as StateBackend;
use state_machine::CodeExecutor;
/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
pub type DbState = state_machine::TrieBackend;
/// Database settings.
pub struct DatabaseSettings {
/// Cache size in bytes. If `None` default is used.
pub cache_size: Option<usize>,
/// Path to the database.
pub path: PathBuf,
}
/// Create an instance of db-backed client.
) -> Result<client::Client<Backend<Block>, client::LocalCallExecutor<Backend<Block>, E>, Block>, client::error::Error>
Block: BlockT,
<Block::Header as HeaderT>::Number: As<u32>,
Block::Hash: Into<[u8; 32]>, // TODO: remove when patricia_trie generic.
let backend = Arc::new(Backend::new(&settings)?);
let executor = client::LocalCallExecutor::new(backend.clone(), executor);
Ok(client::Client::new(backend, executor, genesis_builder)?)
}
mod columns {
pub const META: Option<u32> = Some(0);
pub const STATE: Option<u32> = Some(1);
pub const BLOCK_INDEX: Option<u32> = Some(2);
pub const HEADER: Option<u32> = Some(3);
pub const BODY: Option<u32> = Some(4);
pub const JUSTIFICATION: Option<u32> = Some(5);
pub const NUM_COLUMNS: u32 = 6;
}
mod meta {
pub const BEST_BLOCK: &[u8; 4] = b"best";
}
struct PendingBlock<Block: BlockT> {
header: Block::Header,
justification: Option<Justification<Block::Hash>>,
body: Option<Vec<Block::Extrinsic>>,
struct Meta<N, H> {
best_hash: H,
best_number: N,
genesis_hash: H,
}
type BlockKey = [u8; 4];
// Little endian
fn number_to_db_key<N>(n: N) -> BlockKey where N: As<u32> {
let n: u32 = n.as_();
[
(n >> 24) as u8,
((n >> 16) & 0xff) as u8,
((n >> 8) & 0xff) as u8,
(n & 0xff) as u8
]
}
// Maps database error to client error
fn db_err(err: kvdb::Error) -> client::error::Error {
use std::error::Error;
match err.kind() {
&kvdb::ErrorKind::Io(ref err) => client::error::ErrorKind::Backend(err.description().into()).into(),
&kvdb::ErrorKind::Msg(ref m) => client::error::ErrorKind::Backend(m.clone()).into(),
_ => client::error::ErrorKind::Backend("Unknown backend error".into()).into(),
}
}
/// Block database
db: Arc<KeyValueDB>,
meta: RwLock<Meta<<Block::Header as HeaderT>::Number, Block::Hash>>,
}
impl<Block: BlockT> BlockchainDb<Block> where <Block::Header as HeaderT>::Number: As<u32> {
fn id(&self, id: BlockId<Block>) -> Result<Option<BlockKey>, client::error::Error> {
match id {
BlockId::Hash(h) => {
{
let meta = self.meta.read();
if meta.best_hash == h {
return Ok(Some(number_to_db_key(meta.best_number)));
}
}
self.db.get(columns::BLOCK_INDEX, h.as_ref()).map(|v| v.map(|v| {
let mut key: [u8; 4] = [0; 4];
key.copy_from_slice(&v);
key
})).map_err(db_err)
},
BlockId::Number(n) => Ok(Some(number_to_db_key(n))),
}
}
fn new(db: Arc<KeyValueDB>) -> Result<Self, client::error::Error> {
let (best_hash, best_number) = if let Some(Some(header)) = db.get(columns::META, meta::BEST_BLOCK).and_then(|id|
match id {
Some(id) => db.get(columns::HEADER, &id).map(|h| h.map(|b| Block::Header::decode(&mut &b[..]))),
None => Ok(None),
}).map_err(db_err)?
{
let hash = header.hash();
debug!("DB Opened blockchain db, best {:?} ({})", hash, header.number());
(hash, header.number().clone())
let genesis_hash = db.get(columns::HEADER, &number_to_db_key(<Block::Header as HeaderT>::Number::zero())).map_err(db_err)?
.map(|b| HashingFor::<Block>::hash(&b)).unwrap_or_default().into();
Ok(BlockchainDb {
db,
meta: RwLock::new(Meta {
best_hash,
best_number,
genesis_hash,
})
})
}
fn read_db(&self, id: BlockId<Block>, column: Option<u32>) -> Result<Option<DBValue>, client::error::Error> {
self.id(id).and_then(|key|
match key {
Some(key) => self.db.get(column, &key).map_err(db_err),
None => Ok(None),
})
}
fn update_meta(&self, hash: Block::Hash, number: <Block::Header as HeaderT>::Number, is_best: bool) {
if is_best {
let mut meta = self.meta.write();
meta.genesis_hash = hash;
}
meta.best_number = number;
meta.best_hash = hash;
}
}
}
impl<Block: BlockT> client::blockchain::Backend<Block> for BlockchainDb<Block> where <Block::Header as HeaderT>::Number: As<u32> {
fn header(&self, id: BlockId<Block>) -> Result<Option<Block::Header>, client::error::Error> {
Some(header) => match Block::Header::decode(&mut &header[..]) {
Some(header) => Ok(Some(header)),
None => return Err(client::error::ErrorKind::Backend("Error decoding header".into()).into()),
}
None => Ok(None),
}
}
fn body(&self, id: BlockId<Block>) -> Result<Option<Vec<Block::Extrinsic>>, client::error::Error> {
Some(body) => match Slicable::decode(&mut &body[..]) {
Some(body) => Ok(Some(body)),
None => return Err(client::error::ErrorKind::Backend("Error decoding body".into()).into()),
}
None => Ok(None),
}
}
fn justification(&self, id: BlockId<Block>) -> Result<Option<Justification<Block::Hash>>, client::error::Error> {
match self.read_db(id, columns::JUSTIFICATION)? {
Some(justification) => match Slicable::decode(&mut &justification[..]) {
Some(justification) => Ok(Some(justification)),
None => return Err(client::error::ErrorKind::Backend("Error decoding justification".into()).into()),
}
None => Ok(None),
}
}
fn info(&self) -> Result<client::blockchain::Info<Block>, client::error::Error> {
let meta = self.meta.read();
Ok(client::blockchain::Info {
best_hash: meta.best_hash,
best_number: meta.best_number,
genesis_hash: meta.genesis_hash,
})
}
fn status(&self, id: BlockId<Block>) -> Result<client::blockchain::BlockStatus, client::error::Error> {
let exists = match id {
BlockId::Hash(_) => self.id(id)?.is_some(),
BlockId::Number(n) => n <= self.meta.read().best_number,
};
match exists {
true => Ok(client::blockchain::BlockStatus::InChain),
false => Ok(client::blockchain::BlockStatus::Unknown),
}
}
fn hash(&self, number: <Block::Header as HeaderT>::Number) -> Result<Option<Block::Hash>, client::error::Error> {
self.read_db(BlockId::Number(number), columns::HEADER).map(|x|
x.map(|raw| HashingFor::<Block>::hash(&raw[..])).map(Into::into)
/// Database transaction
pub struct BlockImportOperation<Block: BlockT> {
old_state: DbState,
updates: MemoryDB,
}
impl<Block: BlockT> client::backend::BlockImportOperation<Block> for BlockImportOperation<Block> {
fn state(&self) -> Result<Option<&Self::State>, client::error::Error> {
Ok(Some(&self.old_state))
fn set_block_data(&mut self, header: Block::Header, body: Option<Vec<Block::Extrinsic>>, justification: Option<Justification<Block::Hash>>, is_best: bool) -> Result<(), client::error::Error> {
assert!(self.pending_block.is_none(), "Only one block per operation is allowed");
self.pending_block = Some(PendingBlock {
header,
body,
justification,
is_best,
});
Ok(())
}
fn update_storage(&mut self, update: MemoryDB) -> Result<(), client::error::Error> {
self.updates = update;
Ok(())
}
fn reset_storage<I: Iterator<Item=(Vec<u8>, Vec<u8>)>>(&mut self, iter: I) -> Result<(), client::error::Error> {
let (_, update) = self.old_state.storage_root(iter.into_iter().map(|(k, v)| (k, Some(v))));
self.updates = update;
/// Disk backend. Keeps data in a key-value store. In archive mode, trie nodes are kept from all blocks.
/// Otherwise, trie nodes are kept only from the most recent block.
db: Arc<KeyValueDB>,
archive: bool,
impl<Block: BlockT> Backend<Block> where <Block::Header as HeaderT>::Number: As<u32> {
/// Create a new instance of database backend.
pub fn new(config: &DatabaseSettings) -> Result<Self, client::error::Error> {
let mut db_config = DatabaseConfig::with_columns(Some(columns::NUM_COLUMNS));
db_config.memory_budget = config.cache_size;
db_config.wal = true;
let path = config.path.to_str().ok_or_else(|| client::error::ErrorKind::Backend("Invalid database path".into()))?;
let db = Arc::new(Database::open(&db_config, &path).map_err(db_err)?);
Backend::from_kvdb(db as Arc<_>, true)
}
#[cfg(test)]
let db = Arc::new(::kvdb_memorydb::create(columns::NUM_COLUMNS));
Backend::from_kvdb(db as Arc<_>, false).expect("failed to create test-db")
}
fn from_kvdb(db: Arc<KeyValueDB>, archive: bool) -> Result<Self, client::error::Error> {
let blockchain = BlockchainDb::new(db.clone())?;
Ok(Backend {
db,
blockchain,
archive
impl<Block: BlockT> client::backend::Backend<Block> for Backend<Block> where
<Block::Header as HeaderT>::Number: As<u32>,
Block::Hash: Into<[u8; 32]>, // TODO: remove when patricia_trie generic.
{
type BlockImportOperation = BlockImportOperation<Block>;
type Blockchain = BlockchainDb<Block>;
fn begin_operation(&self, block: BlockId<Block>) -> Result<Self::BlockImportOperation, client::error::Error> {
let state = self.state_at(block)?;
Ok(BlockImportOperation {
pending_block: None,
old_state: state,
updates: MemoryDB::default(),
fn commit_operation(&self, mut operation: Self::BlockImportOperation) -> Result<(), client::error::Error> {
let mut transaction = DBTransaction::new();
if let Some(pending_block) = operation.pending_block {
let hash = pending_block.header.hash();
let number = pending_block.header.number().clone();
let key = number_to_db_key(pending_block.header.number().clone());
transaction.put(columns::HEADER, &key, &pending_block.header.encode());
if let Some(body) = pending_block.body {
transaction.put(columns::BODY, &key, &body.encode());
}
if let Some(justification) = pending_block.justification {
transaction.put(columns::JUSTIFICATION, &key, &justification.encode());
}
transaction.put(columns::BLOCK_INDEX, hash.as_ref(), &key);
if pending_block.is_best {
transaction.put(columns::META, meta::BEST_BLOCK, &key);
}
for (key, (val, rc)) in operation.updates.drain() {
if rc > 0 {
transaction.put(columns::STATE, &key.0[..], &val);
} else if rc < 0 && !self.archive {
transaction.delete(columns::STATE, &key.0[..]);
}
}
debug!("DB Commit {:?} ({})", hash, number);
self.db.write(transaction).map_err(db_err)?;
self.blockchain.update_meta(hash, number, pending_block.is_best);
}
Ok(())
}
fn state_at(&self, block: BlockId<Block>) -> Result<Self::State, client::error::Error> {
use client::blockchain::Backend as BcBackend;
// special case for genesis initialization
match block {
BlockId::Hash(h) if h == Default::default() =>
return Ok(DbState::with_kvdb_for_genesis(self.db.clone(), ::columns::STATE)),
self.blockchain.header(block).and_then(|maybe_hdr| maybe_hdr.map(|hdr| {
let root: [u8; 32] = hdr.state_root().clone().into();
DbState::with_kvdb(self.db.clone(), ::columns::STATE, root.into())
}).ok_or_else(|| client::error::ErrorKind::UnknownBlock(format!("{:?}", block)).into()))
impl<Block: BlockT> client::backend::LocalBackend<Block> for Backend<Block> where
<Block::Header as HeaderT>::Number: As<u32>,
Block::Hash: Into<[u8; 32]>, // TODO: remove when patricia_trie generic.
{}
#[cfg(test)]
mod tests {
use super::*;
use client::backend::Backend as BTrait;
use client::backend::BlockImportOperation as Op;
use client::blockchain::Backend as BCTrait;
use runtime_primitives::testing::{Header, Block as RawBlock};
type Block = RawBlock<u64>;
#[test]
fn block_hash_inserted_correctly() {
assert!(db.blockchain().hash(i).unwrap().is_none());
{
let id = if i == 0 {
BlockId::Hash(Default::default())
} else {
BlockId::Number(i - 1)
};
let mut op = db.begin_operation(id).unwrap();
number: i,
parent_hash: Default::default(),
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};
op.set_block_data(
header,
Some(vec![]),
None,
true,
).unwrap();
db.commit_operation(op).unwrap();
}
assert!(db.blockchain().hash(i).unwrap().is_some())
}
}
{
let mut op = db.begin_operation(BlockId::Hash(Default::default())).unwrap();
number: 0,
parent_hash: Default::default(),
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};
let storage = vec![
(vec![1, 3, 5], vec![2, 4, 6]),
(vec![1, 2, 3], vec![9, 9, 9]),
];
header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();
op.reset_storage(storage.iter().cloned()).unwrap();
op.set_block_data(
header,
Some(vec![]),
None,
true
).unwrap();
db.commit_operation(op).unwrap();
let state = db.state_at(BlockId::Number(0)).unwrap();
assert_eq!(state.storage(&[1, 3, 5]).unwrap(), Some(vec![2, 4, 6]));
assert_eq!(state.storage(&[1, 2, 3]).unwrap(), Some(vec![9, 9, 9]));
assert_eq!(state.storage(&[5, 5, 5]).unwrap(), None);
}
{
let mut op = db.begin_operation(BlockId::Number(0)).unwrap();
number: 1,
parent_hash: Default::default(),
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};
let storage = vec![
(vec![1, 3, 5], None),
(vec![5, 5, 5], Some(vec![4, 5, 6])),
];
let (root, overlay) = op.old_state.storage_root(storage.iter().cloned());
op.update_storage(overlay).unwrap();
header.state_root = root.into();
op.set_block_data(
header,
Some(vec![]),
None,
true
).unwrap();
db.commit_operation(op).unwrap();
let state = db.state_at(BlockId::Number(1)).unwrap();
assert_eq!(state.storage(&[1, 3, 5]).unwrap(), None);
assert_eq!(state.storage(&[1, 2, 3]).unwrap(), Some(vec![9, 9, 9]));
assert_eq!(state.storage(&[5, 5, 5]).unwrap(), Some(vec![4, 5, 6]));
}
}
#[test]
fn delete_only_when_negative_rc() {
let key;
{
let mut op = db.begin_operation(BlockId::Hash(Default::default())).unwrap();
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
number: 0,
parent_hash: Default::default(),
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};
let storage: Vec<(_, _)> = vec![];
header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();
op.reset_storage(storage.iter().cloned()).unwrap();
key = op.updates.insert(b"hello");
op.set_block_data(
header,
Some(vec![]),
None,
true
).unwrap();
db.commit_operation(op).unwrap();
assert_eq!(db.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]);
}
{
let mut op = db.begin_operation(BlockId::Number(0)).unwrap();
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
number: 1,
parent_hash: Default::default(),
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};
let storage: Vec<(_, _)> = vec![];
header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();
op.updates.insert(b"hello");
op.updates.remove(&key);
op.set_block_data(
header,
Some(vec![]),
None,
true
).unwrap();
db.commit_operation(op).unwrap();
assert_eq!(db.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]);
}
{
let mut op = db.begin_operation(BlockId::Number(1)).unwrap();
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
number: 1,
parent_hash: Default::default(),
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};
let storage: Vec<(_, _)> = vec![];
header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();
op.updates.remove(&key);
op.set_block_data(
header,
Some(vec![]),
None,
true
).unwrap();
db.commit_operation(op).unwrap();
assert!(db.db.get(::columns::STATE, &key.0[..]).unwrap().is_none());
}
}