Unverified Commit 9b700da0 authored by asynchronous rob, committed by GitHub
Browse files

Approval Voting improvements (#2781)



* extract database from av-store itself

* generalize approval-voting over database type

* modes (without handling) and pruning old wakeups

* rework approval importing

* add our_approval_sig to ApprovalEntry

* import assignment

* guide updates for check-full-approval changes

* some aux functions

* send messages when becoming active.

* guide: network bridge sends view updates only when done syncing

* network bridge: send view updates only when done syncing

* tests for new network-bridge behavior

* add a test for updating approval entry with sig

* fix some warnings

* test load-all-blocks

* instantiate new parachains DB

* fix network-bridge empty view updates

* tweak

* fix wasm build, i think

* Update node/core/approval-voting/src/lib.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* add some versioning to parachains_db

* warnings

* fix merge changes

* remove versioning again

Co-authored-by: Andronik Ordian <write@reusable.software>
parent deab3cb9
Pipeline #132347 failed with stages
in 23 minutes and 40 seconds
......@@ -5650,6 +5650,7 @@ dependencies = [
"assert_matches",
"async-trait",
"futures 0.3.13",
"futures-timer 3.0.2",
"parity-scale-codec",
"parking_lot 0.11.1",
"polkadot-node-network-protocol",
......@@ -5659,6 +5660,7 @@ dependencies = [
"polkadot-primitives",
"sc-authority-discovery",
"sc-network",
"sp-consensus",
"sp-core",
"sp-keyring",
"strum",
......@@ -5692,7 +5694,6 @@ dependencies = [
"futures-timer 3.0.2",
"kvdb",
"kvdb-memorydb",
"kvdb-rocksdb",
"maplit",
"merlin",
"parity-scale-codec",
......@@ -5710,6 +5711,7 @@ dependencies = [
"schnorrkel",
"sp-application-crypto",
"sp-blockchain",
"sp-consensus",
"sp-consensus-babe",
"sp-consensus-slots",
"sp-core",
......@@ -5730,7 +5732,6 @@ dependencies = [
"futures-timer 3.0.2",
"kvdb",
"kvdb-memorydb",
"kvdb-rocksdb",
"log",
"parity-scale-codec",
"parking_lot 0.11.1",
......@@ -5741,7 +5742,6 @@ dependencies = [
"polkadot-node-subsystem-util",
"polkadot-overseer",
"polkadot-primitives",
"sc-service",
"sp-core",
"sp-keyring",
"thiserror",
......@@ -6345,6 +6345,8 @@ dependencies = [
"futures 0.3.13",
"hex-literal",
"kusama-runtime",
"kvdb",
"kvdb-rocksdb",
"pallet-babe",
"pallet-im-online",
"pallet-mmr-primitives",
......
......@@ -13,7 +13,6 @@ bitvec = { version = "0.20.1", default-features = false, features = ["alloc"] }
merlin = "2.0"
schnorrkel = "0.9.1"
kvdb = "0.9.0"
kvdb-rocksdb = "0.11.0"
derive_more = "0.99.1"
polkadot-node-subsystem = { path = "../../subsystem" }
......@@ -25,6 +24,7 @@ polkadot-node-jaeger = { path = "../../jaeger" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
sp-consensus-slots = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false, features = ["full_crypto"] }
......
......@@ -406,6 +406,7 @@ mod tests {
tranches: Vec::new(),
assignments: Default::default(),
our_assignment: None,
our_approval_sig: None,
backing_group: GroupIndex(0),
approved: false,
}.into();
......@@ -452,6 +453,7 @@ mod tests {
],
assignments: bitvec![BitOrderLsb0, u8; 1; 10],
our_assignment: None,
our_approval_sig: None,
backing_group: GroupIndex(0),
approved: false,
}.into();
......@@ -515,6 +517,7 @@ mod tests {
],
assignments: bitvec![BitOrderLsb0, u8; 1; 10],
our_assignment: None,
our_approval_sig: None,
backing_group: GroupIndex(0),
approved: false,
}.into();
......@@ -582,6 +585,7 @@ mod tests {
tranches: Vec::new(),
assignments: bitvec![BitOrderLsb0, u8; 0; 5],
our_assignment: None,
our_approval_sig: None,
backing_group: GroupIndex(0),
approved: false,
}.into();
......@@ -619,6 +623,7 @@ mod tests {
tranches: Vec::new(),
assignments: bitvec![BitOrderLsb0, u8; 0; 10],
our_assignment: None,
our_approval_sig: None,
backing_group: GroupIndex(0),
approved: false,
}.into();
......@@ -657,6 +662,7 @@ mod tests {
tranches: Vec::new(),
assignments: bitvec![BitOrderLsb0, u8; 0; 10],
our_assignment: None,
our_approval_sig: None,
backing_group: GroupIndex(0),
approved: false,
}.into();
......@@ -700,6 +706,7 @@ mod tests {
tranches: Vec::new(),
assignments: bitvec![BitOrderLsb0, u8; 0; n_validators],
our_assignment: None,
our_approval_sig: None,
backing_group: GroupIndex(0),
approved: false,
}.into();
......@@ -765,6 +772,7 @@ mod tests {
tranches: Vec::new(),
assignments: bitvec![BitOrderLsb0, u8; 0; n_validators],
our_assignment: None,
our_approval_sig: None,
backing_group: GroupIndex(0),
approved: false,
}.into();
......@@ -852,6 +860,7 @@ mod tests {
tranches: Vec::new(),
assignments: bitvec![BitOrderLsb0, u8; 0; n_validators],
our_assignment: None,
our_approval_sig: None,
backing_group: GroupIndex(0),
approved: false,
}.into();
......
......@@ -20,14 +20,13 @@ use kvdb::{DBTransaction, KeyValueDB};
use polkadot_node_primitives::approval::{DelayTranche, AssignmentCert};
use polkadot_primitives::v1::{
ValidatorIndex, GroupIndex, CandidateReceipt, SessionIndex, CoreIndex,
BlockNumber, Hash, CandidateHash,
BlockNumber, Hash, CandidateHash, ValidatorSignature,
};
use sp_consensus_slots::Slot;
use parity_scale_codec::{Encode, Decode};
use std::collections::{BTreeMap, HashMap};
use std::collections::hash_map::Entry;
use std::sync::Arc;
use bitvec::{vec::BitVec, order::Lsb0 as BitOrderLsb0};
#[cfg(test)]
......@@ -40,11 +39,15 @@ pub struct Tick(u64);
pub type Bitfield = BitVec<BitOrderLsb0, u8>;
const NUM_COLUMNS: u32 = 1;
const DATA_COL: u32 = 0;
const STORED_BLOCKS_KEY: &[u8] = b"Approvals_StoredBlocks";
/// The database config.
#[derive(Debug, Clone, Copy)]
pub struct Config {
/// The column family in the database where data is stored.
pub col_data: u32,
}
/// Details pertaining to our assignment on a block.
#[derive(Encode, Decode, Debug, Clone, PartialEq)]
pub struct OurAssignment {
......@@ -71,6 +74,7 @@ pub struct ApprovalEntry {
pub tranches: Vec<TrancheEntry>,
pub backing_group: GroupIndex,
pub our_assignment: Option<OurAssignment>,
pub our_approval_sig: Option<ValidatorSignature>,
// `n_validators` bits.
pub assignments: Bitfield,
pub approved: bool,
......@@ -109,33 +113,6 @@ pub struct BlockEntry {
pub children: Vec<Hash>,
}
/// Clear the given directory and create a RocksDB instance there.
pub fn clear_and_recreate(path: &std::path::Path, cache_size: usize)
-> std::io::Result<Arc<dyn KeyValueDB>>
{
use kvdb_rocksdb::{DatabaseConfig, Database as RocksDB};
tracing::info!("Recreating approval-checking DB at {:?}", path);
if let Err(e) = std::fs::remove_dir_all(path) {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e);
}
}
std::fs::create_dir_all(path)?;
let mut db_config = DatabaseConfig::with_columns(NUM_COLUMNS);
db_config.memory_budget.insert(DATA_COL, cache_size);
let path = path.to_str().ok_or_else(|| std::io::Error::new(
std::io::ErrorKind::Other,
format!("Non-UTF-8 database path {:?}", path),
))?;
Ok(Arc::new(RocksDB::open(&db_config, path)?))
}
/// A range from earliest..last block number stored within the DB.
#[derive(Encode, Decode, Debug, Clone, PartialEq)]
pub struct StoredBlockRange(BlockNumber, BlockNumber);
......@@ -168,12 +145,13 @@ pub type Result<T> = std::result::Result<T, Error>;
/// pruning any competing branches at the same height.
pub(crate) fn canonicalize(
store: &dyn KeyValueDB,
config: &Config,
canon_number: BlockNumber,
canon_hash: Hash,
)
-> Result<()>
{
let range = match load_stored_blocks(store)? {
let range = match load_stored_blocks(store, config)? {
None => return Ok(()),
Some(range) => if range.0 >= canon_number {
return Ok(())
......@@ -197,17 +175,17 @@ pub(crate) fn canonicalize(
transaction: &mut DBTransaction,
visited_candidates: &mut HashMap<CandidateHash, CandidateEntry>,
| -> Result<Vec<Hash>> {
let block_entry = match load_block_entry(store, &block_hash)? {
let block_entry = match load_block_entry(store, config, &block_hash)? {
None => return Ok(Vec::new()),
Some(b) => b,
};
transaction.delete(DATA_COL, &block_entry_key(&block_hash)[..]);
transaction.delete(config.col_data, &block_entry_key(&block_hash)[..]);
for &(_, ref candidate_hash) in &block_entry.candidates {
let candidate = match visited_candidates.entry(*candidate_hash) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(e) => {
e.insert(match load_candidate_entry(store, candidate_hash)? {
e.insert(match load_candidate_entry(store, config, candidate_hash)? {
None => continue, // Should not happen except for corrupt DB
Some(c) => c,
})
......@@ -222,8 +200,8 @@ pub(crate) fn canonicalize(
// First visit everything before the height.
for i in range.0..canon_number {
let at_height = load_blocks_at_height(store, i)?;
transaction.delete(DATA_COL, &blocks_at_height_key(i)[..]);
let at_height = load_blocks_at_height(store, config, i)?;
transaction.delete(config.col_data, &blocks_at_height_key(i)[..]);
for b in at_height {
let _ = visit_and_remove_block_entry(
......@@ -236,8 +214,8 @@ pub(crate) fn canonicalize(
// Then visit everything at the height.
let pruned_branches = {
let at_height = load_blocks_at_height(store, canon_number)?;
transaction.delete(DATA_COL, &blocks_at_height_key(canon_number));
let at_height = load_blocks_at_height(store, config, canon_number)?;
transaction.delete(config.col_data, &blocks_at_height_key(canon_number));
// Note that while there may be branches descending from blocks at earlier heights,
// we have already covered them by removing everything at earlier heights.
......@@ -274,7 +252,7 @@ pub(crate) fn canonicalize(
// visit the at-height key for this deleted block's height.
let at_height = match visited_heights.entry(height) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(e) => e.insert(load_blocks_at_height(store, height)?),
Entry::Vacant(e) => e.insert(load_blocks_at_height(store, config, height)?),
};
if let Some(i) = at_height.iter().position(|x| x == &next_child) {
......@@ -286,10 +264,10 @@ pub(crate) fn canonicalize(
// Update all `CandidateEntry`s, deleting all those which now have empty `block_assignments`.
for (candidate_hash, candidate) in visited_candidates {
if candidate.block_assignments.is_empty() {
transaction.delete(DATA_COL, &candidate_entry_key(&candidate_hash)[..]);
transaction.delete(config.col_data, &candidate_entry_key(&candidate_hash)[..]);
} else {
transaction.put_vec(
DATA_COL,
config.col_data,
&candidate_entry_key(&candidate_hash)[..],
candidate.encode(),
);
......@@ -299,9 +277,9 @@ pub(crate) fn canonicalize(
// Update all blocks-at-height keys, deleting all those which now have empty `block_assignments`.
for (h, at) in visited_heights {
if at.is_empty() {
transaction.delete(DATA_COL, &blocks_at_height_key(h)[..]);
transaction.delete(config.col_data, &blocks_at_height_key(h)[..]);
} else {
transaction.put_vec(DATA_COL, &blocks_at_height_key(h), at.encode());
transaction.put_vec(config.col_data, &blocks_at_height_key(h), at.encode());
}
}
......@@ -312,16 +290,16 @@ pub(crate) fn canonicalize(
std::cmp::max(range.1, canon_number + 2),
).encode();
transaction.put_vec(DATA_COL, &STORED_BLOCKS_KEY[..], new_range);
transaction.put_vec(config.col_data, &STORED_BLOCKS_KEY[..], new_range);
// Update the values on-disk.
store.write(transaction).map_err(Into::into)
}
fn load_decode<D: Decode>(store: &dyn KeyValueDB, key: &[u8])
fn load_decode<D: Decode>(store: &dyn KeyValueDB, col_data: u32, key: &[u8])
-> Result<Option<D>>
{
match store.get(DATA_COL, key)? {
match store.get(col_data, key)? {
None => Ok(None),
Some(raw) => D::decode(&mut &raw[..])
.map(Some)
......@@ -350,6 +328,7 @@ pub(crate) struct NewCandidateInfo {
/// no information about new candidates will be referred to by this function.
pub(crate) fn add_block_entry(
store: &dyn KeyValueDB,
config: &Config,
entry: BlockEntry,
n_validators: usize,
candidate_info: impl Fn(&CandidateHash) -> Option<NewCandidateInfo>,
......@@ -361,7 +340,7 @@ pub(crate) fn add_block_entry(
// Update the stored block range.
{
let new_range = match load_stored_blocks(store)? {
let new_range = match load_stored_blocks(store, config)? {
None => Some(StoredBlockRange(number, number + 1)),
Some(range) => if range.1 <= number {
Some(StoredBlockRange(range.0, number + 1))
......@@ -370,19 +349,19 @@ pub(crate) fn add_block_entry(
}
};
new_range.map(|n| transaction.put_vec(DATA_COL, &STORED_BLOCKS_KEY[..], n.encode()))
new_range.map(|n| transaction.put_vec(config.col_data, &STORED_BLOCKS_KEY[..], n.encode()))
};
// Update the blocks at height meta key.
{
let mut blocks_at_height = load_blocks_at_height(store, number)?;
let mut blocks_at_height = load_blocks_at_height(store, config, number)?;
if blocks_at_height.contains(&entry.block_hash) {
// seems we already have a block entry for this block. nothing to do here.
return Ok(Vec::new())
}
blocks_at_height.push(entry.block_hash);
transaction.put_vec(DATA_COL, &blocks_at_height_key(number)[..], blocks_at_height.encode())
transaction.put_vec(config.col_data, &blocks_at_height_key(number)[..], blocks_at_height.encode())
};
let mut candidate_entries = Vec::with_capacity(entry.candidates.len());
......@@ -399,7 +378,7 @@ pub(crate) fn add_block_entry(
Some(info) => info,
};
let mut candidate_entry = load_candidate_entry(store, &candidate_hash)?
let mut candidate_entry = load_candidate_entry(store, config, &candidate_hash)?
.unwrap_or_else(move || CandidateEntry {
candidate,
session,
......@@ -413,13 +392,14 @@ pub(crate) fn add_block_entry(
tranches: Vec::new(),
backing_group,
our_assignment,
our_approval_sig: None,
assignments: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators],
approved: false,
}
);
transaction.put_vec(
DATA_COL,
config.col_data,
&candidate_entry_key(&candidate_hash)[..],
candidate_entry.encode(),
);
......@@ -429,13 +409,13 @@ pub(crate) fn add_block_entry(
};
// Update the child index for the parent.
load_block_entry(store, &parent_hash)?.map(|mut e| {
load_block_entry(store, config, &parent_hash)?.map(|mut e| {
e.children.push(entry.block_hash);
transaction.put_vec(DATA_COL, &block_entry_key(&parent_hash)[..], e.encode())
transaction.put_vec(config.col_data, &block_entry_key(&parent_hash)[..], e.encode())
});
// Put the new block entry in.
transaction.put_vec(DATA_COL, &block_entry_key(&entry.block_hash)[..], entry.encode());
transaction.put_vec(config.col_data, &block_entry_key(&entry.block_hash)[..], entry.encode());
store.write(transaction)?;
Ok(candidate_entries)
......@@ -445,6 +425,7 @@ pub(crate) fn add_block_entry(
/// chain.
pub fn force_approve(
store: &dyn KeyValueDB,
db_config: Config,
chain_head: Hash,
up_to: BlockNumber,
) -> Result<()> {
......@@ -456,11 +437,11 @@ pub fn force_approve(
let mut cur_hash = chain_head;
let mut state = State::WalkTo;
let mut tx = Transaction::default();
let mut tx = Transaction::new(db_config);
// iterate back to the `up_to` block, and then iterate backwards until all blocks
// are updated.
while let Some(mut entry) = load_block_entry(store, &cur_hash)? {
while let Some(mut entry) = load_block_entry(store, &db_config, &cur_hash)? {
if entry.block_number <= up_to {
state = State::Approving;
......@@ -480,15 +461,35 @@ pub fn force_approve(
tx.write(store)
}
/// Return all blocks which have entries in the DB, ascending, by height.
pub(crate) fn load_all_blocks(store: &dyn KeyValueDB, config: &Config) -> Result<Vec<Hash>> {
let stored_blocks = load_stored_blocks(store, config)?;
let mut hashes = Vec::new();
for height in stored_blocks.into_iter().flat_map(|s| s.0..s.1) {
hashes.extend(load_blocks_at_height(store, config, height)?);
}
Ok(hashes)
}
// An atomic transaction of multiple candidate or block entries.
#[derive(Default)]
#[must_use = "Transactions do nothing unless written to a DB"]
pub struct Transaction {
block_entries: HashMap<Hash, BlockEntry>,
candidate_entries: HashMap<CandidateHash, CandidateEntry>,
config: Config,
}
impl Transaction {
pub(crate) fn new(config: Config) -> Self {
Transaction {
block_entries: HashMap::default(),
candidate_entries: HashMap::default(),
config,
}
}
/// Put a block entry in the transaction, overwriting any other with the
/// same hash.
pub(crate) fn put_block_entry(&mut self, entry: BlockEntry) {
......@@ -519,14 +520,14 @@ impl Transaction {
let k = block_entry_key(&hash);
let v = entry.encode();
db_transaction.put_vec(DATA_COL, &k, v);
db_transaction.put_vec(self.config.col_data, &k, v);
}
for (hash, entry) in self.candidate_entries {
let k = candidate_entry_key(&hash);
let v = entry.encode();
db_transaction.put_vec(DATA_COL, &k, v);
db_transaction.put_vec(self.config.col_data, &k, v);
}
db.write(db_transaction).map_err(Into::into)
......@@ -534,31 +535,39 @@ impl Transaction {
}
/// Load the stored-blocks key from the state.
fn load_stored_blocks(store: &dyn KeyValueDB)
fn load_stored_blocks(store: &dyn KeyValueDB, config: &Config)
-> Result<Option<StoredBlockRange>>
{
load_decode(store, STORED_BLOCKS_KEY)
load_decode(store, config.col_data, STORED_BLOCKS_KEY)
}
/// Load a blocks-at-height entry for a given block number.
pub(crate) fn load_blocks_at_height(store: &dyn KeyValueDB, block_number: BlockNumber)
pub(crate) fn load_blocks_at_height(
store: &dyn KeyValueDB,
config: &Config,
block_number: BlockNumber,
)
-> Result<Vec<Hash>> {
load_decode(store, &blocks_at_height_key(block_number))
load_decode(store, config.col_data, &blocks_at_height_key(block_number))
.map(|x| x.unwrap_or_default())
}
/// Load a block entry from the aux store.
pub(crate) fn load_block_entry(store: &dyn KeyValueDB, block_hash: &Hash)
pub(crate) fn load_block_entry(store: &dyn KeyValueDB, config: &Config, block_hash: &Hash)
-> Result<Option<BlockEntry>>
{
load_decode(store, &block_entry_key(block_hash))
load_decode(store, config.col_data, &block_entry_key(block_hash))
}
/// Load a candidate entry from the aux store.
pub(crate) fn load_candidate_entry(store: &dyn KeyValueDB, candidate_hash: &CandidateHash)
pub(crate) fn load_candidate_entry(
store: &dyn KeyValueDB,
config: &Config,
candidate_hash: &CandidateHash,
)
-> Result<Option<CandidateEntry>>
{
load_decode(store, &candidate_entry_key(candidate_hash))
load_decode(store, config.col_data, &candidate_entry_key(candidate_hash))
}
/// The key a given block entry is stored under.
......
......@@ -19,6 +19,13 @@
use super::*;
use polkadot_primitives::v1::Id as ParaId;
const DATA_COL: u32 = 0;
const NUM_COLUMNS: u32 = 1;
const TEST_CONFIG: Config = Config {
col_data: DATA_COL,
};
pub(crate) fn write_stored_blocks(tx: &mut DBTransaction, range: StoredBlockRange) {
tx.put_vec(
DATA_COL,
......@@ -85,7 +92,7 @@ fn make_candidate(para_id: ParaId, relay_parent: Hash) -> CandidateReceipt {
#[test]
fn read_write() {
let store = kvdb_memorydb::create(1);
let store = kvdb_memorydb::create(NUM_COLUMNS);
let hash_a = Hash::repeat_byte(1);
let hash_b = Hash::repeat_byte(2);
......@@ -109,6 +116,7 @@ fn read_write() {
tranches: Vec::new(),
backing_group: GroupIndex(1),
our_assignment: None,
our_approval_sig: None,
assignments: Default::default(),
approved: false,
})
......@@ -125,10 +133,13 @@ fn read_write() {
store.write(tx).unwrap();
assert_eq!(load_stored_blocks(&store).unwrap(), Some(range));
assert_eq!(load_blocks_at_height(&store, 1).unwrap(), at_height);
assert_eq!(load_block_entry(&store, &hash_a).unwrap(), Some(block_entry));
assert_eq!(load_candidate_entry(&store, &candidate_hash).unwrap(), Some(candidate_entry));
assert_eq!(load_stored_blocks(&store, &TEST_CONFIG).unwrap(), Some(range));
assert_eq!(load_blocks_at_height(&store, &TEST_CONFIG, 1).unwrap(), at_height);
assert_eq!(load_block_entry(&store, &TEST_CONFIG, &hash_a).unwrap(), Some(block_entry));
assert_eq!(
load_candidate_entry(&store, &TEST_CONFIG, &candidate_hash).unwrap(),
Some(candidate_entry),
);
let delete_keys = vec![
STORED_BLOCKS_KEY.to_vec(),
......@@ -144,15 +155,15 @@ fn read_write() {
store.write(tx).unwrap();
assert!(load_stored_blocks(&store).unwrap().is_none());
assert!(load_blocks_at_height(&store, 1).unwrap().is_empty());
assert!(load_block_entry(&store, &hash_a).unwrap().is_none());
assert!(load_candidate_entry(&store, &candidate_hash).unwrap().is_none());
assert!(load_stored_blocks(&store, &TEST_CONFIG).unwrap().is_none());
assert!(load_blocks_at_height(&store, &TEST_CONFIG, 1).unwrap().is_empty());
assert!(load_block_entry(&store, &TEST_CONFIG, &hash_a).unwrap().is_none());
assert!(load_candidate_entry(&store, &TEST_CONFIG, &candidate_hash).unwrap().is_none());
}
#[test]
fn add_block_entry_works() {
let store = kvdb_memorydb::create(1);
let store = kvdb_memorydb::create(NUM_COLUMNS);
let parent_hash = Hash::repeat_byte(1);
let block_hash_a = Hash::repeat_byte(2);
......@@ -188,6 +199,7 @@ fn add_block_entry_works() {
add_block_entry(