diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index ad580ba102be8d11d51995043b4ca489d52eac32..46ea293c3faf6cae8eb7dec83fc8bcbf78f26d1e 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5198,8 +5198,12 @@ version = "0.1.0" dependencies = [ "assert_matches", "bitvec", + "derive_more", "futures 0.3.12", "futures-timer 3.0.2", + "kvdb", + "kvdb-memorydb", + "kvdb-rocksdb", "maplit", "merlin", "parity-scale-codec", diff --git a/polkadot/node/core/approval-voting/Cargo.toml b/polkadot/node/core/approval-voting/Cargo.toml index 35117355cbce765f3c90cf6416373c22e44310cf..c92412880681a6adf99ea6e74d51bf5c6849f5bf 100644 --- a/polkadot/node/core/approval-voting/Cargo.toml +++ b/polkadot/node/core/approval-voting/Cargo.toml @@ -13,6 +13,9 @@ tracing-futures = "0.2.4" bitvec = { version = "0.20.1", default-features = false, features = ["alloc"] } merlin = "2.0" schnorrkel = "0.9.1" +kvdb = "0.9.0" +kvdb-rocksdb = "0.11.0" +derive_more = "0.99.1" polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-overseer = { path = "../../overseer" } @@ -36,3 +39,4 @@ sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = maplit = "1.0.2" polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } assert_matches = "1.4.0" +kvdb-memorydb = "0.9.0" diff --git a/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs b/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs index 9c655dc333d31da997d11192683a0c722886ac0f..7397756f1220b51023272683faa874f2986da96f 100644 --- a/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs +++ b/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs @@ -16,7 +16,7 @@ //! Version 1 of the DB schema. -use sc_client_api::backend::AuxStore; +use kvdb::{DBTransaction, KeyValueDB}; use polkadot_node_primitives::approval::{DelayTranche, AssignmentCert}; use polkadot_primitives::v1::{ ValidatorIndex, GroupIndex, CandidateReceipt, SessionIndex, CoreIndex, @@ -27,6 +27,7 @@ use parity_scale_codec::{Encode, Decode}; use std::collections::{BTreeMap, HashMap}; use std::collections::hash_map::Entry; +use std::sync::Arc; use bitvec::{vec::BitVec, order::Lsb0 as BitOrderLsb0}; #[cfg(test)] @@ -39,6 +40,9 @@ pub struct Tick(u64); pub type Bitfield = BitVec<BitOrderLsb0, u8>; +const NUM_COLUMNS: u32 = 1; +const DATA_COL: u32 = 0; + const STORED_BLOCKS_KEY: &[u8] = b"Approvals_StoredBlocks"; /// Details pertaining to our assignment on a block. @@ -103,6 +107,33 @@ pub struct BlockEntry { pub children: Vec<Hash>, } +/// Clear the given directory and create a RocksDB instance there. +pub fn clear_and_recreate(path: &std::path::Path, cache_size: usize) + -> std::io::Result<Arc<dyn KeyValueDB>> +{ + use kvdb_rocksdb::{DatabaseConfig, Database as RocksDB}; + + tracing::info!("Recreating approval-checking DB at {:?}", path); + + if let Err(e) = std::fs::remove_dir_all(path) { + if e.kind() != std::io::ErrorKind::NotFound { + return Err(e); + } + } + std::fs::create_dir_all(path)?; + + let mut db_config = DatabaseConfig::with_columns(NUM_COLUMNS); + + db_config.memory_budget.insert(DATA_COL, cache_size); + + let path = path.to_str().ok_or_else(|| std::io::Error::new( + std::io::ErrorKind::Other, + format!("Non-UTF-8 database path {:?}", path), + ))?; + + Ok(Arc::new(RocksDB::open(&db_config, path)?)) +} + /// A range from earliest..last block number stored within the DB. #[derive(Encode, Decode, Debug, Clone, PartialEq)] pub struct StoredBlockRange(BlockNumber, BlockNumber); @@ -119,14 +150,26 @@ impl From<Tick> for crate::Tick { } } +/// Errors while accessing things from the DB. +#[derive(Debug, derive_more::From, derive_more::Display)] +pub enum Error { + Io(std::io::Error), + InvalidDecoding(parity_scale_codec::Error), +} + +impl std::error::Error for Error {} + +/// Result alias for DB errors. +pub type Result<T> = std::result::Result<T, Error>; + /// Canonicalize some particular block, pruning everything before it and /// pruning any competing branches at the same height. pub(crate) fn canonicalize( - store: &impl AuxStore, + store: &dyn KeyValueDB, canon_number: BlockNumber, canon_hash: Hash, ) - -> sp_blockchain::Result<()> + -> Result<()> { let range = match load_stored_blocks(store)? { None => return Ok(()), @@ -137,8 +180,7 @@ pub(crate) fn canonicalize( }, }; - let mut deleted_height_keys = Vec::new(); - let mut deleted_block_keys = Vec::new(); + let mut transaction = DBTransaction::new(); // Storing all candidates in memory is potentially heavy, but should be fine // as long as finality doesn't stall for a long while. We could optimize this @@ -150,15 +192,15 @@ pub(crate) fn canonicalize( let visit_and_remove_block_entry = | block_hash: Hash, - deleted_block_keys: &mut Vec<_>, + transaction: &mut DBTransaction, visited_candidates: &mut HashMap<CandidateHash, CandidateEntry>, - | -> sp_blockchain::Result<Vec<Hash>> { + | -> Result<Vec<Hash>> { let block_entry = match load_block_entry(store, &block_hash)? { None => return Ok(Vec::new()), Some(b) => b, }; - deleted_block_keys.push(block_entry_key(&block_hash)); + transaction.delete(DATA_COL, &block_entry_key(&block_hash)[..]); for &(_, ref candidate_hash) in &block_entry.candidates { let candidate = match visited_candidates.entry(*candidate_hash) { Entry::Occupied(e) => e.into_mut(), @@ -179,12 +221,12 @@ pub(crate) fn canonicalize( // First visit everything before the height. for i in range.0..canon_number { let at_height = load_blocks_at_height(store, i)?; - deleted_height_keys.push(blocks_at_height_key(i)); + transaction.delete(DATA_COL, &blocks_at_height_key(i)[..]); for b in at_height { let _ = visit_and_remove_block_entry( b, - &mut deleted_block_keys, + &mut transaction, &mut visited_candidates, )?; } @@ -193,7 +235,7 @@ pub(crate) fn canonicalize( // Then visit everything at the height. let pruned_branches = { let at_height = load_blocks_at_height(store, canon_number)?; - deleted_height_keys.push(blocks_at_height_key(canon_number)); + transaction.delete(DATA_COL, &blocks_at_height_key(canon_number)); // Note that while there may be branches descending from blocks at earlier heights, // we have already covered them by removing everything at earlier heights. @@ -202,7 +244,7 @@ pub(crate) fn canonicalize( for b in at_height { let children = visit_and_remove_block_entry( b, - &mut deleted_block_keys, + &mut transaction, &mut visited_candidates, )?; @@ -220,7 +262,7 @@ pub(crate) fn canonicalize( while let Some((height, next_child)) = frontier.pop() { let children = visit_and_remove_block_entry( next_child, - &mut deleted_block_keys, + &mut transaction, &mut visited_candidates, )?; @@ -240,32 +282,26 @@ pub(crate) fn canonicalize( } // Update all `CandidateEntry`s, deleting all those which now have empty `block_assignments`. - let (written_candidates, deleted_candidates) = { - let mut written = Vec::new(); - let mut deleted = Vec::new(); - - for (candidate_hash, candidate) in visited_candidates { - if candidate.block_assignments.is_empty() { - deleted.push(candidate_entry_key(&candidate_hash)); - } else { - written.push((candidate_entry_key(&candidate_hash), candidate.encode())); - } + for (candidate_hash, candidate) in visited_candidates { + if candidate.block_assignments.is_empty() { + transaction.delete(DATA_COL, &candidate_entry_key(&candidate_hash)[..]); + } else { + transaction.put_vec( + DATA_COL, + &candidate_entry_key(&candidate_hash)[..], + candidate.encode(), + ); } - - (written, deleted) - }; + } // Update all blocks-at-height keys, deleting all those which now have empty `block_assignments`. - let written_at_height = { - visited_heights.into_iter().filter_map(|(h, at)| { - if at.is_empty() { - deleted_height_keys.push(blocks_at_height_key(h)); - None - } else { - Some((blocks_at_height_key(h), at.encode())) - } - }).collect::<Vec<_>>() - }; + for (h, at) in visited_heights { + if at.is_empty() { + transaction.delete(DATA_COL, &blocks_at_height_key(h)[..]); + } else { + transaction.put_vec(DATA_COL, &blocks_at_height_key(h), at.encode()); + } + } // due to the fork pruning, this range actually might go too far above where our actual highest block is, // if a relatively short fork is canonicalized. @@ -274,81 +310,20 @@ pub(crate) fn canonicalize( std::cmp::max(range.1, canon_number + 2), ).encode(); - // Because aux-store requires &&[u8], we have to collect. - - let inserted_keys: Vec<_> = std::iter::once((&STORED_BLOCKS_KEY[..], &new_range[..])) - .chain(written_candidates.iter().map(|&(ref k, ref v)| (&k[..], &v[..]))) - .chain(written_at_height.iter().map(|&(ref k, ref v)| (&k[..], &v[..]))) - .collect(); - - let deleted_keys: Vec<_> = deleted_block_keys.iter().map(|k| &k[..]) - .chain(deleted_height_keys.iter().map(|k| &k[..])) - .chain(deleted_candidates.iter().map(|k| &k[..])) - .collect(); + transaction.put_vec(DATA_COL, &STORED_BLOCKS_KEY[..], new_range); // Update the values on-disk. - store.insert_aux( - inserted_keys.iter(), - deleted_keys.iter(), - )?; - - Ok(()) + store.write(transaction).map_err(Into::into) } -/// Clear the aux store of everything. -pub(crate) fn clear(store: &impl AuxStore) - -> sp_blockchain::Result<()> +fn load_decode<D: Decode>(store: &dyn KeyValueDB, key: &[u8]) + -> Result<Option<D>> { - let range = match load_stored_blocks(store)? { - None => return Ok(()), - Some(range) => range, - }; - - let mut visited_height_keys = Vec::new(); - let mut visited_block_keys = Vec::new(); - let mut visited_candidate_keys = Vec::new(); - - for i in range.0..range.1 { - let at_height = load_blocks_at_height(store, i)?; - - visited_height_keys.push(blocks_at_height_key(i)); - - for block_hash in at_height { - let block_entry = match load_block_entry(store, &block_hash)? { - None => continue, - Some(e) => e, - }; - - visited_block_keys.push(block_entry_key(&block_hash)); - - for &(_, candidate_hash) in &block_entry.candidates { - visited_candidate_keys.push(candidate_entry_key(&candidate_hash)); - } - } - } - - // unfortunately demands a `collect` because aux store wants `&&[u8]` for some reason. - let visited_keys_borrowed = visited_height_keys.iter().map(|x| &x[..]) - .chain(visited_block_keys.iter().map(|x| &x[..])) - .chain(visited_candidate_keys.iter().map(|x| &x[..])) - .chain(std::iter::once(&STORED_BLOCKS_KEY[..])) - .collect::<Vec<_>>(); - - store.insert_aux(&[], &visited_keys_borrowed)?; - - Ok(()) -} - -fn load_decode<D: Decode>(store: &impl AuxStore, key: &[u8]) - -> sp_blockchain::Result<Option<D>> -{ - match store.get_aux(key)? { + match store.get(DATA_COL, key)? { None => Ok(None), Some(raw) => D::decode(&mut &raw[..]) .map(Some) - .map_err(|e| sp_blockchain::Error::Storage( - format!("Failed to decode item in approvals DB: {:?}", e) - )), + .map_err(Into::into), } } @@ -372,16 +347,18 @@ pub(crate) struct NewCandidateInfo { /// `None` for any of the candidates referenced by the block entry. In these cases, /// no information about new candidates will be referred to by this function. pub(crate) fn add_block_entry( - store: &impl AuxStore, + store: &dyn KeyValueDB, parent_hash: Hash, number: BlockNumber, entry: BlockEntry, n_validators: usize, candidate_info: impl Fn(&CandidateHash) -> Option<NewCandidateInfo>, -) -> sp_blockchain::Result<Vec<(CandidateHash, CandidateEntry)>> { +) -> Result<Vec<(CandidateHash, CandidateEntry)>> { + let mut transaction = DBTransaction::new(); let session = entry.session; - let new_block_range = { + // Update the stored block range. + { let new_range = match load_stored_blocks(store)? { None => Some(StoredBlockRange(number, number + 1)), Some(range) => if range.1 <= number { @@ -391,10 +368,11 @@ pub(crate) fn add_block_entry( } }; - new_range.map(|n| (STORED_BLOCKS_KEY, n.encode())) + new_range.map(|n| transaction.put_vec(DATA_COL, &STORED_BLOCKS_KEY[..], n.encode())) }; - let updated_blocks_at = { + // Update the blocks at height meta key. + { let mut blocks_at_height = load_blocks_at_height(store, number)?; if blocks_at_height.contains(&entry.block_hash) { // seems we already have a block entry for this block. nothing to do here. @@ -402,13 +380,13 @@ pub(crate) fn add_block_entry( } blocks_at_height.push(entry.block_hash); - (blocks_at_height_key(number), blocks_at_height.encode()) + transaction.put_vec(DATA_COL, &blocks_at_height_key(number)[..], blocks_at_height.encode()) }; let mut candidate_entries = Vec::with_capacity(entry.candidates.len()); - let candidate_entry_updates = { - let mut updated_entries = Vec::with_capacity(entry.candidates.len()); + // read and write all updated entries. + { for &(_, ref candidate_hash) in &entry.candidates { let NewCandidateInfo { candidate, @@ -438,43 +416,26 @@ pub(crate) fn add_block_entry( } ); - updated_entries.push( - (candidate_entry_key(&candidate_hash), candidate_entry.encode()) + transaction.put_vec( + DATA_COL, + &candidate_entry_key(&candidate_hash)[..], + candidate_entry.encode(), ); candidate_entries.push((*candidate_hash, candidate_entry)); } - - updated_entries }; - let updated_parent = { - load_block_entry(store, &parent_hash)?.map(|mut e| { - e.children.push(entry.block_hash); - (block_entry_key(&parent_hash), e.encode()) - }) - }; - - let write_block_entry = (block_entry_key(&entry.block_hash), entry.encode()); - - // write: - // - new block range - // - updated blocks-at item - // - fresh and updated candidate entries - // - the parent block entry. - // - the block entry itself - - // Unfortunately have to collect because aux-store demands &(&[u8], &[u8]). - let all_keys_and_values: Vec<_> = new_block_range.as_ref().into_iter() - .map(|&(ref k, ref v)| (&k[..], &v[..])) - .chain(std::iter::once((&updated_blocks_at.0[..], &updated_blocks_at.1[..]))) - .chain(candidate_entry_updates.iter().map(|&(ref k, ref v)| (&k[..], &v[..]))) - .chain(std::iter::once((&write_block_entry.0[..], &write_block_entry.1[..]))) - .chain(updated_parent.as_ref().into_iter().map(|&(ref k, ref v)| (&k[..], &v[..]))) - .collect(); + // Update the child index for the parent. + load_block_entry(store, &parent_hash)?.map(|mut e| { + e.children.push(entry.block_hash); + transaction.put_vec(DATA_COL, &block_entry_key(&parent_hash)[..], e.encode()) + }); - store.insert_aux(&all_keys_and_values, &[])?; + // Put the new block entry in. + transaction.put_vec(DATA_COL, &block_entry_key(&entry.block_hash)[..], entry.encode()); + store.write(transaction)?; Ok(candidate_entries) } @@ -501,57 +462,55 @@ impl Transaction { } /// Write the contents of the transaction, atomically, to the DB. - pub(crate) fn write(self, db: &impl AuxStore) -> sp_blockchain::Result<()> { + pub(crate) fn write(self, db: &dyn KeyValueDB) -> Result<()> { if self.block_entries.is_empty() && self.candidate_entries.is_empty() { return Ok(()) } - let blocks: Vec<_> = self.block_entries.into_iter().map(|(hash, entry)| { + let mut db_transaction = DBTransaction::new(); + + for (hash, entry) in self.block_entries { let k = block_entry_key(&hash); let v = entry.encode(); - (k, v) - }).collect(); + db_transaction.put_vec(DATA_COL, &k, v); + } - let candidates: Vec<_> = self.candidate_entries.into_iter().map(|(hash, entry)| { + for (hash, entry) in self.candidate_entries { let k = candidate_entry_key(&hash); let v = entry.encode(); - (k, v) - }).collect(); - - let kv = blocks.iter().map(|(k, v)| (&k[..], &v[..])) - .chain(candidates.iter().map(|(k, v)| (&k[..], &v[..]))) - .collect::<Vec<_>>(); + db_transaction.put_vec(DATA_COL, &k, v); + } - db.insert_aux(&kv, &[]) + db.write(db_transaction).map_err(Into::into) } } /// Load the stored-blocks key from the state. -fn load_stored_blocks(store: &impl AuxStore) - -> sp_blockchain::Result<Option<StoredBlockRange>> +fn load_stored_blocks(store: &dyn KeyValueDB) + -> Result<Option<StoredBlockRange>> { load_decode(store, STORED_BLOCKS_KEY) } /// Load a blocks-at-height entry for a given block number. -pub(crate) fn load_blocks_at_height(store: &impl AuxStore, block_number: BlockNumber) - -> sp_blockchain::Result<Vec<Hash>> { +pub(crate) fn load_blocks_at_height(store: &dyn KeyValueDB, block_number: BlockNumber) + -> Result<Vec<Hash>> { load_decode(store, &blocks_at_height_key(block_number)) .map(|x| x.unwrap_or_default()) } /// Load a block entry from the aux store. -pub(crate) fn load_block_entry(store: &impl AuxStore, block_hash: &Hash) - -> sp_blockchain::Result<Option<BlockEntry>> +pub(crate) fn load_block_entry(store: &dyn KeyValueDB, block_hash: &Hash) + -> Result<Option<BlockEntry>> { load_decode(store, &block_entry_key(block_hash)) } /// Load a candidate entry from the aux store. -pub(crate) fn load_candidate_entry(store: &impl AuxStore, candidate_hash: &CandidateHash) - -> sp_blockchain::Result<Option<CandidateEntry>> +pub(crate) fn load_candidate_entry(store: &dyn KeyValueDB, candidate_hash: &CandidateHash) + -> Result<Option<CandidateEntry>> { load_decode(store, &candidate_entry_key(candidate_hash)) } diff --git a/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs b/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs index fff284e115a7bb9dcc863601b520903b8de0f07a..2167ad22810b4800096af9b35534d5926a5927be 100644 --- a/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs +++ b/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs @@ -18,64 +18,37 @@ use super::*; use polkadot_primitives::v1::Id as ParaId; -use std::cell::RefCell; -#[derive(Default)] -pub struct TestStore { - inner: RefCell<HashMap<Vec<u8>, Vec<u8>>>, +pub(crate) fn write_stored_blocks(tx: &mut DBTransaction, range: StoredBlockRange) { + tx.put_vec( + DATA_COL, + &STORED_BLOCKS_KEY[..], + range.encode(), + ); } -impl AuxStore for TestStore { - fn insert_aux<'a, 'b: 'a, 'c: 'a, I, D>(&self, insertions: I, deletions: D) -> sp_blockchain::Result<()> - where I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>, D: IntoIterator<Item = &'a &'b [u8]> - { - let mut store = self.inner.borrow_mut(); - - // insertions before deletions. - for (k, v) in insertions { - store.insert(k.to_vec(), v.to_vec()); - } - - for k in deletions { - store.remove(&k[..]); - } - - Ok(()) - } - - fn get_aux(&self, key: &[u8]) -> sp_blockchain::Result<Option<Vec<u8>>> { - Ok(self.inner.borrow().get(key).map(|v| v.clone())) - } +pub(crate) fn write_blocks_at_height(tx: &mut DBTransaction, height: BlockNumber, blocks: &[Hash]) { + tx.put_vec( + DATA_COL, + &blocks_at_height_key(height)[..], + blocks.encode(), + ); } -impl TestStore { - pub(crate) fn write_stored_blocks(&self, range: StoredBlockRange) { - self.inner.borrow_mut().insert( - STORED_BLOCKS_KEY.to_vec(), - range.encode(), - ); - } - - pub(crate) fn write_blocks_at_height(&self, height: BlockNumber, blocks: &[Hash]) { - self.inner.borrow_mut().insert( - blocks_at_height_key(height).to_vec(), - blocks.encode(), - ); - } - - pub(crate) fn write_block_entry(&self, block_hash: &Hash, entry: &BlockEntry) { - self.inner.borrow_mut().insert( - block_entry_key(block_hash).to_vec(), - entry.encode(), - ); - } +pub(crate) fn write_block_entry(tx: &mut DBTransaction, block_hash: &Hash, entry: &BlockEntry) { + tx.put_vec( + DATA_COL, + &block_entry_key(block_hash)[..], + entry.encode(), + ); +} - pub(crate) fn write_candidate_entry(&self, candidate_hash: &CandidateHash, entry: &CandidateEntry) { - self.inner.borrow_mut().insert( - candidate_entry_key(candidate_hash).to_vec(), - entry.encode(), - ); - } +pub(crate) fn write_candidate_entry(tx: &mut DBTransaction, candidate_hash: &CandidateHash, entry: &CandidateEntry) { + tx.put_vec( + DATA_COL, + &candidate_entry_key(candidate_hash)[..], + entry.encode(), + ); } fn make_bitvec(len: usize) -> BitVec<BitOrderLsb0, u8> { @@ -108,7 +81,7 @@ fn make_candidate(para_id: ParaId, relay_parent: Hash) -> CandidateReceipt { #[test] fn read_write() { - let store = TestStore::default(); + let store = kvdb_memorydb::create(1); let hash_a = Hash::repeat_byte(1); let hash_b = Hash::repeat_byte(2); @@ -137,10 +110,14 @@ fn read_write() { approvals: Default::default(), }; - store.write_stored_blocks(range.clone()); - store.write_blocks_at_height(1, &at_height); - store.write_block_entry(&hash_a, &block_entry); - store.write_candidate_entry(&candidate_hash, &candidate_entry); + let mut tx = DBTransaction::new(); + + write_stored_blocks(&mut tx, range.clone()); + write_blocks_at_height(&mut tx, 1, &at_height); + write_block_entry(&mut tx, &hash_a, &block_entry); + write_candidate_entry(&mut tx, &candidate_hash, &candidate_entry); + + store.write(tx).unwrap(); assert_eq!(load_stored_blocks(&store).unwrap(), Some(range)); assert_eq!(load_blocks_at_height(&store, 1).unwrap(), at_height); @@ -154,8 +131,12 @@ fn read_write() { candidate_entry_key(&candidate_hash).to_vec(), ]; - let delete_keys: Vec<_> = delete_keys.iter().map(|k| &k[..]).collect(); - store.insert_aux(&[], &delete_keys).unwrap(); + let mut tx = DBTransaction::new(); + for key in delete_keys { + tx.delete(DATA_COL, &key[..]); + } + + store.write(tx).unwrap(); assert!(load_stored_blocks(&store).unwrap().is_none()); assert!(load_blocks_at_height(&store, 1).unwrap().is_empty()); @@ -165,7 +146,7 @@ fn read_write() { #[test] fn add_block_entry_works() { - let store = TestStore::default(); + let store = kvdb_memorydb::create(1); let parent_hash = Hash::repeat_byte(1); let block_hash_a = Hash::repeat_byte(2); @@ -230,7 +211,7 @@ fn add_block_entry_works() { #[test] fn add_block_entry_adds_child() { - let store = TestStore::default(); + let store = kvdb_memorydb::create(1); let parent_hash = Hash::repeat_byte(1); let block_hash_a = Hash::repeat_byte(2); @@ -272,59 +253,9 @@ fn add_block_entry_adds_child() { assert_eq!(load_block_entry(&store, &block_hash_b).unwrap(), Some(block_entry_b)); } -#[test] -fn clear_works() { - let store = TestStore::default(); - - let hash_a = Hash::repeat_byte(1); - let hash_b = Hash::repeat_byte(2); - let candidate_hash = CandidateHash(Hash::repeat_byte(3)); - - let range = StoredBlockRange(0, 5); - let at_height = vec![hash_a, hash_b]; - - let block_entry = make_block_entry( - hash_a, - vec![(CoreIndex(0), candidate_hash)], - ); - - let candidate_entry = CandidateEntry { - candidate: Default::default(), - session: 5, - block_assignments: vec![ - (hash_a, ApprovalEntry { - tranches: Vec::new(), - backing_group: GroupIndex(1), - our_assignment: None, - assignments: Default::default(), - approved: false, - }) - ].into_iter().collect(), - approvals: Default::default(), - }; - - store.write_stored_blocks(range.clone()); - store.write_blocks_at_height(1, &at_height); - store.write_block_entry(&hash_a, &block_entry); - store.write_candidate_entry(&candidate_hash, &candidate_entry); - - assert_eq!(load_stored_blocks(&store).unwrap(), Some(range)); - assert_eq!(load_blocks_at_height(&store, 1).unwrap(), at_height); - assert_eq!(load_block_entry(&store, &hash_a).unwrap(), Some(block_entry)); - assert_eq!(load_candidate_entry(&store, &candidate_hash).unwrap(), Some(candidate_entry)); - - clear(&store).unwrap(); - - assert!(load_stored_blocks(&store).unwrap().is_none()); - assert!(load_blocks_at_height(&store, 1).unwrap().is_empty()); - assert!(load_block_entry(&store, &hash_a).unwrap().is_none()); - assert!(load_candidate_entry(&store, &candidate_hash).unwrap().is_none()); -} - - #[test] fn canonicalize_works() { - let store = TestStore::default(); + let store = kvdb_memorydb::create(1); // -> B1 -> C1 -> D1 // A -> B2 -> C2 -> D2 @@ -341,7 +272,9 @@ fn canonicalize_works() { let n_validators = 10; - store.write_stored_blocks(StoredBlockRange(1, 5)); + let mut tx = DBTransaction::new(); + write_stored_blocks(&mut tx, StoredBlockRange(1, 5)); + store.write(tx).unwrap(); let genesis = Hash::repeat_byte(0); diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index 78d7d00d344aadf3e3c7472af1b65e5b8161d233..224d9ca310ef1f231cf9fb6d23d2465569b2c76f 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -42,8 +42,8 @@ use polkadot_node_primitives::approval::{ self as approval_types, BlockApprovalMeta, RelayVRFStory, }; use sc_keystore::LocalKeystore; -use sc_client_api::backend::AuxStore; use sp_consensus_slots::Slot; +use kvdb::KeyValueDB; use futures::prelude::*; use futures::channel::oneshot; @@ -317,7 +317,7 @@ async fn cache_session_info_for_head( session_window.session_info.drain(..outdated); session_window.session_info.extend(s); // we need to account for this case: - // window_start ................................... session_index + // window_start ................................... session_index // old_window_start ........... latest let new_earliest = std::cmp::max(window_start, old_window_start); session_window.earliest_session = Some(new_earliest); @@ -516,7 +516,7 @@ pub struct BlockImportedCandidates { pub(crate) async fn handle_new_head( ctx: &mut impl SubsystemContext, state: &mut State<impl DBReader>, - db_writer: &impl AuxStore, + db_writer: &dyn KeyValueDB, head: Hash, finalized_number: &Option<BlockNumber>, ) -> SubsystemResult<Vec<BlockImportedCandidates>> { @@ -1543,7 +1543,7 @@ mod tests { let session = 5; let irrelevant = 666; let session_info = SessionInfo { - validators: vec![Sr25519Keyring::Alice.public().into(); 6], + validators: vec![Sr25519Keyring::Alice.public().into(); 6], discovery_keys: Vec::new(), assignment_keys: Vec::new(), validator_groups: vec![vec![0; 5], vec![0; 2]], @@ -1610,7 +1610,7 @@ mod tests { }.into(), ); - let db_writer = crate::approval_db::v1::tests::TestStore::default(); + let db_writer = kvdb_memorydb::create(1); let test_fut = { Box::pin(async move { diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 1f4b4f84b74f2f028b252f2da78fd664b4bd01a8..6c1ac204c2d817080a227af48e2812c9e0d9d291 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -44,9 +44,9 @@ use polkadot_node_primitives::approval::{ use parity_scale_codec::Encode; use sc_keystore::LocalKeystore; use sp_consensus_slots::Slot; -use sc_client_api::backend::AuxStore; use sp_runtime::traits::AppVerify; use sp_application_crypto::Pair; +use kvdb::KeyValueDB; use futures::prelude::*; use futures::channel::{mpsc, oneshot}; @@ -74,29 +74,53 @@ mod tests; const APPROVAL_SESSIONS: SessionIndex = 6; const LOG_TARGET: &str = "approval_voting"; +/// Configuration for the approval voting subsystem +pub struct Config { + /// The path where the approval-voting DB should be kept. This directory is completely removed when starting + /// the service. + pub path: std::path::PathBuf, + /// The cache size, in bytes, to spend on approval checking metadata. + pub cache_size: Option<usize>, + /// The slot duration of the consensus algorithm, in milliseconds. Should be evenly + /// divisible by 500. + pub slot_duration_millis: u64, +} + /// The approval voting subsystem. -pub struct ApprovalVotingSubsystem<T> { +pub struct ApprovalVotingSubsystem { keystore: Arc<LocalKeystore>, slot_duration_millis: u64, - db: Arc<T>, + db: Arc<dyn KeyValueDB>, } -impl<T> ApprovalVotingSubsystem<T> { +impl ApprovalVotingSubsystem { /// Create a new approval voting subsystem with the given keystore, slot duration, - /// and underlying DB. - pub fn new(keystore: Arc<LocalKeystore>, slot_duration_millis: u64, db: Arc<T>) -> Self { - ApprovalVotingSubsystem { + /// which creates a DB at the given path. This function will delete the directory + /// at the given path if it already exists. + pub fn with_config( + config: Config, + keystore: Arc<LocalKeystore>, + ) -> std::io::Result<Self> { + const DEFAULT_CACHE_SIZE: usize = 100 * 1024 * 1024; // 100MiB default should be fine unless finality stalls. + + let db = approval_db::v1::clear_and_recreate( + &config.path, + config.cache_size.unwrap_or(DEFAULT_CACHE_SIZE), + )?; + + Ok(ApprovalVotingSubsystem { keystore, - slot_duration_millis, + slot_duration_millis: config.slot_duration_millis, db, - } + }) } } -impl<T, C> Subsystem<C> for ApprovalVotingSubsystem<T> - where T: AuxStore + Send + Sync + 'static, C: SubsystemContext<Message = ApprovalVotingMessage> { +impl<C> Subsystem<C> for ApprovalVotingSubsystem + where C: SubsystemContext<Message = ApprovalVotingMessage> +{ fn start(self, ctx: C) -> SpawnedSubsystem { - let future = run::<T, C>( + let future = run::<C>( ctx, self, Box::new(SystemClock), @@ -204,20 +228,20 @@ trait DBReader { // This is a submodule to enforce opacity of the inner DB type. mod approval_db_v1_reader { use super::{ - DBReader, AuxStore, Hash, CandidateHash, BlockEntry, CandidateEntry, + DBReader, KeyValueDB, Hash, CandidateHash, BlockEntry, CandidateEntry, Arc, SubsystemResult, SubsystemError, approval_db, }; /// A DB reader that uses the approval-db V1 under the hood. - pub(super) struct ApprovalDBV1Reader<T>(Arc<T>); + pub(super) struct ApprovalDBV1Reader<T: ?Sized>(Arc<T>); - impl<T> From<Arc<T>> for ApprovalDBV1Reader<T> { + impl<T: ?Sized> From<Arc<T>> for ApprovalDBV1Reader<T> { fn from(a: Arc<T>) -> Self { ApprovalDBV1Reader(a) } } - impl<T: AuxStore> DBReader for ApprovalDBV1Reader<T> { + impl DBReader for ApprovalDBV1Reader<dyn KeyValueDB> { fn load_block_entry( &self, block_hash: &Hash, @@ -273,13 +297,13 @@ enum Action { Conclude, } -async fn run<T, C>( +async fn run<C>( mut ctx: C, - subsystem: ApprovalVotingSubsystem<T>, + subsystem: ApprovalVotingSubsystem, clock: Box<dyn Clock + Send + Sync>, assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>, ) -> SubsystemResult<()> - where T: AuxStore + Send + Sync + 'static, C: SubsystemContext<Message = ApprovalVotingMessage> + where C: SubsystemContext<Message = ApprovalVotingMessage> { let (background_tx, background_rx) = mpsc::channel::<BackgroundRequest>(64); let mut state = State { @@ -298,11 +322,6 @@ async fn run<T, C>( let db_writer = &*subsystem.db; - if let Err(e) = approval_db::v1::clear(db_writer) { - tracing::warn!(target: LOG_TARGET, "Failed to clear DB: {:?}", e); - return Err(SubsystemError::with_origin("db", e)); - } - loop { let wait_til_next_tick = match wakeups.first() { None => future::Either::Left(future::pending()), @@ -367,7 +386,7 @@ async fn run<T, C>( async fn handle_actions( ctx: &mut impl SubsystemContext, wakeups: &mut Wakeups, - db: &impl AuxStore, + db: &dyn KeyValueDB, background_tx: &mpsc::Sender<BackgroundRequest>, actions: impl IntoIterator<Item = Action>, ) -> SubsystemResult<bool> { @@ -427,7 +446,7 @@ async fn handle_actions( async fn handle_from_overseer( ctx: &mut impl SubsystemContext, state: &mut State<impl DBReader>, - db_writer: &impl AuxStore, + db_writer: &dyn KeyValueDB, x: FromOverseer<ApprovalVotingMessage>, last_finalized_height: &mut Option<BlockNumber>, ) -> SubsystemResult<Vec<Action>> { diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml index 622ed317e585aee7e91f8146332eebb9e06b0e67..47c29284627a1d9f1caa13737d74a29b3325addc 100644 --- a/polkadot/node/service/Cargo.toml +++ b/polkadot/node/service/Cargo.toml @@ -107,6 +107,7 @@ default = ["db", "full-node"] db = ["service/db"] full-node = [ "polkadot-node-core-av-store", + "polkadot-node-core-approval-voting", "sc-finality-grandpa-warp-sync" ] @@ -134,7 +135,6 @@ real-overseer = [ "polkadot-pov-distribution", "polkadot-statement-distribution", "polkadot-approval-distribution", - "polkadot-node-core-approval-voting", ] approval-checking = [ diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 1d119014b8fe956a9db9f8b4d264f2e5920dbf5e..db9a7226d94d7c891244fa0bbfd907976fff6cd9 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -29,6 +29,7 @@ use { tracing::info, polkadot_node_core_av_store::Config as AvailabilityConfig, polkadot_node_core_av_store::Error as AvailabilityError, + polkadot_node_core_approval_voting::Config as ApprovalVotingConfig, polkadot_node_core_proposer::ProposerFactory, polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandler}, polkadot_primitives::v1::ParachainHost, @@ -137,6 +138,10 @@ pub enum Error { #[error("Authorities require the real overseer implementation")] AuthoritiesRequireRealOverseer, + + #[cfg(feature = "full-node")] + #[error("Creating a custom database is required for validators")] + DatabasePathRequired, } /// Can be called for a `Configuration` to check if it is a configuration for the `Kusama` network. @@ -358,7 +363,7 @@ fn real_overseer<Spawner, RuntimeClient>( spawner: Spawner, _: IsCollator, _: IsolationStrategy, - _: u64, + _: ApprovalVotingConfig, ) -> Result<(Overseer<Spawner>, OverseerHandler), Error> where RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore, @@ -386,7 +391,7 @@ fn real_overseer<Spawner, RuntimeClient>( spawner: Spawner, is_collator: IsCollator, isolation_strategy: IsolationStrategy, - slot_duration: u64, + approval_voting_config: ApprovalVotingConfig, ) -> Result<(Overseer<Spawner>, OverseerHandler), Error> where RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore, @@ -417,7 +422,7 @@ where use polkadot_node_core_approval_voting::ApprovalVotingSubsystem; #[cfg(not(feature = "approval-checking"))] - let _ = slot_duration; // silence. + let _ = approval_voting_config; // silence. let all_subsystems = AllSubsystems { availability_distribution: AvailabilityDistributionSubsystem::new( @@ -494,11 +499,10 @@ where Metrics::register(registry)?, ), #[cfg(feature = "approval-checking")] - approval_voting: ApprovalVotingSubsystem::new( + approval_voting: ApprovalVotingSubsystem::with_config( + approval_voting_config, keystore.clone(), - slot_duration, - runtime_client.clone(), - ), + )?, #[cfg(not(feature = "approval-checking"))] approval_voting: polkadot_subsystem::DummySubsystem, }; @@ -656,6 +660,14 @@ pub fn new_full<RuntimeApi, Executor>( let availability_config = config.database.clone().try_into().map_err(Error::Availability)?; + let approval_voting_config = ApprovalVotingConfig { + path: config.database.path() + .ok_or(Error::DatabasePathRequired)? + .join("parachains").join("approval-voting"), + slot_duration_millis: slot_duration, + cache_size: None, // default is fine. + }; + let telemetry_span = TelemetrySpan::new(); let _telemetry_span_entered = telemetry_span.enter(); @@ -749,7 +761,7 @@ pub fn new_full<RuntimeApi, Executor>( spawner, is_collator, isolation_strategy, - slot_duration, + approval_voting_config, )?; let overseer_handler_clone = overseer_handler.clone();