Unverified commit 10b0d793, authored by asynchronous rob, committed by GitHub
Browse files

Replace AuxStore with custom RocksDB (#2471)



* Use KeyValueDB in approval-voting

* use KVDB instead of AuxStore

* add rocksdb to cargo toml

* add a Config struct

* create new DB in service

* fix dep for regular node

* make optional

* post merge fix

Co-authored-by: Andronik Ordian <write@reusable.software>
parent ce03f371
Pipeline #124703 passed with stages
in 31 minutes and 4 seconds
......@@ -5198,8 +5198,12 @@ version = "0.1.0"
dependencies = [
"assert_matches",
"bitvec",
"derive_more",
"futures 0.3.12",
"futures-timer 3.0.2",
"kvdb",
"kvdb-memorydb",
"kvdb-rocksdb",
"maplit",
"merlin",
"parity-scale-codec",
......
......@@ -13,6 +13,9 @@ tracing-futures = "0.2.4"
bitvec = { version = "0.20.1", default-features = false, features = ["alloc"] }
merlin = "2.0"
schnorrkel = "0.9.1"
kvdb = "0.9.0"
kvdb-rocksdb = "0.11.0"
derive_more = "0.99.1"
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-overseer = { path = "../../overseer" }
......@@ -36,3 +39,4 @@ sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch =
maplit = "1.0.2"
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
assert_matches = "1.4.0"
kvdb-memorydb = "0.9.0"
......@@ -16,7 +16,7 @@
//! Version 1 of the DB schema.
use sc_client_api::backend::AuxStore;
use kvdb::{DBTransaction, KeyValueDB};
use polkadot_node_primitives::approval::{DelayTranche, AssignmentCert};
use polkadot_primitives::v1::{
ValidatorIndex, GroupIndex, CandidateReceipt, SessionIndex, CoreIndex,
......@@ -27,6 +27,7 @@ use parity_scale_codec::{Encode, Decode};
use std::collections::{BTreeMap, HashMap};
use std::collections::hash_map::Entry;
use std::sync::Arc;
use bitvec::{vec::BitVec, order::Lsb0 as BitOrderLsb0};
#[cfg(test)]
......@@ -39,6 +40,9 @@ pub struct Tick(u64);
pub type Bitfield = BitVec<BitOrderLsb0, u8>;
/// Number of columns the approval DB is opened with.
const NUM_COLUMNS: u32 = 1;
/// Column index used by the reads and writes visible in this module.
const DATA_COL: u32 = 0;
/// Key under which the encoded `StoredBlockRange` is kept.
const STORED_BLOCKS_KEY: &[u8] = b"Approvals_StoredBlocks";
/// Details pertaining to our assignment on a block.
......@@ -103,6 +107,33 @@ pub struct BlockEntry {
pub children: Vec<Hash>,
}
/// Wipe the directory at `path` (if present) and open a fresh RocksDB instance in it.
///
/// A directory that does not exist yet is not treated as an error. `cache_size`
/// is installed as the memory budget of the data column.
///
/// # Errors
///
/// Returns an `std::io::Error` when the directory cannot be removed or created,
/// when `path` is not valid UTF-8, or when opening the database fails.
pub fn clear_and_recreate(path: &std::path::Path, cache_size: usize)
    -> std::io::Result<Arc<dyn KeyValueDB>>
{
    use kvdb_rocksdb::{DatabaseConfig, Database as RocksDB};

    tracing::info!("Recreating approval-checking DB at {:?}", path);

    match std::fs::remove_dir_all(path) {
        Ok(()) => {}
        // Nothing to clear when the directory was never created.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => return Err(e),
    }
    std::fs::create_dir_all(path)?;

    let mut config = DatabaseConfig::with_columns(NUM_COLUMNS);
    config.memory_budget.insert(DATA_COL, cache_size);

    // The RocksDB wrapper takes the location as a `&str`, so the path must be UTF-8.
    let location = match path.to_str() {
        Some(s) => s,
        None => return Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            format!("Non-UTF-8 database path {:?}", path),
        )),
    };

    let database = RocksDB::open(&config, location)?;
    Ok(Arc::new(database))
}
/// A range from earliest..last block number stored within the DB.
///
/// Field `0` is the earliest stored block number and field `1` is the upper end
/// of the range.
/// NOTE(review): the upper end looks exclusive — `add_block_entry` starts the
/// range at `(number, number + 1)` for the first block — confirm against all writers.
#[derive(Encode, Decode, Debug, Clone, PartialEq)]
pub struct StoredBlockRange(BlockNumber, BlockNumber);
......@@ -119,14 +150,26 @@ impl From<Tick> for crate::Tick {
}
}
/// Errors while accessing things from the DB.
///
/// Both variants are constructible through `From` (derived), which is what lets
/// the functions in this module convert with `?` and `map_err(Into::into)`.
#[derive(Debug, derive_more::From, derive_more::Display)]
pub enum Error {
/// An I/O error returned by the underlying key-value store.
Io(std::io::Error),
/// A value read from the DB could not be decoded into the expected type.
InvalidDecoding(parity_scale_codec::Error),
}
// Empty impl: the trait's default methods are used as-is.
impl std::error::Error for Error {}
/// Result alias for DB errors.
pub type Result<T> = std::result::Result<T, Error>;
/// Canonicalize some particular block, pruning everything before it and
/// pruning any competing branches at the same height.
pub(crate) fn canonicalize(
store: &impl AuxStore,
store: &dyn KeyValueDB,
canon_number: BlockNumber,
canon_hash: Hash,
)
-> sp_blockchain::Result<()>
-> Result<()>
{
let range = match load_stored_blocks(store)? {
None => return Ok(()),
......@@ -137,8 +180,7 @@ pub(crate) fn canonicalize(
},
};
let mut deleted_height_keys = Vec::new();
let mut deleted_block_keys = Vec::new();
let mut transaction = DBTransaction::new();
// Storing all candidates in memory is potentially heavy, but should be fine
// as long as finality doesn't stall for a long while. We could optimize this
......@@ -150,15 +192,15 @@ pub(crate) fn canonicalize(
let visit_and_remove_block_entry = |
block_hash: Hash,
deleted_block_keys: &mut Vec<_>,
transaction: &mut DBTransaction,
visited_candidates: &mut HashMap<CandidateHash, CandidateEntry>,
| -> sp_blockchain::Result<Vec<Hash>> {
| -> Result<Vec<Hash>> {
let block_entry = match load_block_entry(store, &block_hash)? {
None => return Ok(Vec::new()),
Some(b) => b,
};
deleted_block_keys.push(block_entry_key(&block_hash));
transaction.delete(DATA_COL, &block_entry_key(&block_hash)[..]);
for &(_, ref candidate_hash) in &block_entry.candidates {
let candidate = match visited_candidates.entry(*candidate_hash) {
Entry::Occupied(e) => e.into_mut(),
......@@ -179,12 +221,12 @@ pub(crate) fn canonicalize(
// First visit everything before the height.
for i in range.0..canon_number {
let at_height = load_blocks_at_height(store, i)?;
deleted_height_keys.push(blocks_at_height_key(i));
transaction.delete(DATA_COL, &blocks_at_height_key(i)[..]);
for b in at_height {
let _ = visit_and_remove_block_entry(
b,
&mut deleted_block_keys,
&mut transaction,
&mut visited_candidates,
)?;
}
......@@ -193,7 +235,7 @@ pub(crate) fn canonicalize(
// Then visit everything at the height.
let pruned_branches = {
let at_height = load_blocks_at_height(store, canon_number)?;
deleted_height_keys.push(blocks_at_height_key(canon_number));
transaction.delete(DATA_COL, &blocks_at_height_key(canon_number));
// Note that while there may be branches descending from blocks at earlier heights,
// we have already covered them by removing everything at earlier heights.
......@@ -202,7 +244,7 @@ pub(crate) fn canonicalize(
for b in at_height {
let children = visit_and_remove_block_entry(
b,
&mut deleted_block_keys,
&mut transaction,
&mut visited_candidates,
)?;
......@@ -220,7 +262,7 @@ pub(crate) fn canonicalize(
while let Some((height, next_child)) = frontier.pop() {
let children = visit_and_remove_block_entry(
next_child,
&mut deleted_block_keys,
&mut transaction,
&mut visited_candidates,
)?;
......@@ -240,32 +282,26 @@ pub(crate) fn canonicalize(
}
// Update all `CandidateEntry`s, deleting all those which now have empty `block_assignments`.
let (written_candidates, deleted_candidates) = {
let mut written = Vec::new();
let mut deleted = Vec::new();
for (candidate_hash, candidate) in visited_candidates {
if candidate.block_assignments.is_empty() {
deleted.push(candidate_entry_key(&candidate_hash));
} else {
written.push((candidate_entry_key(&candidate_hash), candidate.encode()));
}
for (candidate_hash, candidate) in visited_candidates {
if candidate.block_assignments.is_empty() {
transaction.delete(DATA_COL, &candidate_entry_key(&candidate_hash)[..]);
} else {
transaction.put_vec(
DATA_COL,
&candidate_entry_key(&candidate_hash)[..],
candidate.encode(),
);
}
(written, deleted)
};
}
// Update all blocks-at-height keys, deleting all those which now have no blocks left.
let written_at_height = {
visited_heights.into_iter().filter_map(|(h, at)| {
if at.is_empty() {
deleted_height_keys.push(blocks_at_height_key(h));
None
} else {
Some((blocks_at_height_key(h), at.encode()))
}
}).collect::<Vec<_>>()
};
for (h, at) in visited_heights {
if at.is_empty() {
transaction.delete(DATA_COL, &blocks_at_height_key(h)[..]);
} else {
transaction.put_vec(DATA_COL, &blocks_at_height_key(h), at.encode());
}
}
// due to the fork pruning, this range actually might go too far above where our actual highest block is,
// if a relatively short fork is canonicalized.
......@@ -274,81 +310,20 @@ pub(crate) fn canonicalize(
std::cmp::max(range.1, canon_number + 2),
).encode();
// Because aux-store requires &&[u8], we have to collect.
let inserted_keys: Vec<_> = std::iter::once((&STORED_BLOCKS_KEY[..], &new_range[..]))
.chain(written_candidates.iter().map(|&(ref k, ref v)| (&k[..], &v[..])))
.chain(written_at_height.iter().map(|&(ref k, ref v)| (&k[..], &v[..])))
.collect();
let deleted_keys: Vec<_> = deleted_block_keys.iter().map(|k| &k[..])
.chain(deleted_height_keys.iter().map(|k| &k[..]))
.chain(deleted_candidates.iter().map(|k| &k[..]))
.collect();
transaction.put_vec(DATA_COL, &STORED_BLOCKS_KEY[..], new_range);
// Update the values on-disk.
store.insert_aux(
inserted_keys.iter(),
deleted_keys.iter(),
)?;
Ok(())
store.write(transaction).map_err(Into::into)
}
/// Clear the aux store of everything.
pub(crate) fn clear(store: &impl AuxStore)
-> sp_blockchain::Result<()>
fn load_decode<D: Decode>(store: &dyn KeyValueDB, key: &[u8])
-> Result<Option<D>>
{
let range = match load_stored_blocks(store)? {
None => return Ok(()),
Some(range) => range,
};
let mut visited_height_keys = Vec::new();
let mut visited_block_keys = Vec::new();
let mut visited_candidate_keys = Vec::new();
for i in range.0..range.1 {
let at_height = load_blocks_at_height(store, i)?;
visited_height_keys.push(blocks_at_height_key(i));
for block_hash in at_height {
let block_entry = match load_block_entry(store, &block_hash)? {
None => continue,
Some(e) => e,
};
visited_block_keys.push(block_entry_key(&block_hash));
for &(_, candidate_hash) in &block_entry.candidates {
visited_candidate_keys.push(candidate_entry_key(&candidate_hash));
}
}
}
// unfortunately demands a `collect` because aux store wants `&&[u8]` for some reason.
let visited_keys_borrowed = visited_height_keys.iter().map(|x| &x[..])
.chain(visited_block_keys.iter().map(|x| &x[..]))
.chain(visited_candidate_keys.iter().map(|x| &x[..]))
.chain(std::iter::once(&STORED_BLOCKS_KEY[..]))
.collect::<Vec<_>>();
store.insert_aux(&[], &visited_keys_borrowed)?;
Ok(())
}
fn load_decode<D: Decode>(store: &impl AuxStore, key: &[u8])
-> sp_blockchain::Result<Option<D>>
{
match store.get_aux(key)? {
match store.get(DATA_COL, key)? {
None => Ok(None),
Some(raw) => D::decode(&mut &raw[..])
.map(Some)
.map_err(|e| sp_blockchain::Error::Storage(
format!("Failed to decode item in approvals DB: {:?}", e)
)),
.map_err(Into::into),
}
}
......@@ -372,16 +347,18 @@ pub(crate) struct NewCandidateInfo {
/// `None` for any of the candidates referenced by the block entry. In these cases,
/// no information about new candidates will be referred to by this function.
pub(crate) fn add_block_entry(
store: &impl AuxStore,
store: &dyn KeyValueDB,
parent_hash: Hash,
number: BlockNumber,
entry: BlockEntry,
n_validators: usize,
candidate_info: impl Fn(&CandidateHash) -> Option<NewCandidateInfo>,
) -> sp_blockchain::Result<Vec<(CandidateHash, CandidateEntry)>> {
) -> Result<Vec<(CandidateHash, CandidateEntry)>> {
let mut transaction = DBTransaction::new();
let session = entry.session;
let new_block_range = {
// Update the stored block range.
{
let new_range = match load_stored_blocks(store)? {
None => Some(StoredBlockRange(number, number + 1)),
Some(range) => if range.1 <= number {
......@@ -391,10 +368,11 @@ pub(crate) fn add_block_entry(
}
};
new_range.map(|n| (STORED_BLOCKS_KEY, n.encode()))
new_range.map(|n| transaction.put_vec(DATA_COL, &STORED_BLOCKS_KEY[..], n.encode()))
};
let updated_blocks_at = {
// Update the blocks at height meta key.
{
let mut blocks_at_height = load_blocks_at_height(store, number)?;
if blocks_at_height.contains(&entry.block_hash) {
// seems we already have a block entry for this block. nothing to do here.
......@@ -402,13 +380,13 @@ pub(crate) fn add_block_entry(
}
blocks_at_height.push(entry.block_hash);
(blocks_at_height_key(number), blocks_at_height.encode())
transaction.put_vec(DATA_COL, &blocks_at_height_key(number)[..], blocks_at_height.encode())
};
let mut candidate_entries = Vec::with_capacity(entry.candidates.len());
let candidate_entry_updates = {
let mut updated_entries = Vec::with_capacity(entry.candidates.len());
// read and write all updated entries.
{
for &(_, ref candidate_hash) in &entry.candidates {
let NewCandidateInfo {
candidate,
......@@ -438,43 +416,26 @@ pub(crate) fn add_block_entry(
}
);
updated_entries.push(
(candidate_entry_key(&candidate_hash), candidate_entry.encode())
transaction.put_vec(
DATA_COL,
&candidate_entry_key(&candidate_hash)[..],
candidate_entry.encode(),
);
candidate_entries.push((*candidate_hash, candidate_entry));
}
updated_entries
};
let updated_parent = {
load_block_entry(store, &parent_hash)?.map(|mut e| {
e.children.push(entry.block_hash);
(block_entry_key(&parent_hash), e.encode())
})
};
let write_block_entry = (block_entry_key(&entry.block_hash), entry.encode());
// write:
// - new block range
// - updated blocks-at item
// - fresh and updated candidate entries
// - the parent block entry.
// - the block entry itself
// Unfortunately have to collect because aux-store demands &(&[u8], &[u8]).
let all_keys_and_values: Vec<_> = new_block_range.as_ref().into_iter()
.map(|&(ref k, ref v)| (&k[..], &v[..]))
.chain(std::iter::once((&updated_blocks_at.0[..], &updated_blocks_at.1[..])))
.chain(candidate_entry_updates.iter().map(|&(ref k, ref v)| (&k[..], &v[..])))
.chain(std::iter::once((&write_block_entry.0[..], &write_block_entry.1[..])))
.chain(updated_parent.as_ref().into_iter().map(|&(ref k, ref v)| (&k[..], &v[..])))
.collect();
// Update the child index for the parent.
load_block_entry(store, &parent_hash)?.map(|mut e| {
e.children.push(entry.block_hash);
transaction.put_vec(DATA_COL, &block_entry_key(&parent_hash)[..], e.encode())
});
store.insert_aux(&all_keys_and_values, &[])?;
// Put the new block entry in.
transaction.put_vec(DATA_COL, &block_entry_key(&entry.block_hash)[..], entry.encode());
store.write(transaction)?;
Ok(candidate_entries)
}
......@@ -501,57 +462,55 @@ impl Transaction {
}
/// Write the contents of the transaction, atomically, to the DB.
///
/// Returns `Ok(())` without touching the DB when the transaction holds neither
/// block entries nor candidate entries. Otherwise every buffered entry is
/// encoded and submitted to `db` in a single write call.
pub(crate) fn write(self, db: &impl AuxStore) -> sp_blockchain::Result<()> {
pub(crate) fn write(self, db: &dyn KeyValueDB) -> Result<()> {
// Nothing buffered: skip the DB round-trip entirely.
if self.block_entries.is_empty() && self.candidate_entries.is_empty() {
return Ok(())
}
let blocks: Vec<_> = self.block_entries.into_iter().map(|(hash, entry)| {
let mut db_transaction = DBTransaction::new();
// Block entries are keyed by `block_entry_key(hash)`.
for (hash, entry) in self.block_entries {
let k = block_entry_key(&hash);
let v = entry.encode();
(k, v)
}).collect();
db_transaction.put_vec(DATA_COL, &k, v);
}
let candidates: Vec<_> = self.candidate_entries.into_iter().map(|(hash, entry)| {
// Candidate entries are keyed by `candidate_entry_key(hash)`.
for (hash, entry) in self.candidate_entries {
let k = candidate_entry_key(&hash);
let v = entry.encode();
(k, v)
}).collect();
let kv = blocks.iter().map(|(k, v)| (&k[..], &v[..]))
.chain(candidates.iter().map(|(k, v)| (&k[..], &v[..])))
.collect::<Vec<_>>();
db_transaction.put_vec(DATA_COL, &k, v);
}
db.insert_aux(&kv, &[])
// The store's error is converted into this module's `Error` via `Into`.
db.write(db_transaction).map_err(Into::into)
}
}
/// Load the stored block range, kept under `STORED_BLOCKS_KEY`, from the DB.
///
/// Returns `Ok(None)` when nothing is stored under that key.
fn load_stored_blocks(store: &impl AuxStore)
-> sp_blockchain::Result<Option<StoredBlockRange>>
fn load_stored_blocks(store: &dyn KeyValueDB)
-> Result<Option<StoredBlockRange>>
{
load_decode(store, STORED_BLOCKS_KEY)
}
/// Load a blocks-at-height entry for a given block number.
///
/// Returns an empty vector when nothing is stored for `block_number`.
pub(crate) fn load_blocks_at_height(store: &impl AuxStore, block_number: BlockNumber)
-> sp_blockchain::Result<Vec<Hash>> {
pub(crate) fn load_blocks_at_height(store: &dyn KeyValueDB, block_number: BlockNumber)
-> Result<Vec<Hash>> {
load_decode(store, &blocks_at_height_key(block_number))
// A missing entry is reported as "no blocks" rather than as `None`.
.map(|x| x.unwrap_or_default())
}
/// Load a block entry from the DB.
///
/// Returns `Ok(None)` when no entry is stored for `block_hash`.
pub(crate) fn load_block_entry(store: &impl AuxStore, block_hash: &Hash)
-> sp_blockchain::Result<Option<BlockEntry>>
pub(crate) fn load_block_entry(store: &dyn KeyValueDB, block_hash: &Hash)
-> Result<Option<BlockEntry>>
{
load_decode(store, &block_entry_key(block_hash))
}
/// Load a candidate entry from the DB.
///
/// Returns `Ok(None)` when no entry is stored for `candidate_hash`.
pub(crate) fn load_candidate_entry(store: &impl AuxStore, candidate_hash: &CandidateHash)
-> sp_blockchain::Result<Option<CandidateEntry>>
pub(crate) fn load_candidate_entry(store: &dyn KeyValueDB, candidate_hash: &CandidateHash)
-> Result<Option<CandidateEntry>>
{
load_decode(store, &candidate_entry_key(candidate_hash))
}
......
......@@ -18,64 +18,37 @@
use super::*;
use polkadot_primitives::v1::Id as ParaId;
use std::cell::RefCell;
#[derive(Default)]
pub struct TestStore {
inner: RefCell<HashMap<Vec<u8>, Vec<u8>>>,
/// Test helper: queue a write of the encoded `range` under the stored-blocks key.
///
/// Only `tx` is modified; nothing reaches a DB until the transaction is written.
pub(crate) fn write_stored_blocks(tx: &mut DBTransaction, range: StoredBlockRange) {
    let encoded = range.encode();
    tx.put_vec(DATA_COL, STORED_BLOCKS_KEY, encoded);
}
impl AuxStore for TestStore {
fn insert_aux<'a, 'b: 'a, 'c: 'a, I, D>(&self, insertions: I, deletions: D) -> sp_blockchain::Result<()>
where I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>, D: IntoIterator<Item = &'a &'b [u8]>
{
let mut store = self.inner.borrow_mut();
// insertions before deletions.
for (k, v) in insertions {
store.insert(k.to_vec(), v.to_vec());
}
for k in deletions {
store.remove(&k[..]);
}
Ok(())
}
fn get_aux(&self, key: &[u8]) -> sp_blockchain::Result<Option<Vec<u8>>> {
Ok(self.inner.borrow().get(key).map(|v| v.clone()))
}
/// Test helper: queue a write of the encoded `blocks` list under the
/// blocks-at-height key for `height`.
pub(crate) fn write_blocks_at_height(tx: &mut DBTransaction, height: BlockNumber, blocks: &[Hash]) {
    let key = blocks_at_height_key(height);
    let encoded = blocks.encode();
    tx.put_vec(DATA_COL, &key[..], encoded);
}
impl TestStore {
pub(crate) fn write_stored_blocks(&self, range: StoredBlockRange) {
self.inner.borrow_mut().insert(
STORED_BLOCKS_KEY.to_vec(),
range.encode(),
);
}
pub(crate) fn write_blocks_at_height(&self, height: BlockNumber, blocks: &[Hash]) {
self.inner.borrow_mut().insert(
blocks_at_height_key(height).to_vec(),
blocks.encode(),
);
}
pub(crate) fn write_block_entry(&self, block_hash: &Hash, entry: &BlockEntry) {
self.inner.borrow_mut().insert(
block_entry_key(block_hash).to_vec(),
entry.encode(),
);
}
/// Test helper: queue a write of the encoded `entry` under the block-entry key
/// for `block_hash`.
pub(crate) fn write_block_entry(tx: &mut DBTransaction, block_hash: &Hash, entry: &BlockEntry) {
    let key = block_entry_key(block_hash);
    let encoded = entry.encode();
    tx.put_vec(DATA_COL, &key[..], encoded);
}
pub(crate) fn write_candidate_entry(&self, candidate_hash: &CandidateHash, entry: &CandidateEntry) {
self.inner.borrow_mut().insert(
candidate_entry_key(candidate_hash).to_vec(),
entry.encode(),
);