// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements an `AvailabilityStoreSubsystem`.

#![recursion_limit="256"]
#![warn(missing_docs)]

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH};

use parity_scale_codec::{Encode, Decode};
use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt};
use futures_timer::Delay;
use kvdb_rocksdb::{Database, DatabaseConfig};
use kvdb::{KeyValueDB, DBTransaction};

use polkadot_primitives::v1::{
	Hash, AvailableData, BlockNumber, CandidateEvent, ErasureChunk, ValidatorIndex, CandidateHash,
};
use polkadot_subsystem::{
	FromOverseer, OverseerSignal, SubsystemError, Subsystem, SubsystemContext, SpawnedSubsystem,
	ActiveLeavesUpdate,
	errors::{ChainApiError, RuntimeApiError},
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_subsystem::messages::{
	AllMessages, AvailabilityStoreMessage, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest,
};

const LOG_TARGET: &str = "availability";

mod columns {
	pub const DATA: u32 = 0;
	pub const META: u32 = 1;
	pub const NUM_COLUMNS: u32 = 2;
}

#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
	#[error(transparent)]
	RuntimeApi(#[from] RuntimeApiError),

	#[error(transparent)]
	ChainApi(#[from] ChainApiError),

	#[error(transparent)]
	Erasure(#[from] erasure::Error),

	#[error(transparent)]
	Io(#[from] io::Error),

	#[error(transparent)]
	Oneshot(#[from] oneshot::Canceled),

	#[error(transparent)]
	Subsystem(#[from] SubsystemError),

	#[error(transparent)]
	Time(#[from] SystemTimeError),

	#[error("Custom databases are not supported")]
	CustomDatabase,
}

impl Error {
	fn trace(&self) {
		match self {
			// don't spam the log with spurious errors
			Self::RuntimeApi(_) |
			Self::Oneshot(_) => tracing::debug!(target: LOG_TARGET, err = ?self),
			// it's worth reporting otherwise
			_ => tracing::warn!(target: LOG_TARGET, err = ?self),
		}
	}
}

/// A wrapper type for delays.
#[derive(Debug, Decode, Encode, Eq)]
enum PruningDelay {
	/// This pruning should be triggered after this `Duration` from UNIX_EPOCH.
	In(Duration),

	/// Data is in the state where it has no expiration.
	Indefinite,
}

impl PruningDelay {
	fn now() -> Result<Self, Error> {
		Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.into())
	}

	fn into_the_future(duration: Duration) -> Result<Self, Error> {
		Ok(Self::In(SystemTime::now().duration_since(UNIX_EPOCH)? + duration))
	}

	fn as_duration(&self) -> Option<Duration> {
		match self {
			PruningDelay::In(d) => Some(*d),
			PruningDelay::Indefinite => None,
		}
	}
}

impl From<Duration> for PruningDelay {
	fn from(d: Duration) -> Self {
		Self::In(d)
	}
}

impl PartialEq for PruningDelay {
	fn eq(&self, other: &Self) -> bool {
		match (self, other) {
			(PruningDelay::In(this), PruningDelay::In(that)) => {this == that},
			(PruningDelay::Indefinite, PruningDelay::Indefinite) => true,
			_ => false,
		}
	}
}

impl PartialOrd for PruningDelay {
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		match (self, other) {
			(PruningDelay::In(this), PruningDelay::In(that)) => this.partial_cmp(that),
			(PruningDelay::In(_), PruningDelay::Indefinite) => Some(Ordering::Less),
			(PruningDelay::Indefinite, PruningDelay::In(_)) => Some(Ordering::Greater),
			(PruningDelay::Indefinite, PruningDelay::Indefinite) => Some(Ordering::Equal),
		}
	}
}

impl Ord for PruningDelay {
	fn cmp(&self, other: &Self) -> Ordering {
		match (self, other) {
			(PruningDelay::In(this), PruningDelay::In(that)) => this.cmp(that),
			(PruningDelay::In(_), PruningDelay::Indefinite) => Ordering::Less,
			(PruningDelay::Indefinite, PruningDelay::In(_)) => Ordering::Greater,
			(PruningDelay::Indefinite, PruningDelay::Indefinite) => Ordering::Equal,
		}
	}
}
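
// Illustrative sketch, not part of the original source: the ordering defined above makes
// `Indefinite` compare greater than any finite delay, so records that are kept forever
// always sort to the back of a pruning queue. Uses only `std` and types from this file.
#[cfg(test)]
mod pruning_delay_ordering_sketch {
	use super::PruningDelay;
	use std::time::Duration;

	#[test]
	fn indefinite_sorts_last() {
		let mut delays = vec![
			PruningDelay::Indefinite,
			PruningDelay::In(Duration::from_secs(10)),
			PruningDelay::In(Duration::from_secs(5)),
		];
		delays.sort();
		assert_eq!(delays[0], PruningDelay::In(Duration::from_secs(5)));
		assert_eq!(delays[2], PruningDelay::Indefinite);
	}
}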

/// A key for chunk pruning records.
const CHUNK_PRUNING_KEY: [u8; 14] = *b"chunks_pruning";

/// A key for PoV pruning records.
const POV_PRUNING_KEY: [u8; 11] = *b"pov_pruning";

/// A key for a cached value of next scheduled PoV pruning.
const NEXT_POV_PRUNING: [u8; 16] = *b"next_pov_pruning";

/// A key for a cached value of next scheduled chunk pruning.
const NEXT_CHUNK_PRUNING: [u8; 18] = *b"next_chunk_pruning";

/// The following constants are used under normal conditions:

/// Stored block is kept available for 1 hour.
const KEEP_STORED_BLOCK_FOR: Duration = Duration::from_secs(60 * 60);

/// Finalized block is kept for 1 day.
const KEEP_FINALIZED_BLOCK_FOR: Duration = Duration::from_secs(24 * 60 * 60);

/// Keep chunk of the finalized block for 1 day + 1 hour.
const KEEP_FINALIZED_CHUNK_FOR: Duration = Duration::from_secs(25 * 60 * 60);

/// At which point in time since UNIX_EPOCH we need to wake up and do the next pruning of blocks.
/// Essentially this is the first element in the sorted array of pruning data;
/// we just want to cache it here to avoid lifting the whole array just to look at the head.
///
/// This record exists under the `NEXT_POV_PRUNING` key; if it does not, then either:
///  a) There are no records and nothing has to be pruned.
///  b) There are records but all of them are in the `Included` state and do not have an exact
///     time to be pruned.
#[derive(Decode, Encode)]
struct NextPoVPruning(Duration);

impl NextPoVPruning {
	// After which duration from `now` this should fire.
	fn should_fire_in(&self) -> Result<Duration, Error> {
		Ok(self.0.checked_sub(SystemTime::now().duration_since(UNIX_EPOCH)?).unwrap_or_default())
	}
}

/// At which point in time since UNIX_EPOCH we need to wake up and do the next pruning of chunks.
/// Essentially this is the first element in the sorted array of pruning data;
/// we just want to cache it here to avoid lifting the whole array just to look at the head.
///
/// This record exists under the `NEXT_CHUNK_PRUNING` key; if it does not, then either:
///  a) There are no records and nothing has to be pruned.
///  b) There are records but all of them are in the `Included` state and do not have an exact
///     time to be pruned.
#[derive(Decode, Encode)]
struct NextChunkPruning(Duration);

impl NextChunkPruning {
	// After which duration from `now` this should fire.
	fn should_fire_in(&self) -> Result<Duration, Error> {
		Ok(self.0.checked_sub(SystemTime::now().duration_since(UNIX_EPOCH)?).unwrap_or_default())
	}
}
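
// Illustrative sketch, not part of the original source: `should_fire_in` saturates at zero
// when the recorded wakeup point is already in the past, so a pruning timer that missed its
// deadline fires immediately instead of returning an error.
#[cfg(test)]
mod next_pruning_wakeup_sketch {
	use super::NextPoVPruning;
	use std::time::Duration;

	#[test]
	fn past_deadline_fires_immediately() {
		// A wakeup point of zero seconds since UNIX_EPOCH is long in the past by now.
		let next = NextPoVPruning(Duration::from_secs(0));
		assert_eq!(next.should_fire_in().unwrap(), Duration::from_secs(0));
	}
}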

/// Struct holding pruning timing configuration.
/// The only purpose of this structure is to use different timing
/// configurations in production and in testing.
#[derive(Clone)]
struct PruningConfig {
	/// How long should a stored block stay available.
	keep_stored_block_for: Duration,

	/// How long should a finalized block stay available.
	keep_finalized_block_for: Duration,

	/// How long should a chunk of a finalized block stay available.
	keep_finalized_chunk_for: Duration,
}

impl Default for PruningConfig {
	fn default() -> Self {
		Self {
			keep_stored_block_for: KEEP_STORED_BLOCK_FOR,
			keep_finalized_block_for: KEEP_FINALIZED_BLOCK_FOR,
			keep_finalized_chunk_for: KEEP_FINALIZED_CHUNK_FOR,
		}
	}
}
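
// Illustrative sketch, not part of the original source: tests can swap in much shorter
// pruning windows than the production defaults above; the exact values here are arbitrary
// and only demonstrate how a non-default `PruningConfig` is built.
#[cfg(test)]
#[allow(dead_code)]
fn example_test_pruning_config() -> PruningConfig {
	PruningConfig {
		keep_stored_block_for: Duration::from_millis(100),
		keep_finalized_block_for: Duration::from_millis(500),
		keep_finalized_chunk_for: Duration::from_millis(600),
	}
}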

#[derive(Debug, Decode, Encode, Eq, PartialEq)]
enum CandidateState {
	Stored,
	Included,
	Finalized,
}

#[derive(Debug, Decode, Encode, Eq)]
struct PoVPruningRecord {
	candidate_hash: CandidateHash,
	block_number: BlockNumber,
	candidate_state: CandidateState,
	prune_at: PruningDelay,
}

impl PartialEq for PoVPruningRecord {
	fn eq(&self, other: &Self) -> bool {
		self.candidate_hash == other.candidate_hash
	}
}

impl Ord for PoVPruningRecord {
	fn cmp(&self, other: &Self) -> Ordering {
		if self.candidate_hash == other.candidate_hash {
			return Ordering::Equal;
		}

		self.prune_at.cmp(&other.prune_at)
	}
}

impl PartialOrd for PoVPruningRecord {
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		Some(self.cmp(other))
	}
}

#[derive(Debug, Decode, Encode, Eq)]
struct ChunkPruningRecord {
	candidate_hash: CandidateHash,
	block_number: BlockNumber,
	candidate_state: CandidateState,
	chunk_index: u32,
	prune_at: PruningDelay,
}

impl PartialEq for ChunkPruningRecord {
	fn eq(&self, other: &Self) -> bool {
		self.candidate_hash == other.candidate_hash &&
			self.chunk_index == other.chunk_index
	}
}

impl Ord for ChunkPruningRecord {
	fn cmp(&self, other: &Self) -> Ordering {
		if self.candidate_hash == other.candidate_hash {
			return Ordering::Equal;
		}

		self.prune_at.cmp(&other.prune_at)
	}
}

impl PartialOrd for ChunkPruningRecord {
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		Some(self.cmp(other))
	}
}

/// An implementation of the Availability Store subsystem.
pub struct AvailabilityStoreSubsystem {
	pruning_config: PruningConfig,
	inner: Arc<dyn KeyValueDB>,
	metrics: Metrics,
}

impl AvailabilityStoreSubsystem {
	// Perform pruning of PoVs
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	fn prune_povs(&self) -> Result<(), Error> {
		let _timer = self.metrics.time_prune_povs();

		let mut tx = DBTransaction::new();
		let mut pov_pruning = pov_pruning(&self.inner).unwrap_or_default();
		let now = PruningDelay::now()?;

		tracing::trace!(target: LOG_TARGET, "Pruning PoVs");
		let outdated_records_count = pov_pruning.iter()
			.take_while(|r| r.prune_at <= now)
			.count();

		for record in pov_pruning.drain(..outdated_records_count) {
			tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record");
			tx.delete(
				columns::DATA,
				available_data_key(&record.candidate_hash).as_slice(),
			);
		}

		put_pov_pruning(&self.inner, Some(tx), pov_pruning)?;

		Ok(())
	}

	// Perform pruning of chunks.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	fn prune_chunks(&self) -> Result<(), Error> {
		let _timer = self.metrics.time_prune_chunks();

		let mut tx = DBTransaction::new();
		let mut chunk_pruning = chunk_pruning(&self.inner).unwrap_or_default();
		let now = PruningDelay::now()?;

		tracing::trace!(target: LOG_TARGET, "Pruning Chunks");
		let outdated_records_count = chunk_pruning.iter()
			.take_while(|r| r.prune_at <= now)
			.count();

		for record in chunk_pruning.drain(..outdated_records_count) {
			tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record");
			tx.delete(
				columns::DATA,
				erasure_chunk_key(&record.candidate_hash, record.chunk_index).as_slice(),
			);
		}

		put_chunk_pruning(&self.inner, Some(tx), chunk_pruning)?;

		Ok(())
	}

	// Return a `Future` that either resolves when another PoV pruning has to happen
	// or is indefinitely `pending` in case no pruning has to be done.
	// Just a helper to `select` over multiple things at once.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	fn maybe_prune_povs(&self) -> Result<impl Future<Output = ()>, Error> {
		let future = match get_next_pov_pruning_time(&self.inner) {
			Some(pruning) => {
				Either::Left(Delay::new(pruning.should_fire_in()?))
			}
			None => Either::Right(future::pending::<()>()),
		};

		Ok(future)
	}

	// Return a `Future` that either resolves when another chunk pruning has to happen
	// or is indefinitely `pending` in case no pruning has to be done.
	// Just a helper to `select` over multiple things at once.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	fn maybe_prune_chunks(&self) -> Result<impl Future<Output = ()>, Error> {
		let future = match get_next_chunk_pruning_time(&self.inner) {
			Some(pruning) => {
				Either::Left(Delay::new(pruning.should_fire_in()?))
			}
			None => Either::Right(future::pending::<()>()),
		};

		Ok(future)
	}
}

fn available_data_key(candidate_hash: &CandidateHash) -> Vec<u8> {
	(candidate_hash, 0i8).encode()
}

fn erasure_chunk_key(candidate_hash: &CandidateHash, index: u32) -> Vec<u8> {
	(candidate_hash, index, 0i8).encode()
}
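
// Illustrative sketch, not part of the original source: both key helpers append a `0i8` tag
// to the SCALE-encoded key, and chunk keys additionally carry the chunk index, so available
// data and every (candidate, chunk index) pair all map to distinct database keys.
#[cfg(test)]
mod key_scheme_sketch {
	use super::{available_data_key, erasure_chunk_key};
	use polkadot_primitives::v1::{CandidateHash, Hash};

	#[test]
	fn keys_do_not_collide() {
		let candidate = CandidateHash(Hash::repeat_byte(1));
		assert_ne!(erasure_chunk_key(&candidate, 0), erasure_chunk_key(&candidate, 1));
		assert_ne!(available_data_key(&candidate), erasure_chunk_key(&candidate, 0));
	}
}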

#[derive(Encode, Decode)]
struct StoredAvailableData {
	data: AvailableData,
	n_validators: u32,
}

/// Configuration for the availability store.
pub struct Config {
	/// Total cache size in megabytes. If `None` the default (128 MiB per column) is used.
	pub cache_size: Option<usize>,
	/// Path to the database.
	pub path: PathBuf,
}

impl std::convert::TryFrom<sc_service::config::DatabaseConfig> for Config {
	type Error = Error;

	fn try_from(config: sc_service::config::DatabaseConfig) -> Result<Self, Self::Error> {
		let path = config.path().ok_or(Error::CustomDatabase)?;

		Ok(Self {
			// substrate cache size is improper here; just use the default
			cache_size: None,
			// DB path is a sub-directory of substrate db path to give two properties:
			// 1: column numbers don't conflict with substrate
			// 2: commands like purge-chain work without further changes
			path: path.join("parachains").join("av-store"),
		})
	}
}
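
// Illustrative sketch, not part of the original source: a `Config` can also be built directly;
// the path below is an arbitrary example and not a location the node actually uses.
#[cfg(test)]
#[allow(dead_code)]
fn example_config() -> Config {
	Config {
		cache_size: None,
		path: PathBuf::from("/tmp/av-store-example"),
	}
}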

impl AvailabilityStoreSubsystem {
	/// Create a new `AvailabilityStoreSubsystem` with a given config on disk.
	pub fn new_on_disk(config: Config, metrics: Metrics) -> io::Result<Self> {
		let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS);

		if let Some(cache_size) = config.cache_size {
			let mut memory_budget = HashMap::new();

			for i in 0..columns::NUM_COLUMNS {
				memory_budget.insert(i, cache_size / columns::NUM_COLUMNS as usize);
			}
			db_config.memory_budget = memory_budget;
		}

		let path = config.path.to_str().ok_or_else(|| io::Error::new(
			io::ErrorKind::Other,
			format!("Bad database path: {:?}", config.path),
		))?;

		std::fs::create_dir_all(&path)?;
		let db = Database::open(&db_config, &path)?;

		Ok(Self {
			pruning_config: PruningConfig::default(),
			inner: Arc::new(db),
			metrics,
		})
	}

	#[cfg(test)]
	fn new_in_memory(inner: Arc<dyn KeyValueDB>, pruning_config: PruningConfig) -> Self {
		Self {
			pruning_config,
			inner,
			metrics: Metrics(None),
		}
	}
}

fn get_next_pov_pruning_time(db: &Arc<dyn KeyValueDB>) -> Option<NextPoVPruning> {
	query_inner(db, columns::META, &NEXT_POV_PRUNING)
}

fn get_next_chunk_pruning_time(db: &Arc<dyn KeyValueDB>) -> Option<NextChunkPruning> {
	query_inner(db, columns::META, &NEXT_CHUNK_PRUNING)
}

#[tracing::instrument(skip(subsystem, ctx), fields(subsystem = LOG_TARGET))]
async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context)
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{
	loop {
		let res = run_iteration(&mut subsystem, &mut ctx).await;
		match res {
			Err(e) => {
				e.trace();
			}
			Ok(true) => {
				tracing::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
				break;
			},
			Ok(false) => continue,
		}
	}
}

#[tracing::instrument(level = "trace", skip(subsystem, ctx), fields(subsystem = LOG_TARGET))]
async fn run_iteration<Context>(subsystem: &mut AvailabilityStoreSubsystem, ctx: &mut Context)
	-> Result<bool, Error>
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{
	// Every time the following two methods are called, a read from the DB is performed.
	// But given that these are very small values, essentially newtype wrappers around
	// `Duration` (`NextChunkPruning` and `NextPoVPruning`), and that they are read frequently,
	// we assume they end up cached in memory anyway and thus that these DB reads are
	// reasonably fast.
	let pov_pruning_time = subsystem.maybe_prune_povs()?;
	let chunk_pruning_time = subsystem.maybe_prune_chunks()?;

	let mut pov_pruning_time = pov_pruning_time.fuse();
	let mut chunk_pruning_time = chunk_pruning_time.fuse();

	select! {
		incoming = ctx.recv().fuse() => {
			match incoming? {
				FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(true),
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(
					ActiveLeavesUpdate { activated, .. })
				) => {
					for activated in activated.into_iter() {
						process_block_activated(ctx, &subsystem.inner, activated, &subsystem.metrics).await?;
					}
				}
				FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
					process_block_finalized(subsystem, &subsystem.inner, number).await?;
				}
				FromOverseer::Communication { msg } => {
					process_message(subsystem, ctx, msg).await?;
				}
			}
		}
		_ = pov_pruning_time => {
			subsystem.prune_povs()?;
		}
		_ = chunk_pruning_time => {
			subsystem.prune_chunks()?;
		}
		complete => return Ok(true),
	}

	Ok(false)
}

/// As soon as a certain block is finalized, its pruning records and the records of all
/// blocks that we keep that are `older` than the block in question have to be updated.
///
/// The state of the data has to be changed from
/// `CandidateState::Included` to `CandidateState::Finalized` and its pruning time has
/// to be updated to `now + keep_finalized_{block, chunk}_for`.
#[tracing::instrument(level = "trace", skip(subsystem, db), fields(subsystem = LOG_TARGET))]
async fn process_block_finalized(
	subsystem: &AvailabilityStoreSubsystem,
	db: &Arc<dyn KeyValueDB>,
	block_number: BlockNumber,
) -> Result<(), Error> {
	let _timer = subsystem.metrics.time_process_block_finalized();

	if let Some(mut pov_pruning) = pov_pruning(db) {
		// Since the records are sorted by the time at which they need to be pruned and not by
		// block numbers, we have to iterate through the whole collection here.
		for record in pov_pruning.iter_mut() {
			if record.block_number <= block_number {
				tracing::trace!(
					target: LOG_TARGET,
					block_number = %record.block_number,
					"Updating pruning record for finalized block",
				);

				record.prune_at = PruningDelay::into_the_future(
					subsystem.pruning_config.keep_finalized_block_for
				)?;
				record.candidate_state = CandidateState::Finalized;
			}
		}

		put_pov_pruning(db, None, pov_pruning)?;
	}

	if let Some(mut chunk_pruning) = chunk_pruning(db) {
		for record in chunk_pruning.iter_mut() {
			if record.block_number <= block_number {
				tracing::trace!(
					target: LOG_TARGET,
					block_number = %record.block_number,
					"Updating chunk pruning record for finalized block",
				);

				record.prune_at = PruningDelay::into_the_future(
					subsystem.pruning_config.keep_finalized_chunk_for
				)?;
				record.candidate_state = CandidateState::Finalized;
			}
		}

		put_chunk_pruning(db, None, chunk_pruning)?;
	}

	Ok(())
}
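
// Illustrative sketch, not part of the original source: the transition applied above to every
// record at or below the finalized height, shown on a single `PoVPruningRecord`. The hash and
// block number are arbitrary example values.
#[cfg(test)]
mod finalization_transition_sketch {
	use super::{CandidateState, PoVPruningRecord, PruningDelay, KEEP_FINALIZED_BLOCK_FOR};
	use polkadot_primitives::v1::{CandidateHash, Hash};

	#[test]
	fn included_record_becomes_finalized() {
		let mut record = PoVPruningRecord {
			candidate_hash: CandidateHash(Hash::repeat_byte(1)),
			block_number: 5,
			candidate_state: CandidateState::Included,
			prune_at: PruningDelay::Indefinite,
		};

		// This mirrors what `process_block_finalized` does for records with
		// `record.block_number <= block_number`.
		record.prune_at = PruningDelay::into_the_future(KEEP_FINALIZED_BLOCK_FOR).unwrap();
		record.candidate_state = CandidateState::Finalized;

		assert_eq!(record.candidate_state, CandidateState::Finalized);
		assert!(record.prune_at != PruningDelay::Indefinite);
	}
}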

#[tracing::instrument(level = "trace", skip(ctx, db, metrics), fields(subsystem = LOG_TARGET))]
async fn process_block_activated<Context>(
	ctx: &mut Context,
	db: &Arc<dyn KeyValueDB>,
	hash: Hash,
	metrics: &Metrics,
) -> Result<(), Error>
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>
{
	let _timer = metrics.time_block_activated();

	let events = match request_candidate_events(ctx, hash).await {
		Ok(events) => events,
		Err(err) => {
			tracing::debug!(target: LOG_TARGET, err = ?err, "requesting candidate events failed");
			return Ok(());
		}
	};

	tracing::trace!(target: LOG_TARGET, hash = %hash, "block activated");
	let mut included = HashSet::new();

	for event in events.into_iter() {
		if let CandidateEvent::CandidateIncluded(receipt, _) = event {
			tracing::trace!(
				target: LOG_TARGET,
				hash = %receipt.hash(),
				"Candidate {:?} was included", receipt.hash(),
			);
			included.insert(receipt.hash());
		}
	}

	if let Some(mut pov_pruning) = pov_pruning(db) {
		for record in pov_pruning.iter_mut() {
			if included.contains(&record.candidate_hash) {
				record.prune_at = PruningDelay::Indefinite;
				record.candidate_state = CandidateState::Included;
			}
		}

		pov_pruning.sort();

		put_pov_pruning(db, None, pov_pruning)?;
	}

	if let Some(mut chunk_pruning) = chunk_pruning(db) {
		for record in chunk_pruning.iter_mut() {
			if included.contains(&record.candidate_hash) {
				record.prune_at = PruningDelay::Indefinite;
				record.candidate_state = CandidateState::Included;
			}
		}

		chunk_pruning.sort();

		put_chunk_pruning(db, None, chunk_pruning)?;
	}

	Ok(())
}

#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn request_candidate_events<Context>(
	ctx: &mut Context,
	hash: Hash,
) -> Result<Vec<CandidateEvent>, Error>
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>
{
	let (tx, rx) = oneshot::channel();

	let msg = AllMessages::RuntimeApi(RuntimeApiMessage::Request(
		hash,
		RuntimeApiRequest::CandidateEvents(tx),
	));

	ctx.send_message(msg.into()).await;

	Ok(rx.await??)
}

#[tracing::instrument(level = "trace", skip(subsystem, ctx), fields(subsystem = LOG_TARGET))]
async fn process_message<Context>(
	subsystem: &mut AvailabilityStoreSubsystem,
	ctx: &mut Context,
	msg: AvailabilityStoreMessage,
) -> Result<(), Error>
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>
{
	use AvailabilityStoreMessage::*;

	let _timer = subsystem.metrics.time_process_message();

	match msg {
		QueryAvailableData(hash, tx) => {
			tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data))
				.map_err(|_| oneshot::Canceled)?;
		}
		QueryDataAvailability(hash, tx) => {
			tx.send(available_data(&subsystem.inner, &hash).is_some())
				.map_err(|_| oneshot::Canceled)?;
		}
		QueryChunk(hash, id, tx) => {
			tx.send(get_chunk(subsystem, &hash, id)?)
				.map_err(|_| oneshot::Canceled)?;
		}
		QueryChunkAvailability(hash, id, tx) => {
			tx.send(get_chunk(subsystem, &hash, id)?.is_some())
				.map_err(|_| oneshot::Canceled)?;
		}
		StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx } => {
			// Current block number is relay_parent block number + 1.
			let block_number = get_block_number(ctx, relay_parent).await? + 1;
			match store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number) {
				Err(e) => {
					tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
					return Err(e);
				}
				Ok(()) => {
					tx.send(Ok(())).map_err(|_| oneshot::Canceled)?;
				}
			}
		}
		StoreAvailableData(hash, id, n_validators, av_data, tx) => {
			match store_available_data(subsystem, &hash, id, n_validators, av_data) {
				Err(e) => {
					tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
					return Err(e);
				}
				Ok(()) => {
					tx.send(Ok(())).map_err(|_| oneshot::Canceled)?;
				}
			}
		}
	}

	Ok(())
}

fn available_data(
	db: &Arc<dyn KeyValueDB>,
	candidate_hash: &CandidateHash,
) -> Option<StoredAvailableData> {
	query_inner(db, columns::DATA, &available_data_key(candidate_hash))
}

fn pov_pruning(db: &Arc<dyn KeyValueDB>) -> Option<Vec<PoVPruningRecord>> {
	query_inner(db, columns::META, &POV_PRUNING_KEY)
}

fn chunk_pruning(db: &Arc<dyn KeyValueDB>) -> Option<Vec<ChunkPruningRecord>> {
	query_inner(db, columns::META, &CHUNK_PRUNING_KEY)
}

#[tracing::instrument(level = "trace", skip(db, tx), fields(subsystem = LOG_TARGET))]
fn put_pov_pruning(
	db: &Arc<dyn KeyValueDB>,
	tx: Option<DBTransaction>,
	mut pov_pruning: Vec<PoVPruningRecord>,
) -> Result<(), Error> {
	let mut tx = tx.unwrap_or_default();

	pov_pruning.sort();

	tx.put_vec(
		columns::META,
		&POV_PRUNING_KEY,
		pov_pruning.encode(),
	);

	match pov_pruning.get(0) {
		// We want to wake up in case we have some records that are not scheduled to be kept
		// indefinitely (the data is included and waiting to move to the finalized state) and so
		// there is at least one value that is not `PruningDelay::Indefinite`.
		Some(PoVPruningRecord { prune_at: PruningDelay::In(prune_at), .. }) => {
			tx.put_vec(
				columns::META,
				&NEXT_POV_PRUNING,
				NextPoVPruning(*prune_at).encode(),
			);
		}
		_ => {
			// If there are no longer any records, delete the cached pruning time record.
			tx.delete(
				columns::META,
				&NEXT_POV_PRUNING,
			);
		}
	}

	db.write(tx)?;

	Ok(())
}

#[tracing::instrument(level = "trace", skip(db, tx), fields(subsystem = LOG_TARGET))]
fn put_chunk_pruning(
	db: &Arc<dyn KeyValueDB>,
	tx: Option<DBTransaction>,
	mut chunk_pruning: Vec<ChunkPruningRecord>,
) -> Result<(), Error> {
	let mut tx = tx.unwrap_or_default();

	chunk_pruning.sort();

	tx.put_vec(
		columns::META,
		&CHUNK_PRUNING_KEY,
		chunk_pruning.encode(),
	);

	match chunk_pruning.get(0) {
		Some(ChunkPruningRecord { prune_at: PruningDelay::In(prune_at), .. }) => {
			tx.put_vec(
				columns::META,
				&NEXT_CHUNK_PRUNING,
				NextChunkPruning(*prune_at).encode(),
			);
		}
		_ => {
			tx.delete(
				columns::META,
				&NEXT_CHUNK_PRUNING,
			);
		}
	}

	db.write(tx)?;

	Ok(())
}

// Produces a block number by the block's hash.
// In the event of an invalid `block_hash`, returns `Ok(0)`.
async fn get_block_number<Context>(
	ctx: &mut Context,
	block_hash: Hash,
) -> Result<BlockNumber, Error>
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{
	let (tx, rx) = oneshot::channel();

	ctx.send_message(AllMessages::ChainApi(ChainApiMessage::BlockNumber(block_hash, tx))).await;

	Ok(rx.await??.unwrap_or_default())
}

#[tracing::instrument(level = "trace", skip(subsystem, available_data), fields(subsystem = LOG_TARGET))]
fn store_available_data(
	subsystem: &mut AvailabilityStoreSubsystem,
	candidate_hash: &CandidateHash,
	id: Option<ValidatorIndex>,
	n_validators: u32,
	available_data: AvailableData,
) -> Result<(), Error> {
	let _timer = subsystem.metrics.time_store_available_data();

	let mut tx = DBTransaction::new();

	let block_number = available_data.validation_data.block_number;

	if let Some(index) = id {
		let chunks = get_chunks(&available_data, n_validators as usize, &subsystem.metrics)?;
		store_chunk(
			subsystem,
			candidate_hash,
			n_validators,
			chunks[index as usize].clone(),
			block_number,
		)?;
	}

	let stored_data = StoredAvailableData {
		data: available_data,
		n_validators,
	};

	let mut pov_pruning = pov_pruning(&subsystem.inner).unwrap_or_default();
	let prune_at = PruningDelay::into_the_future(subsystem.pruning_config.keep_stored_block_for)?;

	if let Some(next_pruning) = prune_at.as_duration() {
		tx.put_vec(
			columns::META,
			&NEXT_POV_PRUNING,
			NextPoVPruning(next_pruning).encode(),
		);
	}

	let pruning_record = PoVPruningRecord {
		candidate_hash: *candidate_hash,
		block_number,
		candidate_state: CandidateState::Stored,
		prune_at,
	};

	let idx = pov_pruning.binary_search(&pruning_record).unwrap_or_else(|insert_idx| insert_idx);

	pov_pruning.insert(idx, pruning_record);

	tx.put_vec(
		columns::DATA,
		available_data_key(&candidate_hash).as_slice(),
		stored_data.encode(),
	);

	tx.put_vec(
		columns::META,
		&POV_PRUNING_KEY,
		pov_pruning.encode(),
	);

	subsystem.inner.write(tx)?;

	Ok(())
}

#[tracing::instrument(level = "trace", skip(subsystem), fields(subsystem = LOG_TARGET))]
fn store_chunk(
	subsystem: &mut AvailabilityStoreSubsystem,
	candidate_hash: &CandidateHash,
	_n_validators: u32,
	chunk: ErasureChunk,
	block_number: BlockNumber,
) -> Result<(), Error> {
	let _timer = subsystem.metrics.time_store_chunk();

	let mut tx = DBTransaction::new();

	let dbkey = erasure_chunk_key(candidate_hash, chunk.index);

	let mut chunk_pruning = chunk_pruning(&subsystem.inner).unwrap_or_default();
	let prune_at = PruningDelay::into_the_future(subsystem.pruning_config.keep_stored_block_for)?;

	if let Some(delay) = prune_at.as_duration() {
		tx.put_vec(
			columns::META,
			&NEXT_CHUNK_PRUNING,
			NextChunkPruning(delay).encode(),
		);
	}

	let pruning_record = ChunkPruningRecord {
		candidate_hash: candidate_hash.clone(),
		block_number,
		candidate_state: CandidateState::Stored,
		chunk_index: chunk.index,
		prune_at,
	};

	let idx = chunk_pruning.binary_search(&pruning_record).unwrap_or_else(|insert_idx| insert_idx);

	chunk_pruning.insert(idx, pruning_record);

	tx.put_vec(
		columns::DATA,
		&dbkey,
		chunk.encode(),
	);

	tx.put_vec(
		columns::META,
		&CHUNK_PRUNING_KEY,
		chunk_pruning.encode(),
	);

	subsystem.inner.write(tx)?;

	Ok(())
}

#[tracing::instrument(level = "trace", skip(subsystem), fields(subsystem = LOG_TARGET))]
fn get_chunk(
	subsystem: &mut AvailabilityStoreSubsystem,
	candidate_hash: &CandidateHash,
	index: u32,
) -> Result<Option<ErasureChunk>, Error> {
	let _timer = subsystem.metrics.time_get_chunk();

	if let Some(chunk) = query_inner(