Unverified commit 078e1605 authored by asynchronous rob, committed by GitHub

alternate availability store schema (#2237)



* alternate availability store schema

* improvements

* tweaks

* new DB schema and skeleton

* expand skeleton and tweaks

* handle backing and inclusion

* let finality be handled later

* handle finalized blocks

* implement query methods

* implement chunk storing

* StoreAvailableData

* fix an off-by-one

* implement pruning

* reinstate subsystem trait impl

* reinstate metrics

* fix warnings

* remove chunks_cache

* oops

* actually store the available data

* mockable pruning interval

* fix tests

* spacing

* fix code grumbles

* guide improvements

* make time mockable

* implement a mocked clock for testing

* return DB errors

* Update node/core/av-store/Cargo.toml

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Update roadmap/implementers-guide/src/node/utility/availability-store.md

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Update roadmap/implementers-guide/src/node/utility/availability-store.md

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* review grumbles & clarity

* fix review grumbles

* Add docs

Co-authored-by: Andronik Ordian <write@reusable.software>

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: Andronik Ordian <write@reusable.software>
parent ac5e2a08
Pipeline #119694 passed with stages in 28 minutes and 58 seconds
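
Before the file diffs, a minimal standalone sketch of the idea the new schema leans on (illustrative only, not code from this commit; plain std Rust, with the `prune_by_time` prefix borrowed from the patch). RocksDB compares keys lexicographically byte by byte, and fixed-width big-endian integers sort byte-wise in the same order as they do numerically, which is what lets the `pruning_range` and `finalized_block_range` helpers below turn "everything due up to now" into a single contiguous key-range scan:

// Illustrative sketch only (not from the commit): big-endian keys make
// lexicographic (RocksDB) byte order agree with numeric order.
fn main() {
	let prefix: &[u8] = b"prune_by_time"; // prefix as used by the new schema
	let mut keys: Vec<Vec<u8>> = [1025u64, 2, 258]
		.iter()
		.map(|n| {
			let mut k = prefix.to_vec();
			k.extend_from_slice(&n.to_be_bytes()); // big-endian payload
			k
		})
		.collect();

	// RocksDB iterates keys in lexicographic byte order; Vec<u8> sorts the same way.
	keys.sort();

	let decoded: Vec<u64> = keys.iter()
		.map(|k| {
			let mut buf = [0u8; 8];
			buf.copy_from_slice(&k[prefix.len()..]);
			u64::from_be_bytes(buf)
		})
		.collect();

	// Numeric order falls out of byte order; little-endian payloads would not sort this way.
	assert_eq!(decoded, vec![2, 258, 1025]);
}

The same trick is applied to block numbers (`BEBlockNumber` below), so that on finality every unfinalized-block index entry at or below the finalized height falls into one range.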
Cargo.lock

@@ -5044,6 +5044,7 @@ name = "polkadot-node-core-av-store"
 version = "0.1.0"
 dependencies = [
  "assert_matches",
+ "bitvec",
  "env_logger 0.8.2",
  "futures 0.3.8",
  "futures-timer 3.0.2",
@@ -5052,6 +5053,7 @@ dependencies = [
  "kvdb-rocksdb",
  "log",
  "parity-scale-codec",
+ "parking_lot 0.11.1",
  "polkadot-erasure-coding",
  "polkadot-node-subsystem",
  "polkadot-node-subsystem-test-helpers",
@@ -5060,6 +5062,7 @@ dependencies = [
  "polkadot-primitives",
  "sc-service",
  "sp-core",
+ "sp-keyring",
  "thiserror",
  "tracing",
  "tracing-futures",
node/core/av-store/Cargo.toml

@@ -12,6 +12,7 @@ kvdb-rocksdb = "0.10.0"
 thiserror = "1.0.23"
 tracing = "0.1.22"
 tracing-futures = "0.2.4"
+bitvec = "0.17.4"
 parity-scale-codec = { version = "1.3.5", features = ["derive"] }
 erasure = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" }
@@ -31,3 +32,5 @@ kvdb-memorydb = "0.8.0"
 sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
 polkadot-node-subsystem-util = { path = "../../subsystem-util" }
 polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
+sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
+parking_lot = "0.11.1"
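
The `parking_lot` and `sp-keyring` dev-dependencies support the test changes the commit message describes ("make time mockable", "implement a mocked clock for testing"). A generic shape such a mockable clock might take, as a standalone, std-only sketch (illustrative; the PR's actual trait and types may differ):

use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// One possible shape for a mockable clock (assumption: names and signatures
// here are illustrative, not taken from the PR).
trait Clock: Send + Sync {
	fn now(&self) -> Duration; // seconds since UNIX_EPOCH, as a Duration
}

struct SystemClock;

impl Clock for SystemClock {
	fn now(&self) -> Duration {
		SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default()
	}
}

// Test clock: time only moves when the test says so.
#[derive(Clone)]
struct MockClock(Arc<Mutex<Duration>>);

impl Clock for MockClock {
	fn now(&self) -> Duration {
		*self.0.lock().unwrap()
	}
}

impl MockClock {
	fn advance(&self, by: Duration) {
		*self.0.lock().unwrap() += by;
	}
}

fn main() {
	let clock = MockClock(Arc::new(Mutex::new(Duration::from_secs(0))));
	let before = clock.now();
	clock.advance(Duration::from_secs(60 * 60)); // jump past a pruning deadline
	assert_eq!(clock.now() - before, Duration::from_secs(3600));
}

With this split, production code holds a `Box<dyn Clock>` and pruning tests can jump straight past `KEEP_UNAVAILABLE_FOR` or `KEEP_FINALIZED_FOR` without sleeping.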
node/core/av-store/src/lib.rs

@@ -19,21 +19,21 @@
 #![recursion_limit="256"]
 #![warn(missing_docs)]

-use std::cmp::Ordering;
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::io;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH};

-use parity_scale_codec::{Encode, Decode};
-use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt};
+use parity_scale_codec::{Encode, Decode, Input, Error as CodecError};
+use futures::{select, channel::oneshot, future, FutureExt};
 use futures_timer::Delay;
 use kvdb_rocksdb::{Database, DatabaseConfig};
 use kvdb::{KeyValueDB, DBTransaction};

 use polkadot_primitives::v1::{
 	Hash, AvailableData, BlockNumber, CandidateEvent, ErasureChunk, ValidatorIndex, CandidateHash,
+	CandidateReceipt,
 };
 use polkadot_subsystem::{
 	FromOverseer, OverseerSignal, SubsystemError, Subsystem, SubsystemContext, SpawnedSubsystem,
@@ -42,8 +42,12 @@ use polkadot_subsystem::{
 };
 use polkadot_node_subsystem_util::metrics::{self, prometheus};
 use polkadot_subsystem::messages::{
-	AllMessages, AvailabilityStoreMessage, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest,
+	AvailabilityStoreMessage, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest,
 };
+use bitvec::{vec::BitVec, order::Lsb0 as BitOrderLsb0};
+
+#[cfg(test)]
+mod tests;

 const LOG_TARGET: &str = "availability";
@@ -53,374 +57,348 @@ mod columns {
 	pub const NUM_COLUMNS: u32 = 2;
 }

 #[derive(Debug, thiserror::Error)]
 #[allow(missing_docs)]
 pub enum Error {
 	#[error(transparent)]
 	RuntimeApi(#[from] RuntimeApiError),
 	#[error(transparent)]
 	ChainApi(#[from] ChainApiError),
 	#[error(transparent)]
 	Erasure(#[from] erasure::Error),
 	#[error(transparent)]
 	Io(#[from] io::Error),
 	#[error(transparent)]
 	Oneshot(#[from] oneshot::Canceled),
 	#[error(transparent)]
 	Subsystem(#[from] SubsystemError),
 	#[error(transparent)]
 	Time(#[from] SystemTimeError),
 	#[error("Custom databases are not supported")]
 	CustomDatabase,
 }

 impl Error {
 	fn trace(&self) {
 		match self {
 			// don't spam the log with spurious errors
 			Self::RuntimeApi(_) |
 			Self::Oneshot(_) => tracing::debug!(target: LOG_TARGET, err = ?self),
 			// it's worth reporting otherwise
 			_ => tracing::warn!(target: LOG_TARGET, err = ?self),
 		}
 	}
 }

+/// The following constants are used under normal conditions:
+
+const AVAILABLE_PREFIX: &[u8; 9] = b"available";
+const CHUNK_PREFIX: &[u8; 5] = b"chunk";
+const META_PREFIX: &[u8; 4] = b"meta";
+const UNFINALIZED_PREFIX: &[u8; 11] = b"unfinalized";
+const PRUNE_BY_TIME_PREFIX: &[u8; 13] = b"prune_by_time";
+
+// We have some keys we want to map to empty values because existence of the key is enough. We use this because
+// rocksdb doesn't support empty values.
+const TOMBSTONE_VALUE: &[u8] = &*b" ";
+
+/// Unavailable blocks are kept for 1 hour.
+const KEEP_UNAVAILABLE_FOR: Duration = Duration::from_secs(60 * 60);
+
+/// Finalized data is kept for 25 hours.
+const KEEP_FINALIZED_FOR: Duration = Duration::from_secs(25 * 60 * 60);
+
+/// The pruning interval.
+const PRUNING_INTERVAL: Duration = Duration::from_secs(60 * 5);
+
+/// Unix time wrapper with big-endian encoding.
+#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
+struct BETimestamp(u64);
+
+impl Encode for BETimestamp {
+	fn size_hint(&self) -> usize {
+		std::mem::size_of::<u64>()
+	}
+
+	fn using_encoded<R, F: FnOnce(&[u8]) -> R>(&self, f: F) -> R {
+		f(&self.0.to_be_bytes())
+	}
+}
-/// A wrapper type for delays.
-#[derive(Clone, Debug, Decode, Encode, Eq)]
-enum PruningDelay {
-	/// This pruning should be triggered after this `Duration` from UNIX_EPOCH.
-	In(Duration),
-
-	/// Data is in the state where it has no expiration.
-	Indefinite,
-}
-
-impl PruningDelay {
-	fn now() -> Result<Self, Error> {
-		Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.into())
-	}
-
-	fn into_the_future(duration: Duration) -> Result<Self, Error> {
-		Ok(Self::In(SystemTime::now().duration_since(UNIX_EPOCH)? + duration))
-	}
-
-	fn as_duration(&self) -> Option<Duration> {
-		match self {
-			PruningDelay::In(d) => Some(*d),
-			PruningDelay::Indefinite => None,
-		}
-	}
-}
-
-impl From<Duration> for PruningDelay {
-	fn from(d: Duration) -> Self {
-		Self::In(d)
-	}
-}
-
-impl PartialEq for PruningDelay {
-	fn eq(&self, other: &Self) -> bool {
-		match (self, other) {
-			(PruningDelay::In(this), PruningDelay::In(that)) => {this == that},
-			(PruningDelay::Indefinite, PruningDelay::Indefinite) => true,
-			_ => false,
-		}
-	}
-}
-
-impl PartialOrd for PruningDelay {
-	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-		match (self, other) {
-			(PruningDelay::In(this), PruningDelay::In(that)) => this.partial_cmp(that),
-			(PruningDelay::In(_), PruningDelay::Indefinite) => Some(Ordering::Less),
-			(PruningDelay::Indefinite, PruningDelay::In(_)) => Some(Ordering::Greater),
-			(PruningDelay::Indefinite, PruningDelay::Indefinite) => Some(Ordering::Equal),
-		}
-	}
-}
-
-impl Ord for PruningDelay {
-	fn cmp(&self, other: &Self) -> Ordering {
-		match (self, other) {
-			(PruningDelay::In(this), PruningDelay::In(that)) => this.cmp(that),
-			(PruningDelay::In(_), PruningDelay::Indefinite) => Ordering::Less,
-			(PruningDelay::Indefinite, PruningDelay::In(_)) => Ordering::Greater,
-			(PruningDelay::Indefinite, PruningDelay::Indefinite) => Ordering::Equal,
-		}
-	}
-}
+impl Decode for BETimestamp {
+	fn decode<I: Input>(value: &mut I) -> Result<Self, CodecError> {
+		<[u8; 8]>::decode(value).map(u64::from_be_bytes).map(Self)
+	}
+}
+
+impl From<Duration> for BETimestamp {
+	fn from(d: Duration) -> Self {
+		BETimestamp(d.as_secs())
+	}
+}
+
+impl Into<Duration> for BETimestamp {
+	fn into(self) -> Duration {
+		Duration::from_secs(self.0)
+	}
+}
+
+/// [`BlockNumber`] wrapper with big-endian encoding.
+#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
+struct BEBlockNumber(BlockNumber);
+
+impl Encode for BEBlockNumber {
+	fn size_hint(&self) -> usize {
+		std::mem::size_of::<BlockNumber>()
+	}
+
+	fn using_encoded<R, F: FnOnce(&[u8]) -> R>(&self, f: F) -> R {
+		f(&self.0.to_be_bytes())
+	}
+}
+
+impl Decode for BEBlockNumber {
+	fn decode<I: Input>(value: &mut I) -> Result<Self, CodecError> {
+		<[u8; std::mem::size_of::<BlockNumber>()]>::decode(value).map(BlockNumber::from_be_bytes).map(Self)
+	}
+}
+
+#[derive(Debug, Encode, Decode)]
+enum State {
+	/// Candidate data was first observed at the given time but is not available in any block.
+	#[codec(index = "0")]
+	Unavailable(BETimestamp),
+	/// The candidate was first observed at the given time and was included in the given list of unfinalized blocks, which may be
+	/// empty. The timestamp here is not used for pruning. Either one of these blocks will be finalized or the state will regress to
+	/// `State::Unavailable`, in which case the same timestamp will be reused. Blocks are sorted ascending first by block number and
+	/// then hash.
+	#[codec(index = "1")]
+	Unfinalized(BETimestamp, Vec<(BEBlockNumber, Hash)>),
+	/// Candidate data has appeared in a finalized block and did so at the given time.
+	#[codec(index = "2")]
+	Finalized(BETimestamp)
+}
+
+// Meta information about a candidate.
+#[derive(Debug, Encode, Decode)]
+struct CandidateMeta {
+	state: State,
+	data_available: bool,
+	chunks_stored: BitVec<BitOrderLsb0, u8>,
+}
+
+fn query_inner<D: Decode>(
+	db: &Arc<dyn KeyValueDB>,
+	column: u32,
+	key: &[u8],
+) -> Result<Option<D>, Error> {
+	match db.get(column, key) {
+		Ok(Some(raw)) => {
+			let res = D::decode(&mut &raw[..])?;
+			Ok(Some(res))
+		}
+		Ok(None) => Ok(None),
+		Err(e) => {
+			tracing::warn!(target: LOG_TARGET, err = ?e, "Error reading from the availability store");
+			Err(e.into())
+		}
+	}
+}
-/// A key for chunk pruning records.
-const CHUNK_PRUNING_KEY: [u8; 14] = *b"chunks_pruning";
-
-/// A key for PoV pruning records.
-const POV_PRUNING_KEY: [u8; 11] = *b"pov_pruning";
-
-/// A key for a cached value of next scheduled PoV pruning.
-const NEXT_POV_PRUNING: [u8; 16] = *b"next_pov_pruning";
-
-/// A key for a cached value of next scheduled chunk pruning.
-const NEXT_CHUNK_PRUNING: [u8; 18] = *b"next_chunk_pruning";
-
-/// The following constants are used under normal conditions:
-
-/// Stored block is kept available for 1 hour.
-const KEEP_STORED_BLOCK_FOR: Duration = Duration::from_secs(60 * 60);
-
-/// Finalized block is kept for 1 day.
-const KEEP_FINALIZED_BLOCK_FOR: Duration = Duration::from_secs(24 * 60 * 60);
-
-/// Keep chunk of the finalized block for 1 day + 1 hour.
-const KEEP_FINALIZED_CHUNK_FOR: Duration = Duration::from_secs(25 * 60 * 60);
-
-/// At which point in time since UNIX_EPOCH we need to wakeup and do next pruning of blocks.
-/// Essentially this is the first element in the sorted array of pruning data,
-/// we just want to cache it here to avoid lifting the whole array just to look at the head.
-///
-/// This record exists under `NEXT_POV_PRUNING` key, if it does not either:
-/// a) There are no records and nothing has to be pruned.
-/// b) There are records but all of them are in `Included` state and do not have exact time to
-///    be pruned.
-#[derive(Decode, Encode)]
-struct NextPoVPruning(Duration);
-
-impl NextPoVPruning {
-	// After which duration from `now` this should fire.
-	fn should_fire_in(&self) -> Result<Duration, Error> {
-		Ok(self.0.checked_sub(SystemTime::now().duration_since(UNIX_EPOCH)?).unwrap_or_default())
-	}
-}
-
-/// At which point in time since UNIX_EPOCH we need to wakeup and do next pruning of chunks.
-/// Essentially this is the first element in the sorted array of pruning data,
-/// we just want to cache it here to avoid lifting the whole array just to look at the head.
-///
-/// This record exists under `NEXT_CHUNK_PRUNING` key, if it does not either:
-/// a) There are no records and nothing has to be pruned.
-/// b) There are records but all of them are in `Included` state and do not have exact time to
-///    be pruned.
-#[derive(Decode, Encode)]
-struct NextChunkPruning(Duration);
-
-impl NextChunkPruning {
-	// After which amount of seconds into the future from `now` this should fire.
-	fn should_fire_in(&self) -> Result<Duration, Error> {
-		Ok(self.0.checked_sub(SystemTime::now().duration_since(UNIX_EPOCH)?).unwrap_or_default())
-	}
-}
+fn write_available_data(
+	tx: &mut DBTransaction,
+	hash: &CandidateHash,
+	available_data: &AvailableData,
+) {
+	let key = (AVAILABLE_PREFIX, hash).encode();
+
+	tx.put_vec(columns::DATA, &key[..], available_data.encode());
+}
+
+fn load_available_data(
+	db: &Arc<dyn KeyValueDB>,
+	hash: &CandidateHash,
+) -> Result<Option<AvailableData>, Error> {
+	let key = (AVAILABLE_PREFIX, hash).encode();
+
+	query_inner(db, columns::DATA, &key)
+}
+
+fn delete_available_data(
+	tx: &mut DBTransaction,
+	hash: &CandidateHash,
+) {
+	let key = (AVAILABLE_PREFIX, hash).encode();
+
+	tx.delete(columns::DATA, &key[..])
+}
-/// Struct holding pruning timing configuration.
-/// The only purpose of this structure is to use different timing
-/// configurations in production and in testing.
-#[derive(Clone)]
-struct PruningConfig {
-	/// How long should a stored block stay available.
-	keep_stored_block_for: Duration,
-
-	/// How long should a finalized block stay available.
-	keep_finalized_block_for: Duration,
-
-	/// How long should a chunk of a finalized block stay available.
-	keep_finalized_chunk_for: Duration,
-}
-
-impl Default for PruningConfig {
-	fn default() -> Self {
-		Self {
-			keep_stored_block_for: KEEP_STORED_BLOCK_FOR,
-			keep_finalized_block_for: KEEP_FINALIZED_BLOCK_FOR,
-			keep_finalized_chunk_for: KEEP_FINALIZED_CHUNK_FOR,
-		}
-	}
-}
-
-#[derive(Debug, Decode, Encode, Eq, PartialEq)]
-enum CandidateState {
-	Stored,
-	Included,
-	Finalized,
-}
+fn load_chunk(
+	db: &Arc<dyn KeyValueDB>,
+	candidate_hash: &CandidateHash,
+	chunk_index: ValidatorIndex,
+) -> Result<Option<ErasureChunk>, Error> {
+	let key = (CHUNK_PREFIX, candidate_hash, chunk_index).encode();
+
+	query_inner(db, columns::DATA, &key)
+}
+
+fn write_chunk(
+	tx: &mut DBTransaction,
+	candidate_hash: &CandidateHash,
+	chunk_index: ValidatorIndex,
+	erasure_chunk: &ErasureChunk,
+) {
+	let key = (CHUNK_PREFIX, candidate_hash, chunk_index).encode();
+
+	tx.put_vec(columns::DATA, &key, erasure_chunk.encode());
+}
+
+fn delete_chunk(
+	tx: &mut DBTransaction,
+	candidate_hash: &CandidateHash,
+	chunk_index: ValidatorIndex,
+) {
+	let key = (CHUNK_PREFIX, candidate_hash, chunk_index).encode();
+
+	tx.delete(columns::DATA, &key[..]);
+}
-#[derive(Debug, Decode, Encode, Eq)]
-struct PoVPruningRecord {
-	candidate_hash: CandidateHash,
-	block_number: BlockNumber,
-	candidate_state: CandidateState,
-	prune_at: PruningDelay,
-}
-
-impl PartialEq for PoVPruningRecord {
-	fn eq(&self, other: &Self) -> bool {
-		self.candidate_hash == other.candidate_hash
-	}
-}
-
-impl Ord for PoVPruningRecord {
-	fn cmp(&self, other: &Self) -> Ordering {
-		if self.candidate_hash == other.candidate_hash {
-			return Ordering::Equal;
-		}
-
-		self.prune_at.cmp(&other.prune_at)
-	}
-}
-
-impl PartialOrd for PoVPruningRecord {
-	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-		Some(self.cmp(other))
-	}
-}
+fn load_meta(
+	db: &Arc<dyn KeyValueDB>,
+	hash: &CandidateHash,
+) -> Result<Option<CandidateMeta>, Error> {
+	let key = (META_PREFIX, hash).encode();
+
+	query_inner(db, columns::META, &key)
+}
+
+fn write_meta(
+	tx: &mut DBTransaction,
+	hash: &CandidateHash,
+	meta: &CandidateMeta,
+) {
+	let key = (META_PREFIX, hash).encode();
+
+	tx.put_vec(columns::META, &key, meta.encode());
+}
+
+fn delete_meta(tx: &mut DBTransaction, hash: &CandidateHash) {
+	let key = (META_PREFIX, hash).encode();
+	tx.delete(columns::META, &key[..])
+}
-#[derive(Debug, Decode, Encode, Eq)]
-struct ChunkPruningRecord {
-	candidate_hash: CandidateHash,
-	block_number: BlockNumber,
-	candidate_state: CandidateState,
-	chunk_index: u32,
-	prune_at: PruningDelay,
-}
-
-impl PartialEq for ChunkPruningRecord {
-	fn eq(&self, other: &Self) -> bool {
-		self.candidate_hash == other.candidate_hash &&
-			self.chunk_index == other.chunk_index
-	}
-}
-
-impl Ord for ChunkPruningRecord {
-	fn cmp(&self, other: &Self) -> Ordering {
-		if self.candidate_hash == other.candidate_hash {
-			return Ordering::Equal;
-		}
-
-		self.prune_at.cmp(&other.prune_at)
-	}
-}
-
-impl PartialOrd for ChunkPruningRecord {
-	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-		Some(self.cmp(other))
-	}
-}
+fn delete_unfinalized_height(
+	tx: &mut DBTransaction,
+	block_number: BlockNumber,
+) {
+	let prefix = (UNFINALIZED_PREFIX, BEBlockNumber(block_number)).encode();
+	tx.delete_prefix(columns::META, &prefix);
+}
+
+fn delete_unfinalized_inclusion(
+	tx: &mut DBTransaction,
+	block_number: BlockNumber,
+	block_hash: &Hash,
+	candidate_hash: &CandidateHash,
+) {
+	let key = (
+		UNFINALIZED_PREFIX,
+		BEBlockNumber(block_number),
+		block_hash,
+		candidate_hash,
+	).encode();
+
+	tx.delete(columns::META, &key[..]);
+}
+
+fn delete_pruning_key(tx: &mut DBTransaction, t: impl Into<BETimestamp>, h: &CandidateHash) {
+	let key = (PRUNE_BY_TIME_PREFIX, t.into(), h).encode();
+	tx.delete(columns::META, &key);
+}
+
+fn write_pruning_key(tx: &mut DBTransaction, t: impl Into<BETimestamp>, h: &CandidateHash) {
+	let t = t.into();
+	let key = (PRUNE_BY_TIME_PREFIX, t, h).encode();
+	tx.put(columns::META, &key, TOMBSTONE_VALUE);
+}
+
+fn finalized_block_range(finalized: BlockNumber) -> (Vec<u8>, Vec<u8>) {
+	// We use big-endian encoding to iterate in ascending order.
+	let start = UNFINALIZED_PREFIX.encode();
+	let end = (UNFINALIZED_PREFIX, BEBlockNumber(finalized + 1)).encode();
+	(start, end)
+}
-/// An implementation of the Availability Store subsystem.
-pub struct AvailabilityStoreSubsystem {
-	pruning_config: PruningConfig,
-	inner: Arc<dyn KeyValueDB>,
-	chunks_cache: HashMap<CandidateHash, HashMap<u32, ErasureChunk>>,
-	metrics: Metrics,
-}
-
-impl AvailabilityStoreSubsystem {
-	// Perform pruning of PoVs
-	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
-	fn prune_povs(&mut self) -> Result<(), Error> {
-		let _timer = self.metrics.time_prune_povs();
-
-		let mut tx = DBTransaction::new();
-		let mut pov_pruning = pov_pruning(&self.inner).unwrap_or_default();
-		let now = PruningDelay::now()?;
-
-		tracing::trace!(target: LOG_TARGET, "Pruning PoVs");
-		let outdated_records_count = pov_pruning.iter()
-			.take_while(|r| r.prune_at <= now)
-			.count();
-
-		for record in pov_pruning.drain(..outdated_records_count) {
-			tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record");
-			self.chunks_cache.remove(&record.candidate_hash);
-			tx.delete(
-				columns::DATA,
-				available_data_key(&record.candidate_hash).as_slice(),
-			);
-		}
-
-		put_pov_pruning(&self.inner, Some(tx), pov_pruning, &self.metrics)?;
-
-		Ok(())
-	}
-
-	// Perform pruning of chunks.
-	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
-	fn prune_chunks(&mut self) -> Result<(), Error> {
-		let _timer = self.metrics.time_prune_chunks();
-
-		let mut tx = DBTransaction::new();
-		let mut chunk_pruning = chunk_pruning(&self.inner).unwrap_or_default();
-		let now = PruningDelay::now()?;
-
-		tracing::trace!(target: LOG_TARGET, "Pruning Chunks");
-		let outdated_records_count = chunk_pruning.iter()
-			.take_while(|r| r.prune_at <= now)
-			.count();
+fn write_unfinalized_block_contains(
+	tx: &mut DBTransaction,
+	n: BlockNumber,
+	h: &Hash,
+	ch: &CandidateHash,
+) {
+	let key = (UNFINALIZED_PREFIX, BEBlockNumber(n), h, ch).encode();
+	tx.put(columns::META, &key, TOMBSTONE_VALUE);
+}
+
+fn pruning_range(now: impl Into<BETimestamp>) -> (Vec<u8>, Vec<u8>) {
+	let start = PRUNE_BY_TIME_PREFIX.encode();
+	let end = (PRUNE_BY_TIME_PREFIX, BETimestamp(now.into().0 + 1)).encode();
+
+	(start, end)
+}
+
+fn decode_unfinalized_key(s: &[u8]) -> Result<(BlockNumber, Hash, CandidateHash), CodecError> {
+	if !s.starts_with(UNFINALIZED_PREFIX) {
+		return Err("missing magic string".into());
+	}
+
+	<(BEBlockNumber, Hash, CandidateHash)>::decode(&mut &s[UNFINALIZED_PREFIX.len()..])
+		.map(|(b, h, ch)| (b.0, h, ch))
+}
+
+fn decode_pruning_key(s: &[u8]) -> Result<(Duration, CandidateHash), CodecError> {
+	if !s.starts_with(PRUNE_BY_TIME_PREFIX) {
+		return Err("missing magic string".into());
+	}
+
+	<(BETimestamp, CandidateHash)>::decode(&mut &s[PRUNE_BY_TIME_PREFIX.len()..])
+		.map(|(t, h)| (t.into(), h))
+}
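
To close, a standalone sketch of how the `prune_by_time` index written by `write_pruning_key` above is meant to be consumed by a pruning pass (illustrative only, not code from the commit: a std `BTreeMap` stands in for the ordered `META` column and a bare `[u8; 32]` for `CandidateHash`). Every candidate due at or before `now` lands in one contiguous key range, mirroring `pruning_range`:

use std::collections::BTreeMap;

const PRUNE_BY_TIME_PREFIX: &[u8] = b"prune_by_time";
const TOMBSTONE_VALUE: &[u8] = b" ";

// Key layout as in the diff: prefix ++ big-endian seconds ++ candidate hash.
fn pruning_key(secs: u64, candidate_hash: &[u8; 32]) -> Vec<u8> {
	let mut k = PRUNE_BY_TIME_PREFIX.to_vec();
	k.extend_from_slice(&secs.to_be_bytes());
	k.extend_from_slice(candidate_hash);
	k
}

fn main() {
	// BTreeMap iterates in byte order, like a RocksDB column.
	let mut meta: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
	meta.insert(pruning_key(100, &[1u8; 32]), TOMBSTONE_VALUE.to_vec());
	meta.insert(pruning_key(200, &[2u8; 32]), TOMBSTONE_VALUE.to_vec());
	meta.insert(pruning_key(300, &[3u8; 32]), TOMBSTONE_VALUE.to_vec());

	// Everything due at or before `now` falls in [prefix, prefix ++ be(now + 1)),
	// the same half-open range `pruning_range` computes.
	let now: u64 = 200;
	let start = PRUNE_BY_TIME_PREFIX.to_vec();
	let mut end = PRUNE_BY_TIME_PREFIX.to_vec();
	end.extend_from_slice(&(now + 1).to_be_bytes());

	let due: Vec<Vec<u8>> = meta.range(start..end).map(|(k, _)| k.clone()).collect();
	assert_eq!(due.len(), 2); // the candidates stamped 100 and 200 are swept
}

Because the upper bound is `prefix ++ be(now + 1)`, a candidate whose timestamp equals `now` is still swept in the current pass; the values are tombstones because existence of the key alone marks the candidate as pending pruning.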