// Copyright 2018-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Persistent database for parachain data: PoV block data, erasure-coding chunks and outgoing messages.
//!
//! This will be written into during the block validation pipeline, and queried
//! by networking code in order to circulate required data and maintain availability
//! of it.

#![warn(missing_docs)]

use futures::prelude::*;
use futures::{channel::{mpsc, oneshot}, task::Spawn};
use keystore::KeyStorePtr;
use polkadot_primitives::{
	Hash, Block,
	parachain::{
		PoVBlock, AbridgedCandidateReceipt, ErasureChunk,
		ParachainHost, AvailableData, OmittedValidationData,
	},
};
use sp_runtime::traits::HashFor;
use sp_blockchain::{Result as ClientResult};
use client::{
	BlockchainEvents, BlockBackend,
};
use sp_api::{ApiExt, ProvideRuntimeApi};
use codec::{Encode, Decode};

use log::warn;

use std::sync::Arc;
use std::collections::HashSet;
use std::path::PathBuf;
use std::io;
use std::pin::Pin;

mod worker;
mod store;

pub use worker::AvailabilityBlockImport;
pub use store::AwaitedFrontierEntry;

use worker::{
	Worker, WorkerHandle, IncludedParachainBlocks, WorkerMsg, MakeAvailable, Chunks
};

use store::{Store as InnerStore};

const LOG_TARGET: &str = "availability";

/// Configuration for the availability store.
pub struct Config {
	/// Cache size in bytes. If `None` default is used.
	pub cache_size: Option<usize>,
	/// Path to the database.
	pub path: PathBuf,
}

/// An abstraction around networking for the availability-store.
///
/// Currently it is not possible to use the networking code in the availability store
/// core directly due to a number of circular dependencies it would require:
///
/// `availability-store` -> `network` -> `availability-store`
///
/// `availability-store` -> `network` -> `validation` -> `availability-store`
///
/// So we provide this trait that gets implemented for a type in
/// the [`network`] module or a mock in tests.
///
/// [`network`]: ../polkadot_network/index.html
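///
/// # Example
///
/// A minimal sketch of a test mock (the `NullNetwork` type below is hypothetical
/// and not part of this crate):
///
/// ```ignore
/// use futures::future;
///
/// #[derive(Clone)]
/// struct NullNetwork;
///
/// impl ErasureNetworking for NullNetwork {
///     type Error = String;
///
///     fn fetch_erasure_chunk(
///         &self,
///         _candidate_hash: &Hash,
///         _index: u32,
///     ) -> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>> {
///         // Never returns a chunk; a real mock could serve canned data instead.
///         Box::pin(future::ready(Err("no chunks available".to_string())))
///     }
///
///     fn distribute_erasure_chunk(&self, _candidate_hash: Hash, _chunk: ErasureChunk) {
///         // Dropped; a real implementation would gossip the chunk to validators.
///     }
/// }
/// ```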
pub trait ErasureNetworking {
	/// Errors that can occur when fetching erasure chunks.
	type Error: std::fmt::Debug + 'static;

	/// Fetch an erasure chunk from the networking service.
	fn fetch_erasure_chunk(
		&self,
		candidate_hash: &Hash,
		index: u32,
	) -> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>>;

	/// Distributes an erasure chunk to the correct validator node.
	fn distribute_erasure_chunk(
		&self,
		candidate_hash: Hash,
		chunk: ErasureChunk,
	);
}

/// Data that, when combined with an `AbridgedCandidateReceipt`, is enough
/// to fully re-execute a block.
#[derive(Debug, Encode, Decode, PartialEq)]
pub struct ExecutionData {
	/// The `PoVBlock`.
	pub pov_block: PoVBlock,
	/// The data omitted from the `AbridgedCandidateReceipt`.
	pub omitted_validation: OmittedValidationData,
}

/// Handle to the availability store.
///
/// This provides a proxying API that
///   * in case of write operations provides async methods that send data to
///     the background worker and resolve when that data is processed by the worker
///   * in case of read operations queries the underlying storage synchronously.
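///
/// # Example
///
/// A minimal usage sketch; `network` (an [`ErasureNetworking`] implementation),
/// `candidate_hash` and `available_data` are assumed to come from the surrounding
/// validation code:
///
/// ```ignore
/// // Writes go through the background worker and are awaited asynchronously.
/// let store = Store::new_in_memory(network);
/// store.make_available(candidate_hash, available_data).await?;
///
/// // Reads are answered synchronously from the underlying storage.
/// let receipt = store.get_candidate(&candidate_hash);
/// ```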
#[derive(Clone)]
pub struct Store {
	inner: InnerStore,
	worker: Arc<WorkerHandle>,
	to_worker: mpsc::UnboundedSender<WorkerMsg>,
}

impl Store {
	/// Create a new `Store` with given config on disk.
	///
	/// Creating a store among other things starts a background worker thread that
	/// handles most of the write operations to the storage.
	#[cfg(not(target_os = "unknown"))]
	pub fn new<EN>(config: Config, network: EN) -> io::Result<Self>
		where EN: ErasureNetworking + Send + Sync + Clone + 'static
	{
		let inner = InnerStore::new(config)?;
		let worker = Arc::new(Worker::start(inner.clone(), network));
		let to_worker = worker.to_worker().clone();

		Ok(Self {
			inner,
			worker,
			to_worker,
		})
	}

	/// Create a new in-memory `Store`. Useful for tests.
	///
	/// Creating a store among other things starts a background worker thread
	/// that handles most of the write operations to the storage.
	pub fn new_in_memory<EN>(network: EN) -> Self
		where EN: ErasureNetworking + Send + Sync + Clone + 'static
	{
		let inner = InnerStore::new_in_memory();
		let worker = Arc::new(Worker::start(inner.clone(), network));
		let to_worker = worker.to_worker().clone();

		Self {
			inner,
			worker,
			to_worker,
		}
	}

	/// Obtain a [`BlockImport`] implementation to import blocks into this store.
	///
	/// This block import will act upon all newly imported blocks sending information
	/// about parachain heads included in them to this `Store`'s background worker.
	/// The user may create multiple instances of [`BlockImport`]s with this call.
	///
	/// [`BlockImport`]: https://substrate.dev/rustdocs/v1.0/substrate_consensus_common/trait.BlockImport.html
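	///
	/// # Example
	///
	/// A rough wiring sketch; `inner_import`, `client`, `task_spawner` and `keystore`
	/// are placeholders for values the surrounding service setup already has:
	///
	/// ```ignore
	/// let block_import = store.block_import(inner_import, client.clone(), task_spawner, keystore)?;
	/// ```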
	pub fn block_import<I, P>(
		&self,
		wrapped_block_import: I,
		client: Arc<P>,
		spawner: impl Spawn,
		keystore: KeyStorePtr,
	) -> ClientResult<AvailabilityBlockImport<I, P>>
	where
		P: ProvideRuntimeApi<Block> + BlockchainEvents<Block> + BlockBackend<Block> + Send + Sync + 'static,
		P::Api: ParachainHost<Block>,
		P::Api: ApiExt<Block, Error=sp_blockchain::Error>,
		// Rust bug: https://github.com/rust-lang/rust/issues/24159
		sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
	{
		let to_worker = self.to_worker.clone();

		let import = AvailabilityBlockImport::new(
			client,
			wrapped_block_import,
			spawner,
			keystore,
			to_worker,
		);

		Ok(import)
	}

	/// Make some data available provisionally.
	///
	/// Validators with the responsibility of maintaining availability for a block,
	/// or collators collating a block, will call this function in order to persist
	/// that data to disk so that it can be queried and provided to other nodes in
	/// the network.
	///
	/// Determination of invalidity is beyond the scope of this function.
	///
	/// This method will send the data to the background worker, allowing the caller to
	/// asynchronously wait for the result.
	pub async fn make_available(&self, candidate_hash: Hash, available_data: AvailableData)
		-> io::Result<()>
	{
		let (s, r) = oneshot::channel();
		let msg = WorkerMsg::MakeAvailable(MakeAvailable {
			candidate_hash,
			available_data,
			result: s,
		});

		let _ = self.to_worker.unbounded_send(msg);

		if let Ok(Ok(())) = r.await {
			Ok(())
		} else {
			Err(io::Error::new(io::ErrorKind::Other, "making data available failed"))
		}
	}

	/// Get a set of all chunks we are waiting for.
	pub fn awaited_chunks(&self) -> Option<HashSet<AwaitedFrontierEntry>> {
		self.inner.awaited_chunks()
	}

	/// Adds an erasure chunk to storage.
	///
	/// The chunk should be checked for validity against the root of encoding
	/// and its proof prior to calling this.
	///
	/// This method will send the chunk to the background worker, allowing the caller to
	/// asynchronously wait for the result.
	pub async fn add_erasure_chunk(
		&self,
		candidate: AbridgedCandidateReceipt,
		n_validators: u32,
		chunk: ErasureChunk,
	) -> io::Result<()> {
		self.add_erasure_chunks(candidate, n_validators, std::iter::once(chunk)).await
	}

	/// Adds a set of erasure chunks to storage.
	///
	/// The chunks should be checked for validity against the root of encoding
	/// and its proof prior to calling this.
	///
	/// This method will send the chunks to the background worker, allowing the caller to
	/// asynchronously wait for the result.
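	///
	/// # Example
	///
	/// A sketch; `candidate`, `n_validators` and `chunks` are assumed to come from
	/// the validation pipeline and to have been verified against the erasure root:
	///
	/// ```ignore
	/// // Resolves once the background worker has persisted the chunks.
	/// store.add_erasure_chunks(candidate, n_validators, chunks).await?;
	/// ```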
	pub async fn add_erasure_chunks<I>(
		&self,
		candidate: AbridgedCandidateReceipt,
		n_validators: u32,
		chunks: I,
	) -> io::Result<()>
		where I: IntoIterator<Item = ErasureChunk>
	{
		let candidate_hash = candidate.hash();

		self.add_candidate(candidate).await?;

		let (s, r) = oneshot::channel();
		let chunks = chunks.into_iter().collect();

		let msg = WorkerMsg::Chunks(Chunks {
			candidate_hash,
			chunks,
			n_validators,
			result: s,
		});

		let _ = self.to_worker.unbounded_send(msg);

		if let Ok(Ok(())) = r.await {
			Ok(())
		} else {
			Err(io::Error::new(io::ErrorKind::Other, "adding erasure chunks failed"))
		}
	}

	/// Queries an erasure chunk by the candidate hash and validator index.
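	///
	/// # Example
	///
	/// A sketch; `network` implements [`ErasureNetworking`] and `our_index` is the
	/// local validator's index (both assumed here):
	///
	/// ```ignore
	/// // Synchronous read; returns `None` if the chunk was never stored.
	/// if let Some(chunk) = store.get_erasure_chunk(&candidate_hash, our_index as usize) {
	///     network.distribute_erasure_chunk(candidate_hash, chunk);
	/// }
	/// ```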
	pub fn get_erasure_chunk(
		&self,
		candidate_hash: &Hash,
		validator_index: usize,
	) -> Option<ErasureChunk> {
		self.inner.get_erasure_chunk(candidate_hash, validator_index)
	}

	/// Note a validator's index and the number of validators at a relay parent in the
	/// store.
	///
	/// This should be done before adding erasure chunks with this relay parent.
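	///
	/// # Example
	///
	/// A sketch of the expected ordering (the values used are placeholders):
	///
	/// ```ignore
	/// store.note_validator_index_and_n_validators(&relay_parent, our_index, n_validators)?;
	/// store.add_erasure_chunk(candidate, n_validators, our_chunk).await?;
	/// ```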
	pub fn note_validator_index_and_n_validators(
		&self,
		relay_parent: &Hash,
		validator_index: u32,
		n_validators: u32,
	) -> io::Result<()> {
		self.inner.note_validator_index_and_n_validators(
			relay_parent,
			validator_index,
			n_validators,
		)
	}

	// Stores a candidate receipt.
	async fn add_candidate(
		&self,
		candidate: AbridgedCandidateReceipt,
	) -> io::Result<()> {
		let (s, r) = oneshot::channel();

		let msg = WorkerMsg::IncludedParachainBlocks(IncludedParachainBlocks {
			blocks: vec![crate::worker::IncludedParachainBlock {
				candidate,
				available_data: None,
			}],
			result: s,
		});

		let _ = self.to_worker.unbounded_send(msg);

		if let Ok(Ok(())) = r.await {
			Ok(())
		} else {
			Err(io::Error::new(io::ErrorKind::Other, "adding candidate failed"))
		}
	}

	/// Queries a candidate receipt by its hash.
	pub fn get_candidate(&self, candidate_hash: &Hash)
		-> Option<AbridgedCandidateReceipt>
	{
		self.inner.get_candidate(candidate_hash)
	}

	/// Queries execution data by candidate hash.
	pub fn execution_data(&self, candidate_hash: &Hash)
		-> Option<ExecutionData>
	{
		self.inner.execution_data(candidate_hash)
	}
}