// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements an `AvailabilityStoreSubsystem`.

#![recursion_limit="256"]
#![warn(missing_docs)]

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH};

use codec::{Encode, Decode};
use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt};
use futures_timer::Delay;
use kvdb_rocksdb::{Database, DatabaseConfig};
use kvdb::{KeyValueDB, DBTransaction};

use polkadot_primitives::v1::{
	Hash, AvailableData, BlockNumber, CandidateEvent, ErasureChunk, ValidatorIndex, CandidateHash,
};
use polkadot_subsystem::{
	FromOverseer, OverseerSignal, SubsystemError, Subsystem, SubsystemContext, SpawnedSubsystem,
	ActiveLeavesUpdate,
	errors::{ChainApiError, RuntimeApiError},
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_subsystem::messages::{
	AllMessages, AvailabilityStoreMessage, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest,
};
use thiserror::Error;

const LOG_TARGET: &str = "availability";

mod columns {
	pub const DATA: u32 = 0;
	pub const META: u32 = 1;
	pub const NUM_COLUMNS: u32 = 2;
}

#[derive(Debug, Error)]
enum Error {
	#[error(transparent)]
	RuntimeApi(#[from] RuntimeApiError),
	#[error(transparent)]
	ChainApi(#[from] ChainApiError),
	#[error(transparent)]
	Erasure(#[from] erasure::Error),
	#[error(transparent)]
	Io(#[from] io::Error),
	#[error(transparent)]
	Oneshot(#[from] oneshot::Canceled),
	#[error(transparent)]
	Subsystem(#[from] SubsystemError),
	#[error(transparent)]
	Time(#[from] SystemTimeError),
}

impl Error {
	fn severity(&self) -> log::Level {
		match self {
			// don't spam the log with spurious errors
			Self::RuntimeApi(_) |
			Self::Oneshot(_) => log::Level::Debug,
			// it's worth reporting otherwise
			_ => log::Level::Warn,
		}
	}
}

/// A wrapper type for delays.
#[derive(Debug, Decode, Encode, Eq)]
enum PruningDelay {
	/// Pruning should be triggered once this `Duration` from UNIX_EPOCH has elapsed.
	In(Duration),

	/// Data is in the state where it has no expiration.
	Indefinite,
}

impl PruningDelay {
	fn now() -> Result<Self, Error> {
		Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.into())
	}

	fn into_the_future(duration: Duration) -> Result<Self, Error> {
		Ok(Self::In(SystemTime::now().duration_since(UNIX_EPOCH)? + duration))
	}

	fn as_duration(&self) -> Option<Duration> {
		match self {
			PruningDelay::In(d) => Some(*d),
			PruningDelay::Indefinite => None,
		}
	}
}

impl From<Duration> for PruningDelay {
	fn from(d: Duration) -> Self {
		Self::In(d)
	}
}

impl PartialEq for PruningDelay {
	fn eq(&self, other: &Self) -> bool {
		match (self, other) {
			(PruningDelay::In(this), PruningDelay::In(that)) => this == that,
			(PruningDelay::Indefinite, PruningDelay::Indefinite) => true,
			_ => false,
		}
	}
}

impl PartialOrd for PruningDelay {
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		match (self, other) {
			(PruningDelay::In(this), PruningDelay::In(that)) => this.partial_cmp(that),
			(PruningDelay::In(_), PruningDelay::Indefinite) => Some(Ordering::Less),
			(PruningDelay::Indefinite, PruningDelay::In(_)) => Some(Ordering::Greater),
			(PruningDelay::Indefinite, PruningDelay::Indefinite) => Some(Ordering::Equal),
		}
	}
}

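// `Indefinite` sorts after any concrete deadline, so in a sorted pruning vector the records
// that expire soonest come first and suspended (`Indefinite`) records sink to the back.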
impl Ord for PruningDelay {
	fn cmp(&self, other: &Self) -> Ordering {
		match (self, other) {
			(PruningDelay::In(this), PruningDelay::In(that)) => this.cmp(that),
			(PruningDelay::In(_), PruningDelay::Indefinite) => Ordering::Less,
			(PruningDelay::Indefinite, PruningDelay::In(_)) => Ordering::Greater,
			(PruningDelay::Indefinite, PruningDelay::Indefinite) => Ordering::Equal,
		}
	}
}

/// A key for chunk pruning records.
const CHUNK_PRUNING_KEY: [u8; 14] = *b"chunks_pruning";

/// A key for PoV pruning records.
const POV_PRUNING_KEY: [u8; 11] = *b"pov_pruning";

/// A key for a cached value of next scheduled PoV pruning.
const NEXT_POV_PRUNING: [u8; 16] = *b"next_pov_pruning";

/// A key for a cached value of next scheduled chunk pruning.
const NEXT_CHUNK_PRUNING: [u8; 18] = *b"next_chunk_pruning";

/// The following constants are used under normal conditions:

/// Stored block is kept available for 1 hour.
const KEEP_STORED_BLOCK_FOR: Duration = Duration::from_secs(60 * 60);

/// Finalized block is kept for 1 day.
const KEEP_FINALIZED_BLOCK_FOR: Duration = Duration::from_secs(24 * 60 * 60);

/// Keep chunk of the finalized block for 1 day + 1 hour.
const KEEP_FINALIZED_CHUNK_FOR: Duration = Duration::from_secs(25 * 60 * 60);

/// The point in time since UNIX_EPOCH at which we need to wake up and do the next pruning of blocks.
/// Essentially this is the first element of the sorted array of pruning data;
/// we just cache it here to avoid loading the whole array only to look at the head.
///
/// This record exists under the `NEXT_POV_PRUNING` key. If it does not exist, either:
///  a) There are no records and nothing has to be pruned, or
///  b) There are records, but all of them are in the `Included` state and do not have an exact
///     time at which to be pruned.
#[derive(Decode, Encode)]
struct NextPoVPruning(Duration);

impl NextPoVPruning {
	// After which duration from `now` this should fire.
	fn should_fire_in(&self) -> Result<Duration, Error> {
		Ok(self.0.checked_sub(SystemTime::now().duration_since(UNIX_EPOCH)?).unwrap_or_default())
	}
}

/// The point in time since UNIX_EPOCH at which we need to wake up and do the next pruning of chunks.
/// Essentially this is the first element of the sorted array of pruning data;
/// we just cache it here to avoid loading the whole array only to look at the head.
///
/// This record exists under the `NEXT_CHUNK_PRUNING` key. If it does not exist, either:
///  a) There are no records and nothing has to be pruned, or
///  b) There are records, but all of them are in the `Included` state and do not have an exact
///     time at which to be pruned.
#[derive(Decode, Encode)]
struct NextChunkPruning(Duration);

impl NextChunkPruning {
	// After which duration from `now` this should fire.
	fn should_fire_in(&self) -> Result<Duration, Error> {
		Ok(self.0.checked_sub(SystemTime::now().duration_since(UNIX_EPOCH)?).unwrap_or_default())
	}
}

/// Struct holding pruning timing configuration.
/// The only purpose of this structure is to use different timing
/// configurations in production and in testing.
#[derive(Clone)]
struct PruningConfig {
	/// How long should a stored block stay available.
	keep_stored_block_for: Duration,

	/// How long should a finalized block stay available.
	keep_finalized_block_for: Duration,

	/// How long should a chunk of a finalized block stay available.
	keep_finalized_chunk_for: Duration,
}

impl Default for PruningConfig {
	fn default() -> Self {
		Self {
			keep_stored_block_for: KEEP_STORED_BLOCK_FOR,
			keep_finalized_block_for: KEEP_FINALIZED_BLOCK_FOR,
			keep_finalized_chunk_for: KEEP_FINALIZED_CHUNK_FOR,
		}
	}
}

#[derive(Debug, Decode, Encode, Eq, PartialEq)]
enum CandidateState {
	Stored,
	Included,
	Finalized,
}

#[derive(Debug, Decode, Encode, Eq)]
struct PoVPruningRecord {
	candidate_hash: CandidateHash,
	block_number: BlockNumber,
	candidate_state: CandidateState,
	prune_at: PruningDelay,
}

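// Pruning records are identified by their candidate hash alone: two records for the same
// candidate compare equal regardless of `prune_at`, while records for distinct candidates are
// ordered by their pruning deadline, so a sorted `Vec` acts as a pruning queue.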
impl PartialEq for PoVPruningRecord {
	fn eq(&self, other: &Self) -> bool {
		self.candidate_hash == other.candidate_hash
	}
}

impl Ord for PoVPruningRecord {
	fn cmp(&self, other: &Self) -> Ordering {
		if self.candidate_hash == other.candidate_hash {
			return Ordering::Equal;
		}

		self.prune_at.cmp(&other.prune_at)
	}
}

impl PartialOrd for PoVPruningRecord {
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		Some(self.cmp(other))
	}
}

#[derive(Debug, Decode, Encode, Eq)]
struct ChunkPruningRecord {
	candidate_hash: CandidateHash,
	block_number: BlockNumber,
	candidate_state: CandidateState,
	chunk_index: u32,
	prune_at: PruningDelay,
}

impl PartialEq for ChunkPruningRecord {
	fn eq(&self, other: &Self) -> bool {
		self.candidate_hash == other.candidate_hash &&
			self.chunk_index == other.chunk_index
	}
}

impl Ord for ChunkPruningRecord {
	fn cmp(&self, other: &Self) -> Ordering {
		if self.candidate_hash == other.candidate_hash {
			return Ordering::Equal;
		}

		self.prune_at.cmp(&other.prune_at)
	}
}

impl PartialOrd for ChunkPruningRecord {
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		Some(self.cmp(other))
	}
}

/// An implementation of the Availability Store subsystem.
pub struct AvailabilityStoreSubsystem {
	pruning_config: PruningConfig,
	inner: Arc<dyn KeyValueDB>,
	metrics: Metrics,
}

impl AvailabilityStoreSubsystem {
	// Perform pruning of PoVs
	fn prune_povs(&self) -> Result<(), Error> {
		let mut tx = DBTransaction::new();
		let mut pov_pruning = pov_pruning(&self.inner).unwrap_or_default();
		let now = PruningDelay::now()?;

		log::trace!(target: LOG_TARGET, "Pruning PoVs");
		let outdated_records_count = pov_pruning.iter()
			.take_while(|r| r.prune_at <= now)
			.count();

		for record in pov_pruning.drain(..outdated_records_count) {
			log::trace!(target: LOG_TARGET, "Removing record {:?}", record);
			tx.delete(
				columns::DATA,
				available_data_key(&record.candidate_hash).as_slice(),
			);
		}

		put_pov_pruning(&self.inner, Some(tx), pov_pruning)?;

		Ok(())
	}

	// Perform pruning of chunks.
	fn prune_chunks(&self) -> Result<(), Error> {
		let mut tx = DBTransaction::new();
		let mut chunk_pruning = chunk_pruning(&self.inner).unwrap_or_default();
		let now = PruningDelay::now()?;

		log::trace!(target: LOG_TARGET, "Pruning Chunks");
		let outdated_records_count = chunk_pruning.iter()
			.take_while(|r| r.prune_at <= now)
			.count();

		for record in chunk_pruning.drain(..outdated_records_count) {
			log::trace!(target: LOG_TARGET, "Removing record {:?}", record);
			tx.delete(
				columns::DATA,
				erasure_chunk_key(&record.candidate_hash, record.chunk_index).as_slice(),
			);
		}

		put_chunk_pruning(&self.inner, Some(tx), chunk_pruning)?;

		Ok(())
	}

	// Return a `Future` that either resolves when another PoV pruning has to happen
	// or is indefinitely `pending` in case no pruning has to be done.
	// Just a helper to `select` over multiple things at once.
	fn maybe_prune_povs(&self) -> Result<impl Future<Output = ()>, Error> {
		let future = match get_next_pov_pruning_time(&self.inner) {
			Some(pruning) => {
				Either::Left(Delay::new(pruning.should_fire_in()?))
			}
			None => Either::Right(future::pending::<()>()),
		};

		Ok(future)
	}

	// Return a `Future` that either resolves when another chunk pruning has to happen
	// or is indefinitely `pending` in case no pruning has to be done.
	// Just a helper to `select` over multiple things at once.
	fn maybe_prune_chunks(&self) -> Result<impl Future<Output = ()>, Error> {
		let future = match get_next_chunk_pruning_time(&self.inner) {
			Some(pruning) => {
				Either::Left(Delay::new(pruning.should_fire_in()?))
			}
			None => Either::Right(future::pending::<()>()),
		};

		Ok(future)
	}
}

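// Key in the DATA column under which the full `StoredAvailableData` of a candidate is stored.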
fn available_data_key(candidate_hash: &CandidateHash) -> Vec<u8> {
	(candidate_hash, 0i8).encode()
}

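// Key in the DATA column under which a single erasure chunk of a candidate is stored.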
fn erasure_chunk_key(candidate_hash: &CandidateHash, index: u32) -> Vec<u8> {
	(candidate_hash, index, 0i8).encode()
}

#[derive(Encode, Decode)]
struct StoredAvailableData {
	data: AvailableData,
	n_validators: u32,
}

/// Configuration for the availability store.
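///
/// # Example
///
/// A minimal sketch of constructing a configuration by hand; the path below is purely
/// illustrative:
///
/// ```ignore
/// let config = Config {
///     // `None` falls back to the default cache size.
///     cache_size: None,
///     path: std::path::PathBuf::from("/tmp/polkadot/parachains/av-store"),
/// };
/// ```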
pub struct Config {
	/// Total cache size in megabytes. If `None` the default (128 MiB per column) is used.
	pub cache_size: Option<usize>,
	/// Path to the database.
	pub path: PathBuf,
}

impl std::convert::TryFrom<sc_service::config::DatabaseConfig> for Config {
	type Error = &'static str;

	fn try_from(config: sc_service::config::DatabaseConfig) -> Result<Self, Self::Error> {
		let path = config.path().ok_or("custom databases are not supported")?;

		Ok(Self {
			// substrate cache size is improper here; just use the default
			cache_size: None,
			// DB path is a sub-directory of substrate db path to give two properties:
			// 1: column numbers don't conflict with substrate
			// 2: commands like purge-chain work without further changes
			path: path.join("parachains").join("av-store"),
		})
	}
}

impl AvailabilityStoreSubsystem {
	/// Create a new `AvailabilityStoreSubsystem` with a given config on disk.
	pub fn new_on_disk(config: Config, metrics: Metrics) -> io::Result<Self> {
		let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS);

		if let Some(cache_size) = config.cache_size {
			let mut memory_budget = HashMap::new();

			for i in 0..columns::NUM_COLUMNS {
				memory_budget.insert(i, cache_size / columns::NUM_COLUMNS as usize);
			}
			db_config.memory_budget = memory_budget;
		}

		let path = config.path.to_str().ok_or_else(|| io::Error::new(
			io::ErrorKind::Other,
			format!("Bad database path: {:?}", config.path),
		))?;

		std::fs::create_dir_all(&path)?;
		let db = Database::open(&db_config, &path)?;

		Ok(Self {
			pruning_config: PruningConfig::default(),
			inner: Arc::new(db),
			metrics,
		})
	}

	#[cfg(test)]
	fn new_in_memory(inner: Arc<dyn KeyValueDB>, pruning_config: PruningConfig) -> Self {
		Self {
			pruning_config,
			inner,
			metrics: Metrics(None),
		}
	}
}

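// Read the cached wakeup time for the next scheduled PoV/chunk pruning from the META column, if any.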
fn get_next_pov_pruning_time(db: &Arc<dyn KeyValueDB>) -> Option<NextPoVPruning> {
	query_inner(db, columns::META, &NEXT_POV_PRUNING)
}

fn get_next_chunk_pruning_time(db: &Arc<dyn KeyValueDB>) -> Option<NextChunkPruning> {
	query_inner(db, columns::META, &NEXT_CHUNK_PRUNING)
}

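// The subsystem's main loop: keep executing `run_iteration` until it reports that a `Conclude`
// signal was received, logging errors at the severity chosen by `Error::severity`.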
async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context)
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{
	loop {
		let res = run_iteration(&mut subsystem, &mut ctx).await;
		match res {
			Err(e) => {
				log::log!(target: LOG_TARGET, e.severity(), "{}", e);
			}
			Ok(true) => {
				log::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
				break;
			},
			Ok(false) => continue,
		}
	}
}

async fn run_iteration<Context>(subsystem: &mut AvailabilityStoreSubsystem, ctx: &mut Context)
	-> Result<bool, Error>
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{
	// Every time the following two methods are called, a read from the DB is performed.
	// But since these are very small values, essentially newtype wrappers around `Duration`
	// (`NextChunkPruning` and `NextPoVPruning`), and they are read frequently, we assume they
	// end up cached in memory anyway and thus these DB reads are reasonably fast.
	let pov_pruning_time = subsystem.maybe_prune_povs()?;
	let chunk_pruning_time = subsystem.maybe_prune_chunks()?;

	let mut pov_pruning_time = pov_pruning_time.fuse();
	let mut chunk_pruning_time = chunk_pruning_time.fuse();

	select! {
		incoming = ctx.recv().fuse() => {
			match incoming? {
				FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(true),
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(
					ActiveLeavesUpdate { activated, .. })
				) => {
					for activated in activated.into_iter() {
						process_block_activated(ctx, &subsystem.inner, activated).await?;
					}
				}
				FromOverseer::Signal(OverseerSignal::BlockFinalized(hash)) => {
					process_block_finalized(subsystem, ctx, &subsystem.inner, hash).await?;
				}
				FromOverseer::Communication { msg } => {
					process_message(subsystem, ctx, msg).await?;
				}
			}
		}
		pov_pruning_time = pov_pruning_time => {
			subsystem.prune_povs()?;
		}
		chunk_pruning_time = chunk_pruning_time => {
			subsystem.prune_chunks()?;
		}
		complete => return Ok(true),
	}

	Ok(false)
}

/// As soon as a certain block is finalized, its pruning records and the records of all
/// blocks that we keep which are older than the block in question have to be updated.
///
/// The state of the data has to be changed from
/// `CandidateState::Included` to `CandidateState::Finalized`, and the pruning times have
/// to be updated to `now + keep_finalized_{block, chunk}_for`.
async fn process_block_finalized<Context>(
	subsystem: &AvailabilityStoreSubsystem,
	ctx: &mut Context,
	db: &Arc<dyn KeyValueDB>,
	hash: Hash,
) -> Result<(), Error>
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>
{
	let block_number = get_block_number(ctx, hash).await?;

	if let Some(mut pov_pruning) = pov_pruning(db) {
		// Since the records are sorted by the time at which they need to be pruned, and not by
		// block number, we have to iterate through the whole collection here.
		for record in pov_pruning.iter_mut() {
			if record.block_number <= block_number {
				log::trace!(
					target: LOG_TARGET,
					"Updating pruning record for finalized block {}",
					record.block_number,
				);

				record.prune_at = PruningDelay::into_the_future(
					subsystem.pruning_config.keep_finalized_block_for
				)?;
				record.candidate_state = CandidateState::Finalized;
			}
		}

		put_pov_pruning(db, None, pov_pruning)?;
	}

	if let Some(mut chunk_pruning) = chunk_pruning(db) {
		for record in chunk_pruning.iter_mut() {
			if record.block_number <= block_number {
				log::trace!(
					target: LOG_TARGET,
					"Updating chunk pruning record for finalized block {}",
					record.block_number,
				);

				record.prune_at = PruningDelay::into_the_future(
					subsystem.pruning_config.keep_finalized_chunk_for
				)?;
				record.candidate_state = CandidateState::Finalized;
			}
		}

		put_chunk_pruning(db, None, chunk_pruning)?;
	}

	Ok(())
}

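// When a new leaf is activated, fetch the candidate events for that block and mark every
// candidate included there as `CandidateState::Included`, suspending its pruning
// (`PruningDelay::Indefinite`) until finalization reschedules it.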
async fn process_block_activated<Context>(
	ctx: &mut Context,
	db: &Arc<dyn KeyValueDB>,
	hash: Hash,
) -> Result<(), Error>
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>
{
	let events = match request_candidate_events(ctx, hash).await {
		Ok(events) => events,
		Err(err) => {
			log::debug!(target: LOG_TARGET, "requesting candidate events failed due to {}", err);
			return Ok(());
		}
	};

	log::trace!(target: LOG_TARGET, "block activated {}", hash);
	let mut included = HashSet::new();

	for event in events.into_iter() {
		if let CandidateEvent::CandidateIncluded(receipt, _) = event {
			log::trace!(target: LOG_TARGET, "Candidate {:?} was included", receipt.hash());
			included.insert(receipt.hash());
		}
	}

	if let Some(mut pov_pruning) = pov_pruning(db) {
		for record in pov_pruning.iter_mut() {
			if included.contains(&record.candidate_hash) {
				record.prune_at = PruningDelay::Indefinite;
				record.candidate_state = CandidateState::Included;
			}
		}

		pov_pruning.sort();

		put_pov_pruning(db, None, pov_pruning)?;
	}

	if let Some(mut chunk_pruning) = chunk_pruning(db) {
		for record in chunk_pruning.iter_mut() {
			if included.contains(&record.candidate_hash) {
				record.prune_at = PruningDelay::Indefinite;
				record.candidate_state = CandidateState::Included;
			}
		}

		chunk_pruning.sort();

		put_chunk_pruning(db, None, chunk_pruning)?;
	}

	Ok(())
}

async fn request_candidate_events<Context>(
	ctx: &mut Context,
	hash: Hash,
) -> Result<Vec<CandidateEvent>, Error>
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>
{
	let (tx, rx) = oneshot::channel();

	let msg = AllMessages::RuntimeApi(RuntimeApiMessage::Request(
		hash,
		RuntimeApiRequest::CandidateEvents(tx),
	));

	ctx.send_message(msg.into()).await?;

	Ok(rx.await??)
}

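// Dispatch a single incoming `AvailabilityStoreMessage`: queries are answered straight from the
// database, while stores write the data together with its pruning record and report the outcome
// back through the provided sender.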
async fn process_message<Context>(
	subsystem: &mut AvailabilityStoreSubsystem,
	ctx: &mut Context,
	msg: AvailabilityStoreMessage,
) -> Result<(), Error>
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>
{
	use AvailabilityStoreMessage::*;

	match msg {
		QueryAvailableData(hash, tx) => {
			tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data))
				.map_err(|_| oneshot::Canceled)?;
		}
		QueryDataAvailability(hash, tx) => {
			tx.send(available_data(&subsystem.inner, &hash).is_some())
				.map_err(|_| oneshot::Canceled)?;
		}
		QueryChunk(hash, id, tx) => {
			tx.send(get_chunk(subsystem, &hash, id)?)
				.map_err(|_| oneshot::Canceled)?;
		}
		QueryChunkAvailability(hash, id, tx) => {
			tx.send(get_chunk(subsystem, &hash, id)?.is_some())
				.map_err(|_| oneshot::Canceled)?;
		}
		StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx } => {
			// Current block number is relay_parent block number + 1.
			let block_number = get_block_number(ctx, relay_parent).await? + 1;
			match store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number) {
				Err(e) => {
					tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
					return Err(e);
				}
				Ok(()) => {
					tx.send(Ok(())).map_err(|_| oneshot::Canceled)?;
				}
			}
		}
		StoreAvailableData(hash, id, n_validators, av_data, tx) => {
			match store_available_data(subsystem, &hash, id, n_validators, av_data) {
				Err(e) => {
					tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
					return Err(e);
				}
				Ok(()) => {
					tx.send(Ok(())).map_err(|_| oneshot::Canceled)?;
				}
			}
		}
	}

	Ok(())
}

fn available_data(
	db: &Arc<dyn KeyValueDB>,
	candidate_hash: &CandidateHash,
) -> Option<StoredAvailableData> {
	query_inner(db, columns::DATA, &available_data_key(candidate_hash))
}

fn pov_pruning(db: &Arc<dyn KeyValueDB>) -> Option<Vec<PoVPruningRecord>> {
	query_inner(db, columns::META, &POV_PRUNING_KEY)
}

fn chunk_pruning(db: &Arc<dyn KeyValueDB>) -> Option<Vec<ChunkPruningRecord>> {
	query_inner(db, columns::META, &CHUNK_PRUNING_KEY)
}

fn put_pov_pruning(
	db: &Arc<dyn KeyValueDB>,
	tx: Option<DBTransaction>,
	mut pov_pruning: Vec<PoVPruningRecord>,
) -> Result<(), Error> {
	let mut tx = tx.unwrap_or_default();

	pov_pruning.sort();

	tx.put_vec(
		columns::META,
		&POV_PRUNING_KEY,
		pov_pruning.encode(),
	);

	match pov_pruning.get(0) {
		// We want to wake up in case we have some records that are not scheduled to be kept
		// indefinitely (data is included and waiting to move to the finalized state), i.e.
		// there is at least one value that is not `PruningDelay::Indefinite`.
		Some(PoVPruningRecord { prune_at: PruningDelay::In(prune_at), .. }) => {
			tx.put_vec(
				columns::META,
				&NEXT_POV_PRUNING,
				NextPoVPruning(*prune_at).encode(),
			);
		}
		_ => {
			// If there are no longer any records, delete the cached pruning time record.
			tx.delete(
				columns::META,
				&NEXT_POV_PRUNING,
			);
		}
	}

	db.write(tx)?;

	Ok(())
}

fn put_chunk_pruning(
	db: &Arc<dyn KeyValueDB>,
	tx: Option<DBTransaction>,
	mut chunk_pruning: Vec<ChunkPruningRecord>,
) -> Result<(), Error> {
	let mut tx = tx.unwrap_or_default();

	chunk_pruning.sort();

	tx.put_vec(
		columns::META,
		&CHUNK_PRUNING_KEY,
		chunk_pruning.encode(),
	);

	match chunk_pruning.get(0) {
		Some(ChunkPruningRecord { prune_at: PruningDelay::In(prune_at), .. }) => {
			tx.put_vec(
				columns::META,
				&NEXT_CHUNK_PRUNING,
				NextChunkPruning(*prune_at).encode(),
			);
		}
		_ => {
			tx.delete(
				columns::META,
				&NEXT_CHUNK_PRUNING,
			);
		}
	}

	db.write(tx)?;

	Ok(())
}

// Produces a block number from a block's hash.
// In the event of an invalid `block_hash`, returns `Ok(0)`.
async fn get_block_number<Context>(
	ctx: &mut Context,
	block_hash: Hash,
) -> Result<BlockNumber, Error>
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{
	let (tx, rx) = oneshot::channel();

	ctx.send_message(AllMessages::ChainApi(ChainApiMessage::BlockNumber(block_hash, tx))).await?;

	Ok(rx.await??.unwrap_or_default())
}

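// Store the full `AvailableData` of a candidate and, if a validator index is supplied, that
// validator's own erasure chunk as well; both are scheduled for pruning after
// `keep_stored_block_for` unless the candidate is later seen as included.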
fn store_available_data(
	subsystem: &mut AvailabilityStoreSubsystem,
	candidate_hash: &CandidateHash,
	id: Option<ValidatorIndex>,
	n_validators: u32,
	available_data: AvailableData,
) -> Result<(), Error> {
	let mut tx = DBTransaction::new();

	let block_number = available_data.validation_data.block_number;

	if let Some(index) = id {
		let chunks = get_chunks(&available_data, n_validators as usize, &subsystem.metrics)?;
		store_chunk(
			subsystem,
			candidate_hash,
			n_validators,
			chunks[index as usize].clone(),
			block_number,
		)?;
	}

	let stored_data = StoredAvailableData {
		data: available_data,
		n_validators,
	};

	let mut pov_pruning = pov_pruning(&subsystem.inner).unwrap_or_default();
	let prune_at = PruningDelay::into_the_future(subsystem.pruning_config.keep_stored_block_for)?;

	if let Some(next_pruning) = prune_at.as_duration() {
		tx.put_vec(
			columns::META,
			&NEXT_POV_PRUNING,
			NextPoVPruning(next_pruning).encode(),
		);
	}

	let pruning_record = PoVPruningRecord {
		candidate_hash: *candidate_hash,
		block_number,
		candidate_state: CandidateState::Stored,
		prune_at,
	};

	let idx = pov_pruning.binary_search(&pruning_record).unwrap_or_else(|insert_idx| insert_idx);

	pov_pruning.insert(idx, pruning_record);

	tx.put_vec(
		columns::DATA,
		available_data_key(&candidate_hash).as_slice(),
		stored_data.encode(),
	);

	tx.put_vec(
		columns::META,
		&POV_PRUNING_KEY,
		pov_pruning.encode(),
	);

	subsystem.inner.write(tx)?;

	Ok(())
}

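// Store a single erasure chunk under its `(candidate, index)` key and insert a matching
// `ChunkPruningRecord`, keeping the chunk pruning queue sorted by deadline.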
fn store_chunk(
	subsystem: &mut AvailabilityStoreSubsystem,
	candidate_hash: &CandidateHash,
	_n_validators: u32,
	chunk: ErasureChunk,
	block_number: BlockNumber,
) -> Result<(), Error> {
	let mut tx = DBTransaction::new();

	let dbkey = erasure_chunk_key(candidate_hash, chunk.index);

	let mut chunk_pruning = chunk_pruning(&subsystem.inner).unwrap_or_default();
	let prune_at = PruningDelay::into_the_future(subsystem.pruning_config.keep_stored_block_for)?;

	if let Some(delay) = prune_at.as_duration() {
		tx.put_vec(
			columns::META,
			&NEXT_CHUNK_PRUNING,
			NextChunkPruning(delay).encode(),
		);
	}

	let pruning_record = ChunkPruningRecord {
		candidate_hash: candidate_hash.clone(),
		block_number,
		candidate_state: CandidateState::Stored,
		chunk_index: chunk.index,
		prune_at,
	};

	let idx = chunk_pruning.binary_search(&pruning_record).unwrap_or_else(|insert_idx| insert_idx);

	chunk_pruning.insert(idx, pruning_record);

	tx.put_vec(
		columns::DATA,
		&dbkey,
		chunk.encode(),
	);

	tx.put_vec(
		columns::META,
		&CHUNK_PRUNING_KEY,
		chunk_pruning.encode(),
	);

	subsystem.inner.write(tx)?;

	Ok(())
}

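// Fetch a single erasure chunk. If the chunk itself is not stored but the full `AvailableData`
// is, re-derive all chunks from it, persist them, and return the requested one.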
fn get_chunk(
	subsystem: &mut AvailabilityStoreSubsystem,
	candidate_hash: &CandidateHash,
	index: u32,
) -> Result<Option<ErasureChunk>, Error> {
	if let Some(chunk) = query_inner(
		&subsystem.inner,
		columns::DATA,
		&erasure_chunk_key(candidate_hash, index)
	) {
		return Ok(Some(chunk));
	}

	if let Some(data) = available_data(&subsystem.inner, candidate_hash) {
		let mut chunks = get_chunks(&data.data, data.n_validators as usize, &subsystem.metrics)?;
		let desired_chunk = chunks.get(index as usize).cloned();
		for chunk in chunks.drain(..) {
			store_chunk(
				subsystem,
				candidate_hash,
				data.n_validators,
				chunk,
				data.data.validation_data.block_number,
			)?;
		}
		return Ok(desired_chunk);
	}

	Ok(None)
}

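// Generic read helper: fetch a value from the given column and SCALE-decode it, logging and
// swallowing database errors.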
fn query_inner<D: Decode>(
	db: &Arc<dyn KeyValueDB>,
	column: u32,
	key: &[u8],
) -> Option<D> {
	match db.get(column, key) {
		Ok(Some(raw)) => {
			let res = D::decode(&mut &raw[..]).expect("all stored data serialized correctly; qed");
			Some(res)
		}
		Ok(None) => None,
		Err(e) => {
			log::warn!(target: LOG_TARGET, "Error reading from the availability store: {:?}", e);
			None
		}
	}
}