// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements an `AvailabilityStoreSubsystem`.
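//!
//! For every candidate it observes, the store keeps a `CandidateMeta` record in the
//! `META` column alongside the full `AvailableData` and the per-validator erasure
//! chunks in the `DATA` column, pruning them on a schedule derived from the
//! candidate's finality state.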

#![recursion_limit="256"]
#![warn(missing_docs)]

use std::collections::HashMap;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH};

use parity_scale_codec::{Encode, Decode, Input, Error as CodecError};
use futures::{select, channel::oneshot, future, FutureExt};
use futures_timer::Delay;
use kvdb_rocksdb::{Database, DatabaseConfig};
use kvdb::{KeyValueDB, DBTransaction};

use polkadot_primitives::v1::{
	Hash, AvailableData, BlockNumber, CandidateEvent, ErasureChunk, ValidatorIndex, CandidateHash,
	CandidateReceipt,
};
use polkadot_subsystem::{
	FromOverseer, OverseerSignal, SubsystemError, Subsystem, SubsystemContext, SpawnedSubsystem,
	ActiveLeavesUpdate,
	errors::{ChainApiError, RuntimeApiError},
};
use polkadot_erasure_coding as erasure;
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_subsystem::messages::{
	AvailabilityStoreMessage, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest,
};
use bitvec::{vec::BitVec, order::Lsb0 as BitOrderLsb0};

#[cfg(test)]
mod tests;

const LOG_TARGET: &str = "availability";

mod columns {
	pub const DATA: u32 = 0;
	pub const META: u32 = 1;
	pub const NUM_COLUMNS: u32 = 2;
}

/// The following constants are used under normal conditions:

const AVAILABLE_PREFIX: &[u8; 9] = b"available";
const CHUNK_PREFIX: &[u8; 5] = b"chunk";
const META_PREFIX: &[u8; 4] = b"meta";
const UNFINALIZED_PREFIX: &[u8; 11] = b"unfinalized";
const PRUNE_BY_TIME_PREFIX: &[u8; 13] = b"prune_by_time";
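
// Keys are the SCALE encoding of tuples that start with one of the prefixes above,
// e.g. `(CHUNK_PREFIX, candidate_hash, chunk_index)`. Since the prefix bytes lead the
// key, a prefix iteration over a column visits entries of exactly one kind, and the
// big-endian number wrappers below keep such iterations ordered.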

// Some keys need to map to "empty" values, since only the presence of the key matters.
// RocksDB does not support empty values, so we use a one-byte tombstone instead.
const TOMBSTONE_VALUE: &[u8] = &*b" ";

/// Unavailable blocks are kept for 1 hour.
const KEEP_UNAVAILABLE_FOR: Duration = Duration::from_secs(60 * 60);

/// Finalized data is kept for 25 hours.
const KEEP_FINALIZED_FOR: Duration = Duration::from_secs(25 * 60 * 60);

/// The pruning interval.
const PRUNING_INTERVAL: Duration = Duration::from_secs(60 * 5);

/// Unix time wrapper with big-endian encoding.
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
struct BETimestamp(u64);

impl Encode for BETimestamp {
	fn size_hint(&self) -> usize {
		std::mem::size_of::<u64>()
	}

	fn using_encoded<R, F: FnOnce(&[u8]) -> R>(&self, f: F) -> R {
		f(&self.0.to_be_bytes())
	}
}

impl Decode for BETimestamp {
	fn decode<I: Input>(value: &mut I) -> Result<Self, CodecError> {
		<[u8; 8]>::decode(value).map(u64::from_be_bytes).map(Self)
	}
}

impl From<Duration> for BETimestamp {
	fn from(d: Duration) -> Self {
		BETimestamp(d.as_secs())
	}
}

impl From<BETimestamp> for Duration {
	fn from(t: BETimestamp) -> Self {
		Duration::from_secs(t.0)
	}
}

/// [`BlockNumber`] wrapper with big-endian encoding.
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
struct BEBlockNumber(BlockNumber);

impl Encode for BEBlockNumber {
	fn size_hint(&self) -> usize {
		std::mem::size_of::<BlockNumber>()
	}

	fn using_encoded<R, F: FnOnce(&[u8]) -> R>(&self, f: F) -> R {
		f(&self.0.to_be_bytes())
	}
}

impl Decode for BEBlockNumber {
	fn decode<I: Input>(value: &mut I) -> Result<Self, CodecError> {
		<[u8; std::mem::size_of::<BlockNumber>()]>::decode(value).map(BlockNumber::from_be_bytes).map(Self)
	}
}
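
// Big-endian byte order makes the SCALE-encoded form of these wrappers sort
// lexicographically in the same order as the underlying numbers, which is what the
// ordered prefix iterations over the `META` column rely on. A minimal illustrative
// check (an addition for exposition, not part of the upstream test suite):
#[cfg(test)]
mod be_encoding_sanity {
	use super::*;

	#[test]
	fn be_encoding_preserves_numeric_order() {
		// Little-endian encoding would sort 256 ([0, 1, ..]) before 1 ([1, 0, ..]).
		assert!(BETimestamp(1).encode() < BETimestamp(256).encode());
		assert!(BEBlockNumber(1).encode() < BEBlockNumber(256).encode());
	}
}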

#[derive(Debug, Encode, Decode)]
enum State {
	/// Candidate data was first observed at the given time but is not available in any block.
	#[codec(index = 0)]
	Unavailable(BETimestamp),
	/// The candidate was first observed at the given time and was included in the given list of unfinalized blocks, which may be
	/// empty. The timestamp here is not used for pruning. Either one of these blocks will be finalized or the state will regress to
	/// `State::Unavailable`, in which case the same timestamp will be reused. Blocks are sorted ascending first by block number and
	/// then hash.
	#[codec(index = 1)]
	Unfinalized(BETimestamp, Vec<(BEBlockNumber, Hash)>),
	/// Candidate data has appeared in a finalized block and did so at the given time.
	#[codec(index = 2)]
	Finalized(BETimestamp)
}
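
// Lifecycle: a candidate starts `Unavailable` when first backed, becomes `Unfinalized`
// once included in one or more blocks, and ends up `Finalized` when one of those
// blocks is finalized. If all including blocks are abandoned, it regresses to
// `Unavailable`, reusing its original observation timestamp.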

// Meta information about a candidate.
#[derive(Debug, Encode, Decode)]
struct CandidateMeta {
	state: State,
	data_available: bool,
	chunks_stored: BitVec<BitOrderLsb0, u8>,
}

fn query_inner<D: Decode>(
	db: &Arc<dyn KeyValueDB>,
	column: u32,
	key: &[u8],
) -> Result<Option<D>, Error> {
	match db.get(column, key) {
		Ok(Some(raw)) => {
			let res = D::decode(&mut &raw[..])?;
			Ok(Some(res))
		}
		Ok(None) => Ok(None),
		Err(e) => {
			tracing::warn!(target: LOG_TARGET, err = ?e, "Error reading from the availability store");
			Err(e.into())
		}
	}
}

fn write_available_data(
	tx: &mut DBTransaction,
	hash: &CandidateHash,
	available_data: &AvailableData,
) {
	let key = (AVAILABLE_PREFIX, hash).encode();

	tx.put_vec(columns::DATA, &key[..], available_data.encode());
}

fn load_available_data(
	db: &Arc<dyn KeyValueDB>,
	hash: &CandidateHash,
) -> Result<Option<AvailableData>, Error> {
	let key = (AVAILABLE_PREFIX, hash).encode();

	query_inner(db, columns::DATA, &key)
}

fn delete_available_data(
	tx: &mut DBTransaction,
	hash: &CandidateHash,
) {
	let key = (AVAILABLE_PREFIX, hash).encode();

	tx.delete(columns::DATA, &key[..])
}

fn load_chunk(
	db: &Arc<dyn KeyValueDB>,
	candidate_hash: &CandidateHash,
	chunk_index: ValidatorIndex,
) -> Result<Option<ErasureChunk>, Error> {
	let key = (CHUNK_PREFIX, candidate_hash, chunk_index).encode();

	query_inner(db, columns::DATA, &key)
}

fn write_chunk(
	tx: &mut DBTransaction,
	candidate_hash: &CandidateHash,
	chunk_index: ValidatorIndex,
	erasure_chunk: &ErasureChunk,
) {
	let key = (CHUNK_PREFIX, candidate_hash, chunk_index).encode();

	tx.put_vec(columns::DATA, &key, erasure_chunk.encode());
}

fn delete_chunk(
	tx: &mut DBTransaction,
	candidate_hash: &CandidateHash,
	chunk_index: ValidatorIndex,
) {
	let key = (CHUNK_PREFIX, candidate_hash, chunk_index).encode();

	tx.delete(columns::DATA, &key[..]);
}

fn load_meta(
	db: &Arc<dyn KeyValueDB>,
	hash: &CandidateHash,
) -> Result<Option<CandidateMeta>, Error> {
	let key = (META_PREFIX, hash).encode();

	query_inner(db, columns::META, &key)
}

fn write_meta(
	tx: &mut DBTransaction,
	hash: &CandidateHash,
	meta: &CandidateMeta,
) {
	let key = (META_PREFIX, hash).encode();

	tx.put_vec(columns::META, &key, meta.encode());
}

fn delete_meta(tx: &mut DBTransaction, hash: &CandidateHash) {
	let key = (META_PREFIX, hash).encode();
	tx.delete(columns::META, &key[..])
}

fn delete_unfinalized_height(
	tx: &mut DBTransaction,
	block_number: BlockNumber,
) {
	let prefix = (UNFINALIZED_PREFIX, BEBlockNumber(block_number)).encode();
	tx.delete_prefix(columns::META, &prefix);
}

fn delete_unfinalized_inclusion(
	tx: &mut DBTransaction,
	block_number: BlockNumber,
	block_hash: &Hash,
	candidate_hash: &CandidateHash,
) {
	let key = (
		UNFINALIZED_PREFIX,
		BEBlockNumber(block_number),
		block_hash,
		candidate_hash,
	).encode();

	tx.delete(columns::META, &key[..]);
}

fn delete_pruning_key(tx: &mut DBTransaction, t: impl Into<BETimestamp>, h: &CandidateHash) {
	let key = (PRUNE_BY_TIME_PREFIX, t.into(), h).encode();
	tx.delete(columns::META, &key);
}

fn write_pruning_key(tx: &mut DBTransaction, t: impl Into<BETimestamp>, h: &CandidateHash) {
	let t = t.into();
	let key = (PRUNE_BY_TIME_PREFIX, t, h).encode();
	tx.put(columns::META, &key, TOMBSTONE_VALUE);
}

fn finalized_block_range(finalized: BlockNumber) -> (Vec<u8>, Vec<u8>) {
	// We use big-endian encoding to iterate in ascending order.
	let start = UNFINALIZED_PREFIX.encode();
	let end = (UNFINALIZED_PREFIX, BEBlockNumber(finalized + 1)).encode();

	(start, end)
}

fn write_unfinalized_block_contains(
	tx: &mut DBTransaction,
	n: BlockNumber,
	h: &Hash,
	ch: &CandidateHash,
) {
	let key = (UNFINALIZED_PREFIX, BEBlockNumber(n), h, ch).encode();
	tx.put(columns::META, &key, TOMBSTONE_VALUE);
}

fn pruning_range(now: impl Into<BETimestamp>) -> (Vec<u8>, Vec<u8>) {
	let start = PRUNE_BY_TIME_PREFIX.encode();
	let end = (PRUNE_BY_TIME_PREFIX, BETimestamp(now.into().0 + 1)).encode();

	(start, end)
}
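
// Both `finalized_block_range` and `pruning_range` return half-open `[start, end)`
// key ranges; the `+ 1` on the upper bound makes entries stamped exactly at the
// finalized number or at `now` fall inside the range. An illustrative check (an
// addition for exposition, not part of the upstream test suite):
#[cfg(test)]
mod pruning_range_sanity {
	use super::*;

	#[test]
	fn range_includes_entries_stamped_at_now() {
		let (start, end) = pruning_range(Duration::from_secs(5));
		let key_at_now = (PRUNE_BY_TIME_PREFIX, BETimestamp(5), CandidateHash(Hash::zero())).encode();

		// Lexicographic comparison of raw keys mirrors the database's iteration order.
		assert!(&start[..] <= &key_at_now[..] && &key_at_now[..] < &end[..]);
	}
}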

fn decode_unfinalized_key(s: &[u8]) -> Result<(BlockNumber, Hash, CandidateHash), CodecError> {
	if !s.starts_with(UNFINALIZED_PREFIX) {
		return Err("missing magic string".into());
	}

	<(BEBlockNumber, Hash, CandidateHash)>::decode(&mut &s[UNFINALIZED_PREFIX.len()..])
		.map(|(b, h, ch)| (b.0, h, ch))
}

fn decode_pruning_key(s: &[u8]) -> Result<(Duration, CandidateHash), CodecError> {
	if !s.starts_with(PRUNE_BY_TIME_PREFIX) {
		return Err("missing magic string".into());
	}

	<(BETimestamp, CandidateHash)>::decode(&mut &s[PRUNE_BY_TIME_PREFIX.len()..])
		.map(|(t, ch)| (t.into(), ch))
}

#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
	#[error(transparent)]
	RuntimeApi(#[from] RuntimeApiError),

	#[error(transparent)]
	ChainApi(#[from] ChainApiError),

	#[error(transparent)]
	Erasure(#[from] erasure::Error),

	#[error(transparent)]
	Io(#[from] io::Error),

	#[error(transparent)]
	Oneshot(#[from] oneshot::Canceled),

	#[error(transparent)]
	Subsystem(#[from] SubsystemError),

	#[error(transparent)]
	Time(#[from] SystemTimeError),

	#[error(transparent)]
	Codec(#[from] CodecError),

	#[error("Custom databases are not supported")]
	CustomDatabase,
}

impl Error {
	fn trace(&self) {
		match self {
			// don't spam the log with spurious errors
			Self::RuntimeApi(_) |
			Self::Oneshot(_) => tracing::debug!(target: LOG_TARGET, err = ?self),
			// it's worth reporting otherwise
			_ => tracing::warn!(target: LOG_TARGET, err = ?self),
		}
	}
}

/// Struct holding pruning timing configuration.
/// The only purpose of this structure is to use different timing
/// configurations in production and in testing.
#[derive(Clone)]
struct PruningConfig {
	/// How long unavailable data should be kept.
	keep_unavailable_for: Duration,

	/// How long finalized data should be kept.
	keep_finalized_for: Duration,

	/// How often to perform data pruning.
	pruning_interval: Duration,
}

impl Default for PruningConfig {
	fn default() -> Self {
		Self {
			keep_unavailable_for: KEEP_UNAVAILABLE_FOR,
			keep_finalized_for: KEEP_FINALIZED_FOR,
			pruning_interval: PRUNING_INTERVAL,
		}
	}
}

/// Configuration for the availability store.
pub struct Config {
	/// Total cache size in megabytes. If `None` the default (128 MiB per column) is used.
	pub cache_size: Option<usize>,
	/// Path to the database.
	pub path: PathBuf,
}

impl std::convert::TryFrom<sc_service::config::DatabaseConfig> for Config {
	type Error = Error;

	fn try_from(config: sc_service::config::DatabaseConfig) -> Result<Self, Self::Error> {
		let path = config.path().ok_or(Error::CustomDatabase)?;

		Ok(Self {
			// Substrate's cache size configuration doesn't apply to this database; just use the default.
			cache_size: None,
			// DB path is a sub-directory of substrate db path to give two properties:
			// 1: column numbers don't conflict with substrate
			// 2: commands like purge-chain work without further changes
			path: path.join("parachains").join("av-store"),
		})
	}
}

trait Clock: Send + Sync {
	// Returns the time elapsed since the Unix epoch.
	fn now(&self) -> Result<Duration, Error>;
}

struct SystemClock;

impl Clock for SystemClock {
	fn now(&self) -> Result<Duration, Error> {
		SystemTime::now().duration_since(UNIX_EPOCH).map_err(Into::into)
	}
}
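
// Production code uses `SystemClock`; tests can inject a deterministic `Clock`
// through `new_in_memory` below, driving pruning without real waiting.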

/// An implementation of the Availability Store subsystem.
pub struct AvailabilityStoreSubsystem {
	pruning_config: PruningConfig,
	db: Arc<dyn KeyValueDB>,
	metrics: Metrics,
	clock: Box<dyn Clock>,
}

impl AvailabilityStoreSubsystem {
	/// Create a new `AvailabilityStoreSubsystem` with a given config on disk.
	pub fn new_on_disk(config: Config, metrics: Metrics) -> io::Result<Self> {
		let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS);

		if let Some(cache_size) = config.cache_size {
			let mut memory_budget = HashMap::new();

			for i in 0..columns::NUM_COLUMNS {
				memory_budget.insert(i, cache_size / columns::NUM_COLUMNS as usize);
			}
			db_config.memory_budget = memory_budget;
		}

		let path = config.path.to_str().ok_or_else(|| io::Error::new(
			io::ErrorKind::Other,
			format!("Bad database path: {:?}", config.path),
		))?;

		std::fs::create_dir_all(&path)?;
		let db = Database::open(&db_config, &path)?;

		Ok(Self {
			pruning_config: PruningConfig::default(),
			db: Arc::new(db),
			metrics,
			clock: Box::new(SystemClock),
		})
	}

	#[cfg(test)]
	fn new_in_memory(
		db: Arc<dyn KeyValueDB>,
		pruning_config: PruningConfig,
		clock: Box<dyn Clock>,
	) -> Self {
		Self {
			pruning_config,
			db,
			metrics: Metrics(None),
			clock,
		}
	}
}
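
// Illustrative construction on disk (hypothetical path; `Metrics(None)` disables
// metric reporting, exactly as in the test constructor above):
//
//     let config = Config { cache_size: None, path: "/path/to/av-store".into() };
//     let store = AvailabilityStoreSubsystem::new_on_disk(config, Metrics(None))?;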

impl<Context> Subsystem<Context> for AvailabilityStoreSubsystem
where
	Context: SubsystemContext<Message = AvailabilityStoreMessage>,
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = run(self, ctx)
			.map(|_| Ok(()))
			.boxed();

		SpawnedSubsystem {
			name: "availability-store-subsystem",
			future,
		}
	}
}

#[tracing::instrument(skip(subsystem, ctx), fields(subsystem = LOG_TARGET))]
async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context)
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{
	let mut next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();

	loop {
		let res = run_iteration(&mut ctx, &mut subsystem, &mut next_pruning).await;
		match res {
			Err(e) => {
				e.trace();

				if let Error::Subsystem(SubsystemError::Context(_)) = e {
					break;
				}
			}
			Ok(true) => {
				tracing::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
				break;
			},
			Ok(false) => continue,
		}
	}
}

#[tracing::instrument(level = "trace", skip(subsystem, ctx), fields(subsystem = LOG_TARGET))]
async fn run_iteration<Context>(
	ctx: &mut Context,
	subsystem: &mut AvailabilityStoreSubsystem,
	mut next_pruning: &mut future::Fuse<Delay>,
)
	-> Result<bool, Error>
where
	Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{
	select! {
		incoming = ctx.recv().fuse() => {
			match incoming? {
				FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(true),
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(
					ActiveLeavesUpdate { activated, .. })
				) => {
					for (activated, _span) in activated.into_iter() {
						let _timer = subsystem.metrics.time_block_activated();
						process_block_activated(ctx, subsystem, activated).await?;
					}
				}
				FromOverseer::Signal(OverseerSignal::BlockFinalized(hash, number)) => {
					let _timer = subsystem.metrics.time_process_block_finalized();

					process_block_finalized(
						ctx,
						&subsystem,
						hash,
						number,
					).await?;
				}
				FromOverseer::Communication { msg } => {
					let _timer = subsystem.metrics.time_process_message();
					process_message(subsystem, msg)?;
				}
			}
		}
		_ = next_pruning => {
			// It's important to set the delay before calling `prune_all` because an error in `prune_all`
			// could lead to the delay not being set again. Then we would never prune anything anymore.
			*next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();

			let _timer = subsystem.metrics.time_pruning();
			prune_all(&subsystem.db, &*subsystem.clock)?;
		}
	}

	Ok(false)
}

async fn process_block_activated(
	ctx: &mut impl SubsystemContext,
	subsystem: &mut AvailabilityStoreSubsystem,
	activated: Hash,
) -> Result<(), Error> {
	let now = subsystem.clock.now()?;

	let candidate_events = {
		let (tx, rx) = oneshot::channel();
		ctx.send_message(
			RuntimeApiMessage::Request(activated, RuntimeApiRequest::CandidateEvents(tx)).into()
		).await;

		rx.await??
	};

	let block_number = {
		let (tx, rx) = oneshot::channel();
		ctx.send_message(
			ChainApiMessage::BlockNumber(activated, tx).into()
		).await;

		match rx.await?? {
			None => return Ok(()),
			Some(n) => n,
		}
	};

	let block_header = {
		let (tx, rx) = oneshot::channel();

		ctx.send_message(
			ChainApiMessage::BlockHeader(activated, tx).into()
		).await;

		match rx.await?? {
			None => return Ok(()),
			Some(n) => n,
		}
	};

	// We need to request the number of validators based on the parent state, as that is the number of validators
	// used to create this block.
	let n_validators = {
		let (tx, rx) = oneshot::channel();
		ctx.send_message(
			RuntimeApiMessage::Request(block_header.parent_hash, RuntimeApiRequest::Validators(tx)).into()
		).await;

		rx.await??.len()
	};

	let mut tx = DBTransaction::new();

	for event in candidate_events {
		match event {
			CandidateEvent::CandidateBacked(receipt, _head, _core_index, _group_index) => {
				note_block_backed(
					&subsystem.db,
					&mut tx,
					&subsystem.pruning_config,
					now,
					n_validators,
					receipt,
				)?;
			}
			CandidateEvent::CandidateIncluded(receipt, _head, _core_index, _group_index) => {
				note_block_included(
					&subsystem.db,
					&mut tx,
					&subsystem.pruning_config,
					(block_number, activated),
					receipt,
				)?;
			}
			_ => {}
		}
	}

	subsystem.db.write(tx)?;

	Ok(())
}

fn note_block_backed(
	db: &Arc<dyn KeyValueDB>,
	db_transaction: &mut DBTransaction,
	pruning_config: &PruningConfig,
	now: Duration,
	n_validators: usize,
	candidate: CandidateReceipt,
) -> Result<(), Error> {
	let candidate_hash = candidate.hash();

	if load_meta(db, &candidate_hash)?.is_none() {
		let meta = CandidateMeta {
			state: State::Unavailable(now.into()),
			data_available: false,
			chunks_stored: bitvec::bitvec![BitOrderLsb0, u8; 0; n_validators],
		};

		let prune_at = now + pruning_config.keep_unavailable_for;

		write_pruning_key(db_transaction, prune_at, &candidate_hash);
		write_meta(db_transaction, &candidate_hash, &meta);
	}

	Ok(())
}

fn note_block_included(
	db: &Arc<dyn KeyValueDB>,
	db_transaction: &mut DBTransaction,
	pruning_config: &PruningConfig,
	block: (BlockNumber, Hash),
	candidate: CandidateReceipt,
) -> Result<(), Error> {
	let candidate_hash = candidate.hash();

	match load_meta(db, &candidate_hash)? {
		None => {
			// This is alarming. We've observed a block being included without ever seeing it backed.
			// Warn and ignore.
			tracing::warn!(
				target: LOG_TARGET,
				"Candidate {}, included without being backed?",
				candidate_hash,
			);
		}
		Some(mut meta) => {
			let be_block = (BEBlockNumber(block.0), block.1);

			meta.state = match meta.state {
				State::Unavailable(at) => {
					let at_d: Duration = at.into();
					let prune_at = at_d + pruning_config.keep_unavailable_for;
					delete_pruning_key(db_transaction, prune_at, &candidate_hash);

					State::Unfinalized(at, vec![be_block])
				}
				State::Unfinalized(at, mut within) => {
					if let Err(i) = within.binary_search(&be_block) {
						within.insert(i, be_block);
					}

					State::Unfinalized(at, within)
				}
				State::Finalized(_at) => {
					// This should never happen as a candidate would have to be included after
					// finality.
					return Ok(())
				}
			};

			write_unfinalized_block_contains(db_transaction, block.0, &block.1, &candidate_hash);
			write_meta(db_transaction, &candidate_hash, &meta);
		}
	}

	Ok(())
}
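
// Peek the block number out of the key the given peekable iterator currently points
// at, without advancing it. Yields `None` at the end of the iterator or when the key
// fails to decode.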
macro_rules! peek_num {
	($iter:ident) => {
		match $iter.peek() {
			Some((k, _)) => decode_unfinalized_key(&k[..]).ok().map(|(b, _, _)| b),
			None => None
		}
	}
}

async fn process_block_finalized(
	ctx: &mut impl SubsystemContext,
	subsystem: &AvailabilityStoreSubsystem,
	finalized_hash: Hash,
	finalized_number: BlockNumber,
) -> Result<(), Error> {
	let now = subsystem.clock.now()?;

	let mut next_possible_batch = 0;
	loop {
		let mut db_transaction = DBTransaction::new();
		let (start_prefix, end_prefix) = finalized_block_range(finalized_number);

		// We have to do some juggling here of the `iter` to make sure it doesn't cross the `.await` boundary
		// as it is not `Send`. That is why we create the iterator once within this loop, drop it,
		// do an asynchronous request, and then instantiate the exact same iterator again.
		let batch_num = {
			let mut iter = subsystem.db.iter_with_prefix(columns::META, &start_prefix)
				.take_while(|(k, _)| &k[..] < &end_prefix[..])
				.peekable();

			match peek_num!(iter) {
				None => break, // end of iterator.
				Some(n) => n,
			}
		};

		if batch_num < next_possible_batch { continue } // sanity.
		next_possible_batch = batch_num + 1;

		let batch_finalized_hash = if batch_num == finalized_number {
			finalized_hash
		} else {
			let (tx, rx) = oneshot::channel();
			ctx.send_message(ChainApiMessage::FinalizedBlockHash(batch_num, tx).into()).await;

			match rx.await?? {
				None => {
					tracing::warn!(target: LOG_TARGET,
						"Availability store was informed that block #{} is finalized, \
						but chain API has no finalized hash.",
						batch_num,
					);

					break
				}
				Some(h) => h,
			}
		};

		let iter = subsystem.db.iter_with_prefix(columns::META, &start_prefix)
			.take_while(|(k, _)| &k[..] < &end_prefix[..])
			.peekable();

		let batch = load_all_at_finalized_height(iter, batch_num, batch_finalized_hash);

		// Now that we've iterated over the entire batch at this finalized height,
		// update the meta.

		delete_unfinalized_height(&mut db_transaction, batch_num);

		update_blocks_at_finalized_height(
			&subsystem,
			&mut db_transaction,
			batch,
			batch_num,
			now,
		)?;

		// We need to write at the end of the loop so the prefix iterator doesn't pick up the same values again
		// in the next iteration. Another unfortunate effect of having to re-initialize the iterator.
		subsystem.db.write(db_transaction)?;
	}

	Ok(())
}
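
// Each iteration of the loop above drains exactly one block height from the
// unfinalized index, committing one database transaction per height, so a large
// finality jump is processed incrementally.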

// loads all candidates at the finalized height and maps them to `true` if finalized
// and `false` if unfinalized.
fn load_all_at_finalized_height(
	mut iter: std::iter::Peekable<impl Iterator<Item = (Box<[u8]>, Box<[u8]>)>>,
	block_number: BlockNumber,
	finalized_hash: Hash,
) -> impl IntoIterator<Item = (CandidateHash, bool)> {
	// maps candidate hashes to true if finalized, false otherwise.
	let mut candidates = HashMap::new();

	// Load all candidates that were included at this height.
	loop {
		match peek_num!(iter) {
			None => break, // end of iterator.
			Some(n) if n != block_number => break, // end of batch.
			_ => {}
		}

		let (k, _v) = iter.next().expect("`peek` used to check non-empty; qed");
		let (_, block_hash, candidate_hash) = decode_unfinalized_key(&k[..])
			.expect("`peek_num` checks validity of key; qed");

		if block_hash == finalized_hash {
			candidates.insert(candidate_hash, true);
		} else {
			candidates.entry(candidate_hash).or_insert(false);
		}
	}

	candidates
}

fn update_blocks_at_finalized_height(
	subsystem: &AvailabilityStoreSubsystem,
	db_transaction: &mut DBTransaction,
	candidates: impl IntoIterator<Item = (CandidateHash, bool)>,
	block_number: BlockNumber,
	now: Duration,
) -> Result<(), Error> {
	for (candidate_hash, is_finalized) in candidates {
		let mut meta = match load_meta(&subsystem.db, &candidate_hash)? {
			None => {
				tracing::warn!(
					target: LOG_TARGET,
					"Dangling candidate metadata for {}",
					candidate_hash,
				);

				continue;
			}
			Some(c) => c,
		};

		if is_finalized {
			// Clear everything else related to this block. We're finalized now!
			match meta.state {
				State::Finalized(_) => continue, // sanity
				State::Unavailable(at) => {
					// This is also not going to happen; the very fact that we are
					// iterating over the candidate here indicates that `State` should
					// be `Unfinalized`.
					delete_pruning_key(db_transaction, at, &candidate_hash);
				}
				State::Unfinalized(_, blocks) => {
					for (block_num, block_hash) in blocks.iter().cloned() {
						// this exact height is all getting cleared out anyway.
						if block_num.0 != block_number {
							delete_unfinalized_inclusion(
								db_transaction,
								block_num.0,
								&block_hash,
								&candidate_hash,
							);
						}
					}
				}
			}

			meta.state = State::Finalized(now.into());

			// Write the meta and a pruning record.
			write_meta(db_transaction, &candidate_hash, &meta);
			write_pruning_key(
				db_transaction,
				now + subsystem.pruning_config.keep_finalized_for,
				&candidate_hash,
			);
		} else {
			meta.state = match meta.state {
				State::Finalized(_) => continue, // sanity.
				State::Unavailable(_) => continue, // sanity.
				State::Unfinalized(at, mut blocks) => {
					// Clear out everything at this height.
					blocks.retain(|(n, _)| n.0 != block_number);

					// If empty, we need to go back to being unavailable as we aren't
					// aware of any blocks this is included in.
					if blocks.is_empty() {
						let at_d: Duration = at.into();
						let prune_at = at_d + subsystem.pruning_config.keep_unavailable_for;
						write_pruning_key(db_transaction, prune_at, &candidate_hash);
						State::Unavailable(at)
					} else {
						State::Unfinalized(at, blocks)
					}
				}
			};

			// Update the meta entry.
			write_meta(db_transaction, &candidate_hash, &meta)
		}
	}

	Ok(())
}

fn process_message(
	subsystem: &mut AvailabilityStoreSubsystem,
	msg: AvailabilityStoreMessage,
) -> Result<(), Error> {
	match msg {
		AvailabilityStoreMessage::QueryAvailableData(candidate, tx) => {
			let _ = tx.send(load_available_data(&subsystem.db, &candidate)?);
		}
		AvailabilityStoreMessage::QueryDataAvailability(candidate, tx) => {
			let a = load_meta(&subsystem.db, &candidate)?.map_or(false, |m| m.data_available);
			let _ = tx.send(a);
		}
		AvailabilityStoreMessage::QueryChunk(candidate, validator_index, tx) => {
			let _timer = subsystem.metrics.time_get_chunk();
			let _ = tx.send(load_chunk(&subsystem.db, &candidate, validator_index)?);
		}
		AvailabilityStoreMessage::QueryChunkAvailability(candidate, validator_index, tx) => {
			let a = load_meta(&subsystem.db, &candidate)?
				.map_or(false, |m|
					*m.chunks_stored.get(validator_index as usize).as_deref().unwrap_or(&false)
				);
			let _ = tx.send(a);
		}
		AvailabilityStoreMessage::StoreChunk {
			candidate_hash,
			relay_parent: _,
			chunk,
			tx,
		} => {
			subsystem.metrics.on_chunks_received(1);
			let _timer = subsystem.metrics.time_store_chunk();

			match store_chunk(&subsystem.db, candidate_hash, chunk) {
				Ok(true) => {
					let _ = tx.send(Ok(()));
				}
				Ok(false) => {
					let _ = tx.send(Err(()));
				}
				Err(e) => {
					let _ = tx.send(Err(()));
					return Err(e)
				}
			}
		}
		AvailabilityStoreMessage::StoreAvailableData(candidate, _our_index, n_validators, available_data, tx) => {
			subsystem.metrics.on_chunks_received(n_validators as _);

			let _timer = subsystem.metrics.time_store_available_data();