Unverified Commit 6cc70a8a authored by asynchronous rob, committed by GitHub

Chain Selection: Follow-ups (#3328)



* DB skeleton

* key formats

* lexicographic test

* custom types for DB

* implement backend for db-v1

* remove VoidBackend and integrate with real DbBackend

* detect stagnant blocks on an interval

* fix tests

* add tests for stagnant

* send ChainSelectionMessage::Approved

* tests for DB backend

* unused import

* upgrade kvdb-memorydb
Co-authored-by: Andronik Ordian <write@reusable.software>
parent ee67bdce
Pipeline #145861 passed with stages in 39 minutes and 4 seconds
@@ -6176,7 +6176,9 @@ version = "0.1.0"
dependencies = [
"assert_matches",
"futures 0.3.15",
"futures-timer 3.0.2",
"kvdb",
"kvdb-memorydb",
"parity-scale-codec",
"parking_lot 0.11.1",
"polkadot-node-primitives",
......
@@ -423,17 +423,21 @@ pub(crate) fn add_block_entry(
/// Forcibly approve all candidates included at up to the given relay-chain height in the indicated
/// chain.
///
/// Returns a list of block hashes that were not approved before but are now.
pub fn force_approve(
store: &dyn KeyValueDB,
db_config: Config,
chain_head: Hash,
up_to: BlockNumber,
) -> Result<()> {
) -> Result<Vec<Hash>> {
enum State {
WalkTo,
Approving,
}
let mut approved_hashes = Vec::new();
let mut cur_hash = chain_head;
let mut state = State::WalkTo;
@@ -452,13 +456,20 @@ pub fn force_approve(
match state {
State::WalkTo => {},
State::Approving => {
let is_approved = entry.approved_bitfield.count_ones()
== entry.approved_bitfield.len();
if !is_approved {
entry.approved_bitfield.iter_mut().for_each(|mut b| *b = true);
approved_hashes.push(entry.block_hash);
tx.put_block_entry(entry);
}
}
}
}
tx.write(store)
tx.write(store)?;
Ok(approved_hashes)
}
/// Return all blocks which have entries in the DB, ascending, by height.
......
@@ -534,7 +534,7 @@ fn force_approve_works() {
).unwrap();
}
force_approve(&store, TEST_CONFIG, block_hash_d, 2).unwrap();
let approved_hashes = force_approve(&store, TEST_CONFIG, block_hash_d, 2).unwrap();
assert!(load_block_entry(
&store,
@@ -556,6 +556,10 @@ fn force_approve_works() {
&TEST_CONFIG,
&block_hash_d,
).unwrap().unwrap().approved_bitfield.not_any());
assert_eq!(
approved_hashes,
vec![block_hash_b, block_hash_a],
);
}
#[test]
......
@@ -31,6 +31,7 @@
use polkadot_node_subsystem::{
messages::{
RuntimeApiMessage, RuntimeApiRequest, ChainApiMessage, ApprovalDistributionMessage,
ChainSelectionMessage,
},
SubsystemContext, SubsystemError, SubsystemResult,
};
@@ -462,10 +463,16 @@ pub(crate) async fn handle_new_head(
result.len(),
);
}
result
}
};
// If all bits are already set, then send an approve message.
if approved_bitfield.count_ones() == approved_bitfield.len() {
ctx.send_message(ChainSelectionMessage::Approved(block_hash).into()).await;
}
let block_entry = approval_db::v1::BlockEntry {
block_hash,
parent_hash: block_header.parent_hash,
@@ -487,8 +494,18 @@ pub(crate) async fn handle_new_head(
"Enacting force-approve",
);
approval_db::v1::force_approve(db_writer, db_config, block_hash, up_to)
let approved_hashes = approval_db::v1::force_approve(
db_writer,
db_config,
block_hash,
up_to,
)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))?;
// Notify chain-selection of all approved hashes.
for hash in approved_hashes {
ctx.send_message(ChainSelectionMessage::Approved(hash).into()).await;
}
}
tracing::trace!(
......
@@ -26,7 +26,7 @@ use polkadot_node_subsystem::{
AssignmentCheckError, AssignmentCheckResult, ApprovalCheckError, ApprovalCheckResult,
ApprovalVotingMessage, RuntimeApiMessage, RuntimeApiRequest, ChainApiMessage,
ApprovalDistributionMessage, CandidateValidationMessage,
AvailabilityRecoveryMessage,
AvailabilityRecoveryMessage, ChainSelectionMessage,
},
errors::RecoveryError,
Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SpawnedSubsystem,
@@ -717,6 +717,7 @@ enum Action {
candidate: CandidateReceipt,
backing_group: GroupIndex,
},
NoteApprovedInChainSelection(Hash),
IssueApproval(CandidateHash, ApprovalVoteRequest),
BecomeActive,
Conclude,
@@ -962,6 +963,9 @@ async fn handle_actions(
Some(_) => {},
}
}
Action::NoteApprovedInChainSelection(block_hash) => {
ctx.send_message(ChainSelectionMessage::Approved(block_hash).into()).await;
}
Action::BecomeActive => {
*mode = Mode::Active;
@@ -1805,6 +1809,7 @@ fn import_checked_approval(
if is_block_approved && !was_block_approved {
metrics.on_block_approved(status.tranche_now as _);
actions.push(Action::NoteApprovedInChainSelection(block_hash));
}
actions.push(Action::WriteBlockEntry(block_entry));
......
@@ -850,6 +850,14 @@ fn import_checked_approval_updates_entries_and_schedules() {
assert_matches!(
actions.get(0).unwrap(),
Action::NoteApprovedInChainSelection(h) => {
assert_eq!(h, &block_hash);
}
);
assert_matches!(
actions.get(1).unwrap(),
Action::WriteBlockEntry(b_entry) => {
assert_eq!(b_entry.block_hash(), block_hash);
assert!(b_entry.is_fully_approved());
@@ -857,7 +865,7 @@ fn import_checked_approval_updates_entries_and_schedules() {
}
);
assert_matches!(
actions.get_mut(1).unwrap(),
actions.get_mut(2).unwrap(),
Action::WriteCandidateEntry(c_hash, ref mut c_entry) => {
assert_eq!(c_hash, &candidate_hash);
assert!(c_entry.approval_entry(&block_hash).unwrap().is_approved());
@@ -1391,9 +1399,16 @@ fn import_checked_approval_sets_one_block_bit_at_a_time() {
ApprovalSource::Remote(validator_index_b),
);
assert_eq!(actions.len(), 2);
assert_eq!(actions.len(), 3);
assert_matches!(
actions.get(0).unwrap(),
Action::NoteApprovedInChainSelection(h) => {
assert_eq!(h, &block_hash);
}
);
assert_matches!(
actions.get(1).unwrap(),
Action::WriteBlockEntry(b_entry) => {
assert_eq!(b_entry.block_hash(), block_hash);
assert!(b_entry.is_fully_approved());
@@ -1403,7 +1418,7 @@ fn import_checked_approval_sets_one_block_bit_at_a_time() {
);
assert_matches!(
actions.get(1).unwrap(),
actions.get(2).unwrap(),
Action::WriteCandidateEntry(c_h, c_entry) => {
assert_eq!(c_h, &candidate_hash_2);
assert!(c_entry.approval_entry(&block_hash).unwrap().is_approved());
......
@@ -7,6 +7,7 @@ edition = "2018"
[dependencies]
futures = "0.3.15"
futures-timer = "3"
tracing = "0.1.26"
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-primitives = { path = "../../primitives" }
@@ -21,3 +22,4 @@ polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
parking_lot = "0.11"
assert_matches = "1"
kvdb-memorydb = "0.10.0"
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! A database [`Backend`][crate::backend::Backend] for the chain selection subsystem.
pub(super) mod v1;
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! A database [`Backend`][crate::backend::Backend] for the chain selection subsystem.
//!
//! This stores the following schema:
//!
//! ```ignore
//! ("CS_block_entry", Hash) -> BlockEntry;
//! ("CS_block_height", BigEndianBlockNumber) -> Vec<Hash>;
//! ("CS_stagnant_at", BigEndianTimestamp) -> Vec<Hash>;
//! ("CS_leaves") -> LeafEntrySet;
//! ```
//!
//! The big-endian encoding is used for creating iterators over the key-value DB which are
//! accessible by prefix, to find the earliest block number stored as well as all stagnant
//! blocks.
//!
//! The `Vec`s stored are always non-empty. Empty `Vec`s are not stored on disk so there is no
//! semantic difference between `None` and an empty `Vec`.
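The key helpers referenced further down (`block_height_key`, `stagnant_at_key`, `decode_block_height_key`, `decode_stagnant_at_key`) are not part of this hunk. As a minimal sketch, assuming `BlockNumber` is a `u32` and mirroring the `CS_block_height` prefix declared below, a big-endian height key could be built and decoded like this:

```rust
// Sketch only: the real helpers live elsewhere in this file and may differ in detail.
const BLOCK_HEIGHT_PREFIX: &[u8; 15] = b"CS_block_height";

fn block_height_key(number: u32) -> [u8; 15 + 4] {
    let mut key = [0u8; 15 + 4];
    key[..15].copy_from_slice(BLOCK_HEIGHT_PREFIX);
    // Big-endian bytes sort lexicographically in the same order as the numbers,
    // so a prefix iterator yields heights in ascending order.
    key[15..].copy_from_slice(&number.to_be_bytes());
    key
}

fn decode_block_height_key(key: &[u8]) -> Option<u32> {
    if key.len() != 15 + 4 || !key.starts_with(BLOCK_HEIGHT_PREFIX) {
        return None;
    }
    let mut raw = [0u8; 4];
    raw.copy_from_slice(&key[15..]);
    Some(u32::from_be_bytes(raw))
}
```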
use crate::backend::{Backend, BackendWriteOp};
use crate::Error;
use polkadot_primitives::v1::{BlockNumber, Hash};
use polkadot_node_primitives::BlockWeight;
use kvdb::{DBTransaction, KeyValueDB};
use parity_scale_codec::{Encode, Decode};
use std::sync::Arc;
const BLOCK_ENTRY_PREFIX: &[u8; 14] = b"CS_block_entry";
const BLOCK_HEIGHT_PREFIX: &[u8; 15] = b"CS_block_height";
const STAGNANT_AT_PREFIX: &[u8; 14] = b"CS_stagnant_at";
const LEAVES_KEY: &[u8; 9] = b"CS_leaves";
type Timestamp = u64;
#[derive(Debug, Encode, Decode, Clone, PartialEq)]
enum Approval {
#[codec(index = 0)]
Approved,
#[codec(index = 1)]
Unapproved,
#[codec(index = 2)]
Stagnant,
}
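The explicit `#[codec(index = _)]` attributes pin each variant's on-disk discriminant, so reordering variants in source cannot silently change the stored format. A quick illustration of what SCALE produces for these unit variants (not part of the diff):

```rust
use parity_scale_codec::Encode;

// A unit enum variant encodes to its single index byte.
assert_eq!(Approval::Approved.encode(), vec![0u8]);
assert_eq!(Approval::Unapproved.encode(), vec![1u8]);
assert_eq!(Approval::Stagnant.encode(), vec![2u8]);
```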
impl From<crate::Approval> for Approval {
fn from(x: crate::Approval) -> Self {
match x {
crate::Approval::Approved => Approval::Approved,
crate::Approval::Unapproved => Approval::Unapproved,
crate::Approval::Stagnant => Approval::Stagnant,
}
}
}
impl From<Approval> for crate::Approval {
fn from(x: Approval) -> crate::Approval {
match x {
Approval::Approved => crate::Approval::Approved,
Approval::Unapproved => crate::Approval::Unapproved,
Approval::Stagnant => crate::Approval::Stagnant,
}
}
}
#[derive(Debug, Encode, Decode, Clone, PartialEq)]
struct ViabilityCriteria {
explicitly_reverted: bool,
approval: Approval,
earliest_unviable_ancestor: Option<Hash>,
}
impl From<crate::ViabilityCriteria> for ViabilityCriteria {
fn from(x: crate::ViabilityCriteria) -> Self {
ViabilityCriteria {
explicitly_reverted: x.explicitly_reverted,
approval: x.approval.into(),
earliest_unviable_ancestor: x.earliest_unviable_ancestor,
}
}
}
impl From<ViabilityCriteria> for crate::ViabilityCriteria {
fn from(x: ViabilityCriteria) -> crate::ViabilityCriteria {
crate::ViabilityCriteria {
explicitly_reverted: x.explicitly_reverted,
approval: x.approval.into(),
earliest_unviable_ancestor: x.earliest_unviable_ancestor,
}
}
}
#[derive(Encode, Decode)]
struct LeafEntry {
weight: BlockWeight,
block_number: BlockNumber,
block_hash: Hash,
}
impl From<crate::LeafEntry> for LeafEntry {
fn from(x: crate::LeafEntry) -> Self {
LeafEntry {
weight: x.weight,
block_number: x.block_number,
block_hash: x.block_hash,
}
}
}
impl From<LeafEntry> for crate::LeafEntry {
fn from(x: LeafEntry) -> crate::LeafEntry {
crate::LeafEntry {
weight: x.weight,
block_number: x.block_number,
block_hash: x.block_hash,
}
}
}
#[derive(Encode, Decode)]
struct LeafEntrySet {
inner: Vec<LeafEntry>,
}
impl From<crate::LeafEntrySet> for LeafEntrySet {
fn from(x: crate::LeafEntrySet) -> Self {
LeafEntrySet {
inner: x.inner.into_iter().map(Into::into).collect(),
}
}
}
impl From<LeafEntrySet> for crate::LeafEntrySet {
fn from(x: LeafEntrySet) -> crate::LeafEntrySet {
crate::LeafEntrySet {
inner: x.inner.into_iter().map(Into::into).collect(),
}
}
}
#[derive(Debug, Encode, Decode, Clone, PartialEq)]
struct BlockEntry {
block_hash: Hash,
block_number: BlockNumber,
parent_hash: Hash,
children: Vec<Hash>,
viability: ViabilityCriteria,
weight: BlockWeight,
}
impl From<crate::BlockEntry> for BlockEntry {
fn from(x: crate::BlockEntry) -> Self {
BlockEntry {
block_hash: x.block_hash,
block_number: x.block_number,
parent_hash: x.parent_hash,
children: x.children,
viability: x.viability.into(),
weight: x.weight,
}
}
}
impl From<BlockEntry> for crate::BlockEntry {
fn from(x: BlockEntry) -> crate::BlockEntry {
crate::BlockEntry {
block_hash: x.block_hash,
block_number: x.block_number,
parent_hash: x.parent_hash,
children: x.children,
viability: x.viability.into(),
weight: x.weight,
}
}
}
/// Configuration for the database backend.
#[derive(Debug, Clone, Copy)]
pub struct Config {
/// The column where block metadata is stored.
pub col_data: u32,
}
/// The database backend.
pub struct DbBackend {
inner: Arc<dyn KeyValueDB>,
config: Config,
}
impl DbBackend {
/// Create a new [`DbBackend`] with the supplied key-value store and
/// config.
pub fn new(db: Arc<dyn KeyValueDB>, config: Config) -> Self {
DbBackend {
inner: db,
config,
}
}
}
impl Backend for DbBackend {
fn load_block_entry(&self, hash: &Hash) -> Result<Option<crate::BlockEntry>, Error> {
load_decode::<BlockEntry>(
&*self.inner,
self.config.col_data,
&block_entry_key(hash),
).map(|o| o.map(Into::into))
}
fn load_leaves(&self) -> Result<crate::LeafEntrySet, Error> {
load_decode::<LeafEntrySet>(
&*self.inner,
self.config.col_data,
LEAVES_KEY,
).map(|o| o.map(Into::into).unwrap_or_default())
}
fn load_stagnant_at(&self, timestamp: crate::Timestamp) -> Result<Vec<Hash>, Error> {
load_decode::<Vec<Hash>>(
&*self.inner,
self.config.col_data,
&stagnant_at_key(timestamp.into()),
).map(|o| o.unwrap_or_default())
}
fn load_stagnant_at_up_to(&self, up_to: crate::Timestamp)
-> Result<Vec<(crate::Timestamp, Vec<Hash>)>, Error>
{
let stagnant_at_iter = self.inner.iter_with_prefix(
self.config.col_data,
&STAGNANT_AT_PREFIX[..],
);
let val = stagnant_at_iter
.filter_map(|(k, v)| {
match (decode_stagnant_at_key(&mut &k[..]), <Vec<_>>::decode(&mut &v[..]).ok()) {
(Some(at), Some(stagnant_at)) => Some((at, stagnant_at)),
_ => None,
}
})
.take_while(|(at, _)| *at <= up_to.into())
.collect::<Vec<_>>();
Ok(val)
}
fn load_first_block_number(&self) -> Result<Option<BlockNumber>, Error> {
let blocks_at_height_iter = self.inner.iter_with_prefix(
self.config.col_data,
&BLOCK_HEIGHT_PREFIX[..],
);
let val = blocks_at_height_iter
.filter_map(|(k, _)| decode_block_height_key(&k[..]))
.next();
Ok(val)
}
fn load_blocks_by_number(&self, number: BlockNumber) -> Result<Vec<Hash>, Error> {
load_decode::<Vec<Hash>>(
&*self.inner,
self.config.col_data,
&block_height_key(number),
).map(|o| o.unwrap_or_default())
}
/// Atomically write the list of operations, with later operations taking precedence over prior.
fn write<I>(&mut self, ops: I) -> Result<(), Error>
where I: IntoIterator<Item = BackendWriteOp>
{
let mut tx = DBTransaction::new();
for op in ops {
match op {
BackendWriteOp::WriteBlockEntry(block_entry) => {
let block_entry: BlockEntry = block_entry.into();
tx.put_vec(
self.config.col_data,
&block_entry_key(&block_entry.block_hash),
block_entry.encode(),
);
}
BackendWriteOp::WriteBlocksByNumber(block_number, v) => {
if v.is_empty() {
tx.delete(
self.config.col_data,
&block_height_key(block_number),
);
} else {
tx.put_vec(
self.config.col_data,
&block_height_key(block_number),
v.encode(),
);
}
}
BackendWriteOp::WriteViableLeaves(leaves) => {
let leaves: LeafEntrySet = leaves.into();
if leaves.inner.is_empty() {
tx.delete(
self.config.col_data,
&LEAVES_KEY[..],
);
} else {
tx.put_vec(
self.config.col_data,
&LEAVES_KEY[..],
leaves.encode(),
);
}
}
BackendWriteOp::WriteStagnantAt(timestamp, stagnant_at) => {
let timestamp: Timestamp = timestamp.into();
if stagnant_at.is_empty() {
tx.delete(
self.config.col_data,
&stagnant_at_key(timestamp),
);
} else {
tx.put_vec(
self.config.col_data,
&stagnant_at_key(timestamp),
stagnant_at.encode(),
);
}
}
BackendWriteOp::DeleteBlocksByNumber(block_number) => {
tx.delete(
self.config.col_data,
&block_height_key(block_number),
);
}
BackendWriteOp::DeleteBlockEntry(hash) => {
tx.delete(
self.config.col_data,
&block_entry_key(&hash),
);
}
BackendWriteOp::DeleteStagnantAt(timestamp) => {
let timestamp: Timestamp = timestamp.into();
tx.delete(
self.config.col_data,
&stagnant_at_key(timestamp),
);
}
}
}
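For reference, a hypothetical round-trip over the in-memory store added as a dev-dependency, illustrating both that an empty `Vec` is stored as a delete and that later operations in one batch take precedence. This is a sketch only, assuming `Backend`, `BackendWriteOp`, `DbBackend`, `Config`, and `Error` are in scope:

```rust
use std::sync::Arc;
use polkadot_primitives::v1::Hash;

fn example() -> Result<(), Error> {
    // One column is enough for this sketch; `col_data` indexes into it.
    let store = Arc::new(kvdb_memorydb::create(1));
    let mut backend = DbBackend::new(store, Config { col_data: 0 });

    backend.write(vec![
        BackendWriteOp::WriteBlocksByNumber(1, vec![Hash::repeat_byte(1)]),
        // An empty list maps to a delete, and the later op wins within the batch.
        BackendWriteOp::WriteBlocksByNumber(1, Vec::new()),
    ])?;

    assert!(backend.load_blocks_by_number(1)?.is_empty());
    Ok(())
}
```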