Unverified commit 9060c1e3, authored by Fedor Sakharov, committed by GitHub

Optimizations of av-store (#2223)

* Store all chunks in a single transaction (see the sketch below)

* Adds chunks LRU to store

* Add pruning records metrics

* Use honest cache instead of LRU

* Remove unnecessary optional cache

* Fix review nits that are fixable
parent 39d14fdc
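In short: the subsystem now keeps an in-memory per-candidate chunk cache that is consulted before the database, and writes all of a candidate's chunks in a single `DBTransaction`. A minimal sketch of the cache shape only, with simplified stand-in types (`CandidateHash` and `ErasureChunk` below are illustrative placeholders, not the real polkadot-primitives definitions):

```rust
use std::collections::HashMap;

// Stand-ins for the real polkadot-primitives types.
type CandidateHash = [u8; 32];

#[derive(Clone)]
struct ErasureChunk {
    index: u32,
    chunk: Vec<u8>,
}

// Per-candidate cache: candidate hash -> (chunk index -> chunk).
#[derive(Default)]
struct ChunkCache {
    inner: HashMap<CandidateHash, HashMap<u32, ErasureChunk>>,
}

impl ChunkCache {
    // Store every chunk of a candidate at once; the subsystem pairs this
    // with a single database transaction covering the same chunks.
    fn insert_all(&mut self, candidate: CandidateHash, chunks: Vec<ErasureChunk>) {
        let entry = self.inner.entry(candidate).or_default();
        for chunk in chunks {
            entry.insert(chunk.index, chunk);
        }
    }

    // Serve reads from memory when possible; callers fall back to the DB.
    fn get(&self, candidate: &CandidateHash, index: u32) -> Option<&ErasureChunk> {
        self.inner.get(candidate).and_then(|chunks| chunks.get(&index))
    }

    // Dropped wholesale once the candidate is included or pruned.
    fn remove(&mut self, candidate: &CandidateHash) {
        self.inner.remove(candidate);
    }
}

fn main() {
    let mut cache = ChunkCache::default();
    let candidate = [0u8; 32];
    cache.insert_all(candidate, vec![ErasureChunk { index: 0, chunk: vec![1, 2, 3] }]);
    assert!(cache.get(&candidate, 0).is_some());
    cache.remove(&candidate);
    assert!(cache.get(&candidate, 0).is_none());
}
```

Entries are evicted explicitly when a candidate is included or its pruning records are removed, which is why a plain `HashMap` (the "honest cache") sufficed where the earlier LRU attempt was unnecessary.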
@@ -3354,9 +3354,9 @@ dependencies = [
 [[package]]
 name = "lru"
-version = "0.6.1"
+version = "0.6.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "be716eb6878ca2263eb5d00a781aa13264a794f519fe6af4fbb2668b2d5441c0"
+checksum = "3aae342b73d57ad0b8b364bd12584819f2c1fe9114285dfcf8b0722607671635"
 dependencies = [
  "hashbrown",
 ]
@@ -94,7 +94,7 @@ impl Error {
 }

 /// A wrapper type for delays.
-#[derive(Debug, Decode, Encode, Eq)]
+#[derive(Clone, Debug, Decode, Encode, Eq)]
 enum PruningDelay {
     /// This pruning should be triggered after this `Duration` from UNIX_EPOCH.
     In(Duration),
@@ -315,13 +315,14 @@ impl PartialOrd for ChunkPruningRecord {
 pub struct AvailabilityStoreSubsystem {
     pruning_config: PruningConfig,
     inner: Arc<dyn KeyValueDB>,
+    chunks_cache: HashMap<CandidateHash, HashMap<u32, ErasureChunk>>,
     metrics: Metrics,
 }

 impl AvailabilityStoreSubsystem {
     // Perform pruning of PoVs
     #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
-    fn prune_povs(&self) -> Result<(), Error> {
+    fn prune_povs(&mut self) -> Result<(), Error> {
         let _timer = self.metrics.time_prune_povs();

         let mut tx = DBTransaction::new();
@@ -335,20 +336,22 @@ impl AvailabilityStoreSubsystem {
         for record in pov_pruning.drain(..outdated_records_count) {
             tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record");

+            self.chunks_cache.remove(&record.candidate_hash);
             tx.delete(
                 columns::DATA,
                 available_data_key(&record.candidate_hash).as_slice(),
             );
         }

-        put_pov_pruning(&self.inner, Some(tx), pov_pruning)?;
+        put_pov_pruning(&self.inner, Some(tx), pov_pruning, &self.metrics)?;

         Ok(())
     }

     // Perform pruning of chunks.
     #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
-    fn prune_chunks(&self) -> Result<(), Error> {
+    fn prune_chunks(&mut self) -> Result<(), Error> {
         let _timer = self.metrics.time_prune_chunks();

         let mut tx = DBTransaction::new();
@@ -362,13 +365,15 @@ impl AvailabilityStoreSubsystem {
         for record in chunk_pruning.drain(..outdated_records_count) {
             tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record");

+            self.chunks_cache.remove(&record.candidate_hash);
             tx.delete(
                 columns::DATA,
                 erasure_chunk_key(&record.candidate_hash, record.chunk_index).as_slice(),
             );
         }

-        put_chunk_pruning(&self.inner, Some(tx), chunk_pruning)?;
+        put_chunk_pruning(&self.inner, Some(tx), chunk_pruning, &self.metrics)?;

         Ok(())
     }
@@ -468,6 +473,7 @@ impl AvailabilityStoreSubsystem {
         Ok(Self {
             pruning_config: PruningConfig::default(),
             inner: Arc::new(db),
+            chunks_cache: HashMap::new(),
             metrics,
         })
     }
@@ -477,6 +483,7 @@ impl AvailabilityStoreSubsystem {
         Self {
             pruning_config,
             inner,
+            chunks_cache: HashMap::new(),
             metrics: Metrics(None),
         }
     }
@@ -535,7 +542,7 @@ where
                 ActiveLeavesUpdate { activated, .. })
             ) => {
                 for (activated, _span) in activated.into_iter() {
-                    process_block_activated(ctx, &subsystem.inner, activated, &subsystem.metrics).await?;
+                    process_block_activated(ctx, subsystem, activated).await?;
                 }
             }
             FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
@@ -590,7 +597,7 @@ async fn process_block_finalized(
             }
         }

-        put_pov_pruning(db, None, pov_pruning)?;
+        put_pov_pruning(db, None, pov_pruning, &subsystem.metrics)?;
     }

     if let Some(mut chunk_pruning) = chunk_pruning(db) {
@@ -609,23 +616,23 @@ async fn process_block_finalized(
             }
         }

-        put_chunk_pruning(db, None, chunk_pruning)?;
+        put_chunk_pruning(db, None, chunk_pruning, &subsystem.metrics)?;
     }

     Ok(())
 }

-#[tracing::instrument(level = "trace", skip(ctx, db, metrics), fields(subsystem = LOG_TARGET))]
+#[tracing::instrument(level = "trace", skip(ctx, subsystem), fields(subsystem = LOG_TARGET))]
 async fn process_block_activated<Context>(
     ctx: &mut Context,
-    db: &Arc<dyn KeyValueDB>,
+    subsystem: &mut AvailabilityStoreSubsystem,
     hash: Hash,
-    metrics: &Metrics,
 ) -> Result<(), Error>
 where
     Context: SubsystemContext<Message=AvailabilityStoreMessage>
 {
-    let _timer = metrics.time_block_activated();
+    let _timer = subsystem.metrics.time_block_activated();
+    let db = &subsystem.inner;

     let events = match request_candidate_events(ctx, hash).await {
         Ok(events) => events,
@@ -649,6 +656,10 @@ where
         }
     }

+    for included in &included {
+        subsystem.chunks_cache.remove(&included);
+    }
+
     if let Some(mut pov_pruning) = pov_pruning(db) {
         for record in pov_pruning.iter_mut() {
             if included.contains(&record.candidate_hash) {
@@ -659,7 +670,7 @@ where
         pov_pruning.sort();

-        put_pov_pruning(db, None, pov_pruning)?;
+        put_pov_pruning(db, None, pov_pruning, &subsystem.metrics)?;
     }

     if let Some(mut chunk_pruning) = chunk_pruning(db) {
@@ -672,7 +683,7 @@ where
         chunk_pruning.sort();

-        put_chunk_pruning(db, None, chunk_pruning)?;
+        put_chunk_pruning(db, None, chunk_pruning, &subsystem.metrics)?;
     }

     Ok(())
@@ -742,11 +753,11 @@ where
             tx.send(result?).map_err(|_| oneshot::Canceled)?;
         }
-        StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx } => {
+        StoreChunk { candidate_hash, relay_parent, chunk, tx } => {
             let chunk_index = chunk.index;
             // Current block number is relay_parent block number + 1.
             let block_number = get_block_number(ctx, relay_parent).await? + 1;
-            let result = store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number);
+            let result = store_chunks(subsystem, &candidate_hash, vec![chunk], block_number);

             tracing::trace!(
                 target: LOG_TARGET,
@@ -802,14 +813,17 @@ fn chunk_pruning(db: &Arc<dyn KeyValueDB>) -> Option<Vec<ChunkPruningRecord>> {
     query_inner(db, columns::META, &CHUNK_PRUNING_KEY)
 }

-#[tracing::instrument(level = "trace", skip(db, tx), fields(subsystem = LOG_TARGET))]
+#[tracing::instrument(level = "trace", skip(db, tx, metrics), fields(subsystem = LOG_TARGET))]
 fn put_pov_pruning(
     db: &Arc<dyn KeyValueDB>,
     tx: Option<DBTransaction>,
     mut pov_pruning: Vec<PoVPruningRecord>,
+    metrics: &Metrics,
 ) -> Result<(), Error> {
     let mut tx = tx.unwrap_or_default();

+    metrics.block_pruning_records_size(pov_pruning.len());
+
     pov_pruning.sort();

     tx.put_vec(
@@ -843,14 +857,17 @@ fn put_pov_pruning(
     Ok(())
 }

-#[tracing::instrument(level = "trace", skip(db, tx), fields(subsystem = LOG_TARGET))]
+#[tracing::instrument(level = "trace", skip(db, tx, metrics), fields(subsystem = LOG_TARGET))]
 fn put_chunk_pruning(
     db: &Arc<dyn KeyValueDB>,
     tx: Option<DBTransaction>,
     mut chunk_pruning: Vec<ChunkPruningRecord>,
+    metrics: &Metrics,
 ) -> Result<(), Error> {
     let mut tx = tx.unwrap_or_default();

+    metrics.chunk_pruning_records_size(chunk_pruning.len());
+
     chunk_pruning.sort();

     tx.put_vec(
@@ -910,16 +927,13 @@ fn store_available_data(
     let block_number = available_data.validation_data.block_number;

-    if let Some(index) = id {
-        let chunks = get_chunks(&available_data, n_validators as usize, &subsystem.metrics)?;
-        store_chunk(
-            subsystem,
-            candidate_hash,
-            n_validators,
-            chunks[index as usize].clone(),
-            block_number,
-        )?;
-    }
+    let chunks = get_chunks(&available_data, n_validators as usize, &subsystem.metrics)?;
+    store_chunks(
+        subsystem,
+        candidate_hash,
+        chunks,
+        block_number,
+    )?;

     let stored_data = StoredAvailableData {
         data: available_data,
@@ -966,23 +980,19 @@ fn store_available_data(
 }

 #[tracing::instrument(level = "trace", skip(subsystem), fields(subsystem = LOG_TARGET))]
-fn store_chunk(
+fn store_chunks(
     subsystem: &mut AvailabilityStoreSubsystem,
     candidate_hash: &CandidateHash,
-    _n_validators: u32,
-    chunk: ErasureChunk,
+    chunks: Vec<ErasureChunk>,
     block_number: BlockNumber,
 ) -> Result<(), Error> {
-    let _timer = subsystem.metrics.time_store_chunk();
+    let _timer = subsystem.metrics.time_store_chunks();

     let mut tx = DBTransaction::new();
-    let dbkey = erasure_chunk_key(candidate_hash, chunk.index);

     let mut chunk_pruning = chunk_pruning(&subsystem.inner).unwrap_or_default();
-    let prune_at = PruningDelay::into_the_future(subsystem.pruning_config.keep_stored_block_for)?;
+    let prune_at = PruningDelay::into_the_future(subsystem.pruning_config.keep_stored_block_for)?;

-    if let Some(delay) = prune_at.as_duration() {
+    if let Some(delay) = prune_at.clone().as_duration() {
         tx.put_vec(
             columns::META,
             &NEXT_CHUNK_PRUNING,
@@ -990,23 +1000,29 @@ fn store_chunks(
         );
     }

-    let pruning_record = ChunkPruningRecord {
-        candidate_hash: candidate_hash.clone(),
-        block_number,
-        candidate_state: CandidateState::Stored,
-        chunk_index: chunk.index,
-        prune_at,
-    };
-
-    let idx = chunk_pruning.binary_search(&pruning_record).unwrap_or_else(|insert_idx| insert_idx);
-
-    chunk_pruning.insert(idx, pruning_record);
-
-    tx.put_vec(
-        columns::DATA,
-        &dbkey,
-        chunk.encode(),
-    );
+    for chunk in chunks {
+        subsystem.chunks_cache.entry(*candidate_hash).or_default().insert(chunk.index, chunk.clone());
+
+        let pruning_record = ChunkPruningRecord {
+            candidate_hash: candidate_hash.clone(),
+            block_number,
+            candidate_state: CandidateState::Stored,
+            chunk_index: chunk.index,
+            prune_at: prune_at.clone(),
+        };
+
+        let idx = chunk_pruning.binary_search(&pruning_record).unwrap_or_else(|insert_idx| insert_idx);
+        chunk_pruning.insert(idx, pruning_record);
+
+        let dbkey = erasure_chunk_key(candidate_hash, chunk.index);
+        tx.put_vec(
+            columns::DATA,
+            &dbkey,
+            chunk.encode(),
+        );
+    }

     tx.put_vec(
         columns::META,
@@ -1027,6 +1043,12 @@ fn get_chunk(
 ) -> Result<Option<ErasureChunk>, Error> {
     let _timer = subsystem.metrics.time_get_chunk();

+    if let Some(entry) = subsystem.chunks_cache.get(candidate_hash) {
+        if let Some(chunk) = entry.get(&index) {
+            return Ok(Some(chunk.clone()));
+        }
+    }
+
     if let Some(chunk) = query_inner(
         &subsystem.inner,
         columns::DATA,
@@ -1036,17 +1058,14 @@ fn get_chunk(
     }

     if let Some(data) = available_data(&subsystem.inner, candidate_hash) {
-        let mut chunks = get_chunks(&data.data, data.n_validators as usize, &subsystem.metrics)?;
+        let chunks = get_chunks(&data.data, data.n_validators as usize, &subsystem.metrics)?;
         let desired_chunk = chunks.get(index as usize).cloned();
-        for chunk in chunks.drain(..) {
-            store_chunk(
-                subsystem,
-                candidate_hash,
-                data.n_validators,
-                chunk,
-                data.data.validation_data.block_number,
-            )?;
-        }
+        store_chunks(
+            subsystem,
+            candidate_hash,
+            chunks,
+            data.data.validation_data.block_number,
+        )?;
         return Ok(desired_chunk);
     }
@@ -1109,13 +1128,15 @@ fn get_chunks(data: &AvailableData, n_validators: usize, metrics: &Metrics) -> R
 #[derive(Clone)]
 struct MetricsInner {
     received_availability_chunks_total: prometheus::Counter<prometheus::U64>,
+    chunk_pruning_records_total: prometheus::Gauge<prometheus::U64>,
+    block_pruning_records_total: prometheus::Gauge<prometheus::U64>,
     prune_povs: prometheus::Histogram,
     prune_chunks: prometheus::Histogram,
     process_block_finalized: prometheus::Histogram,
     block_activated: prometheus::Histogram,
     process_message: prometheus::Histogram,
     store_available_data: prometheus::Histogram,
-    store_chunk: prometheus::Histogram,
+    store_chunks: prometheus::Histogram,
     get_chunk: prometheus::Histogram,
 }
@@ -1133,6 +1154,22 @@ impl Metrics {
         }
     }

+    fn chunk_pruning_records_size(&self, count: usize) {
+        if let Some(metrics) = &self.0 {
+            use core::convert::TryFrom as _;
+            let total = u64::try_from(count).unwrap_or_default();
+            metrics.chunk_pruning_records_total.set(total);
+        }
+    }
+
+    fn block_pruning_records_size(&self, count: usize) {
+        if let Some(metrics) = &self.0 {
+            use core::convert::TryFrom as _;
+            let total = u64::try_from(count).unwrap_or_default();
+            metrics.block_pruning_records_total.set(total);
+        }
+    }
+
     /// Provide a timer for `prune_povs` which observes on drop.
     fn time_prune_povs(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
         self.0.as_ref().map(|metrics| metrics.prune_povs.start_timer())
@@ -1164,8 +1201,8 @@ impl Metrics {
     }

     /// Provide a timer for `store_chunk` which observes on drop.
-    fn time_store_chunk(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
-        self.0.as_ref().map(|metrics| metrics.store_chunk.start_timer())
+    fn time_store_chunks(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
+        self.0.as_ref().map(|metrics| metrics.store_chunks.start_timer())
     }

     /// Provide a timer for `get_chunk` which observes on drop.
@@ -1184,6 +1221,20 @@ impl metrics::Metrics for Metrics {
                 )?,
                 registry,
             )?,
+            chunk_pruning_records_total: prometheus::register(
+                prometheus::Gauge::new(
+                    "parachain_chunk_pruning_records_total",
+                    "Number of chunk pruning records kept by the storage.",
+                )?,
+                registry,
+            )?,
+            block_pruning_records_total: prometheus::register(
+                prometheus::Gauge::new(
+                    "parachain_block_pruning_records_total",
+                    "Number of block pruning records kept by the storage.",
+                )?,
+                registry,
+            )?,
             prune_povs: prometheus::register(
                 prometheus::Histogram::with_opts(
                     prometheus::HistogramOpts::new(
@@ -1238,11 +1289,11 @@ impl metrics::Metrics for Metrics {
                 )?,
                 registry,
             )?,
-            store_chunk: prometheus::register(
+            store_chunks: prometheus::register(
                 prometheus::Histogram::with_opts(
                     prometheus::HistogramOpts::new(
-                        "parachain_av_store_store_chunk",
-                        "Time spent within `av_store::store_chunk`",
+                        "parachain_av_store_store_chunks",
+                        "Time spent within `av_store::store_chunks`",
                     )
                 )?,
                 registry,
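A note on the pruning-record bookkeeping above: `store_chunks` keeps the `Vec` of `ChunkPruningRecord`s sorted by inserting each record at the position `binary_search` reports, where `Err(idx)` is exactly the index that preserves order. A standalone sketch of that idiom, using plain `u32`s rather than the real record type:

```rust
// Insert values into a Vec while keeping it sorted -- the same idiom
// store_chunks uses for ChunkPruningRecord, illustrated here with u32.
fn insert_sorted(records: &mut Vec<u32>, value: u32) {
    // Ok(i) means an equal element already sits at i; Err(i) is the
    // insertion point that keeps the vector sorted. Inserting at `idx`
    // preserves ordering in both cases.
    let idx = records.binary_search(&value).unwrap_or_else(|insert_idx| insert_idx);
    records.insert(idx, value);
}

fn main() {
    let mut records = vec![1, 3, 5];
    insert_sorted(&mut records, 4);
    insert_sorted(&mut records, 0);
    assert_eq!(records, vec![0, 1, 3, 4, 5]);
}
```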
@@ -235,7 +235,6 @@ fn store_chunk_works() {
         let chunk_msg = AvailabilityStoreMessage::StoreChunk {
             candidate_hash,
             relay_parent,
-            validator_index,
             chunk: chunk.clone(),
             tx,
         };
@@ -385,7 +384,6 @@ fn stored_but_not_included_chunk_is_pruned() {
         let chunk_msg = AvailabilityStoreMessage::StoreChunk {
             candidate_hash,
             relay_parent,
-            validator_index,
             chunk: chunk.clone(),
             tx,
         };
@@ -589,7 +587,6 @@ fn stored_chunk_kept_until_finalized() {
         let chunk_msg = AvailabilityStoreMessage::StoreChunk {
             candidate_hash,
             relay_parent,
-            validator_index,
             chunk: chunk.clone(),
             tx,
         };
@@ -1036,7 +1036,6 @@ where
         AvailabilityStoreMessage::StoreChunk {
             candidate_hash,
             relay_parent,
-            validator_index,
             chunk: erasure_chunk,
             tx,
         }
@@ -309,8 +309,6 @@ pub enum AvailabilityStoreMessage {
         candidate_hash: CandidateHash,
         /// A relevant relay parent.
         relay_parent: Hash,
-        /// The index of the validator this chunk belongs to.
-        validator_index: ValidatorIndex,
         /// The chunk itself.
         chunk: ErasureChunk,
         /// Sending side of the channel to send result to.
@@ -174,9 +174,9 @@ enum AvailabilityStoreMessage {
     /// Query a specific availability chunk of the candidate's erasure-coding by validator index.
     /// Returns the chunk and its inclusion proof against the candidate's erasure-root.
     QueryChunk(CandidateHash, ValidatorIndex, ResponseChannel<Option<AvailabilityChunkAndProof>>),
-    /// Store a specific chunk of the candidate's erasure-coding by validator index, with an
+    /// Store a specific chunk of the candidate's erasure-coding, with an
     /// accompanying proof.
-    StoreChunk(CandidateHash, ValidatorIndex, AvailabilityChunkAndProof, ResponseChannel<Result<()>>),
+    StoreChunk(CandidateHash, AvailabilityChunkAndProof, ResponseChannel<Result<()>>),
     /// Store `AvailableData`. If `ValidatorIndex` is provided, also store this validator's
     /// `AvailabilityChunkAndProof`.
     StoreAvailableData(CandidateHash, Option<ValidatorIndex>, u32, AvailableData, ResponseChannel<Result<()>>),
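Since an erasure chunk's index already identifies the validator it belongs to, the standalone `validator_index` field was redundant and is dropped from `StoreChunk` everywhere. A minimal sketch of the reshaped message, with simplified stand-in types (the real message also carries a response channel, omitted here):

```rust
// Stand-ins for the real polkadot types; illustrative only.
type CandidateHash = [u8; 32];
type Hash = [u8; 32];

#[derive(Clone)]
struct ErasureChunk {
    // A chunk's index is the index of the validator it belongs to, which
    // is why a separate `validator_index` field became redundant.
    index: u32,
    chunk: Vec<u8>,
}

enum AvailabilityStoreMessage {
    // Before: StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx }
    // After: the chunk itself is the only carrier of the index.
    StoreChunk {
        candidate_hash: CandidateHash,
        relay_parent: Hash,
        chunk: ErasureChunk,
    },
}

fn main() {
    let msg = AvailabilityStoreMessage::StoreChunk {
        candidate_hash: [0u8; 32],
        relay_parent: [0u8; 32],
        chunk: ErasureChunk { index: 3, chunk: vec![0xde, 0xad] },
    };
    match msg {
        AvailabilityStoreMessage::StoreChunk { chunk, .. } => {
            // The index travels with the chunk.
            assert_eq!(chunk.index, 3);
        }
    }
}
```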