lib.rs
// Copyright 2018-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Persistent database for parachain data: PoV block data, erasure-coding chunks and outgoing messages.
//!
//! This will be written into during the block validation pipeline, and queried
//! by networking code in order to circulate required data and maintain its
//! availability.
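//!
//! A typical write path for a validator is sketched below; the variable names
//! are illustrative only and not prescribed by this crate:
//!
//! ```ignore
//! // Record our validator index and the validator-set size for this relay parent
//! // before storing any chunks anchored at it.
//! store.note_validator_index_and_n_validators(&relay_parent, our_index, n_validators)?;
//!
//! // Persist the full data and our own erasure chunk so that peers can query them.
//! store.make_available(candidate_hash, available_data).await?;
//! store.add_erasure_chunk(candidate_receipt, n_validators, our_chunk).await?;
//! ```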

#![warn(missing_docs)]

use futures::prelude::*;
use futures::channel::{mpsc, oneshot};
use keystore::KeyStorePtr;
use polkadot_primitives::v0::{
	Hash, Block,
	PoVBlock, AbridgedCandidateReceipt, ErasureChunk,
	ParachainHost, AvailableData, OmittedValidationData,
};
use sp_runtime::traits::HashFor;
use sp_blockchain::Result as ClientResult;
use client::{
	BlockchainEvents, BlockBackend,
};
use sp_api::{ApiExt, ProvideRuntimeApi};
use codec::{Encode, Decode};
use sp_core::traits::SpawnNamed;

use log::warn;

use std::sync::Arc;
use std::collections::HashSet;
use std::path::PathBuf;
use std::io;
use std::pin::Pin;

mod worker;
mod store;

pub use worker::AvailabilityBlockImport;
pub use store::AwaitedFrontierEntry;

use worker::{
	Worker, WorkerHandle, IncludedParachainBlocks, WorkerMsg, MakeAvailable, Chunks
};

use store::Store as InnerStore;

const LOG_TARGET: &str = "availability";

/// Configuration for the availability store.
pub struct Config {
	/// Cache size in bytes. If `None` default is used.
	pub cache_size: Option<usize>,
	/// Path to the database.
	pub path: PathBuf,
}

/// An abstraction around networking for the availability-store.
///
/// Currently it is not possible to use the networking code in the availability store
/// core directly due to a number of circular dependencies it would create:
///
/// `availability-store` -> `network` -> `availability-store`
///
/// `availability-store` -> `network` -> `validation` -> `availability-store`
///
/// So we provide this trait that gets implemented for a type in
/// the [`network`] module or a mock in tests.
///
/// [`network`]: ../polkadot_network/index.html
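///
/// # Example
///
/// A minimal sketch of a stub implementation, useful for wiring the store up in
/// tests. The `NullNetworking` type is purely illustrative and not part of this
/// crate:
///
/// ```ignore
/// use std::pin::Pin;
/// use futures::{future, Future};
/// use polkadot_primitives::v0::{Hash, ErasureChunk};
///
/// #[derive(Clone)]
/// struct NullNetworking;
///
/// impl ErasureNetworking for NullNetworking {
///     type Error = String;
///
///     fn fetch_erasure_chunk(
///         &self,
///         _candidate_hash: &Hash,
///         _index: u32,
///     ) -> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>> {
///         // A real implementation would request the chunk from peers;
///         // this stub simply never resolves.
///         Box::pin(future::pending::<Result<ErasureChunk, String>>())
///     }
///
///     fn distribute_erasure_chunk(&self, _candidate_hash: Hash, _chunk: ErasureChunk) {
///         // A real implementation would gossip the chunk to the responsible validator.
///     }
/// }
/// ```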
pub trait ErasureNetworking {
	/// Errors that can occur when fetching erasure chunks.
	type Error: std::fmt::Debug + 'static;

	/// Fetch an erasure chunk from the networking service.
	fn fetch_erasure_chunk(
		&self,
		candidate_hash: &Hash,
		index: u32,
	) -> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>>;

	/// Distributes an erasure chunk to the correct validator node.
	fn distribute_erasure_chunk(
		&self,
		candidate_hash: Hash,
		chunk: ErasureChunk,
	);
}

/// Data that, when combined with an `AbridgedCandidateReceipt`, is enough
/// to fully re-execute a block.
#[derive(Debug, Encode, Decode, PartialEq)]
pub struct ExecutionData {
	/// The `PoVBlock`.
	pub pov_block: PoVBlock,
	/// The data omitted from the `AbridgedCandidateReceipt`.
	pub omitted_validation: OmittedValidationData,
}

/// Handle to the availability store.
///
/// This provides a proxying API that:
///   * for write operations, provides async methods that send data to the
///     background worker and resolve once that data has been processed by it;
///   * for read operations, queries the underlying storage synchronously.
#[derive(Clone)]
pub struct Store {
	inner: InnerStore,
	worker: Arc<WorkerHandle>,
	to_worker: mpsc::UnboundedSender<WorkerMsg>,
}

impl Store {
	/// Create a new `Store` with given config on disk.
	///
	/// Creating a store, among other things, starts a background worker thread that
	/// handles most of the write operations to the storage.
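	///
	/// # Example
	///
	/// A minimal sketch, assuming `network` implements [`ErasureNetworking`] and
	/// using a purely illustrative database path:
	///
	/// ```ignore
	/// let config = Config {
	///     // `None` falls back to the default cache size.
	///     cache_size: None,
	///     path: std::path::PathBuf::from("path/to/availability-db"),
	/// };
	/// let store = Store::new(config, network)?;
	/// ```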
	#[cfg(not(target_os = "unknown"))]
	pub fn new<EN>(config: Config, network: EN) -> io::Result<Self>
		where EN: ErasureNetworking + Send + Sync + Clone + 'static
	{
		let inner = InnerStore::new(config)?;
		let worker = Arc::new(Worker::start(inner.clone(), network));
		let to_worker = worker.to_worker().clone();

		Ok(Self {
			inner,
			worker,
			to_worker,
		})
	}

	/// Create a new in-memory `Store`. Useful for tests.
	///
	/// Creating a store, among other things, starts a background worker thread
	/// that handles most of the write operations to the storage.
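	///
	/// # Example
	///
	/// A sketch using the illustrative `NullNetworking` stub from the
	/// [`ErasureNetworking`] docs:
	///
	/// ```ignore
	/// let store = Store::new_in_memory(NullNetworking);
	/// ```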
	pub fn new_in_memory<EN>(network: EN) -> Self
		where EN: ErasureNetworking + Send + Sync + Clone + 'static
	{
		let inner = InnerStore::new_in_memory();
		let worker = Arc::new(Worker::start(inner.clone(), network));
		let to_worker = worker.to_worker().clone();

		Self {
			inner,
			worker,
			to_worker,
		}
	}

	/// Obtain a [`BlockImport`] implementation to import blocks into this store.
	///
	/// This block import will act upon all newly imported blocks, sending information
	/// about parachain heads included in them to this `Store`'s background worker.
	/// The user may create multiple [`BlockImport`] instances with this call.
	///
	/// [`BlockImport`]: https://substrate.dev/rustdocs/v1.0/substrate_consensus_common/trait.BlockImport.html
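	///
	/// # Example
	///
	/// A sketch of the intended wiring; `wrapped_import`, `client`, `task_spawner`
	/// and `keystore` are illustrative names assumed to be provided by the
	/// surrounding service code:
	///
	/// ```ignore
	/// let import = store.block_import(
	///     wrapped_import,   // the inner `BlockImport` being wrapped
	///     client.clone(),   // a full client implementing the required runtime APIs
	///     task_spawner,     // an implementation of `SpawnNamed`
	///     keystore.clone(),
	/// )?;
	/// ```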
	pub fn block_import<I, P>(
		&self,
		wrapped_block_import: I,
		client: Arc<P>,
		spawner: impl SpawnNamed,
		keystore: KeyStorePtr,
	) -> ClientResult<AvailabilityBlockImport<I, P>>
	where
		P: ProvideRuntimeApi<Block> + BlockchainEvents<Block> + BlockBackend<Block> + Send + Sync + 'static,
		P::Api: ParachainHost<Block>,
		P::Api: ApiExt<Block, Error=sp_blockchain::Error>,
		// Rust bug: https://github.com/rust-lang/rust/issues/24159
		sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
	{
		let to_worker = self.to_worker.clone();

		let import = AvailabilityBlockImport::new(
			client,
			wrapped_block_import,
			spawner,
			keystore,
			to_worker,
		);

		Ok(import)
	}

	/// Make some data available provisionally.
	///
	/// Validators responsible for maintaining availability of a block, or collators
	/// producing that block, will call this function to persist the data to disk so
	/// that it can be queried and provided to other nodes in the network.
	///
	/// Determination of invalidity is beyond the scope of this function.
	///
	/// This method will send the data to the background worker, allowing the caller to
	/// asynchronously wait for the result.
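	///
	/// # Example
	///
	/// A sketch, assuming `candidate_hash` and `available_data` come out of the
	/// validation pipeline:
	///
	/// ```ignore
	/// store.make_available(candidate_hash, available_data).await?;
	/// ```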
	pub async fn make_available(&self, candidate_hash: Hash, available_data: AvailableData)
		-> io::Result<()>
	{
		let (s, r) = oneshot::channel();
		let msg = WorkerMsg::MakeAvailable(MakeAvailable {
			candidate_hash,
			available_data,
			result: s,
		});

		let _ = self.to_worker.unbounded_send(msg);

		if let Ok(Ok(())) = r.await {
			Ok(())
		} else {
			Err(io::Error::new(io::ErrorKind::Other, "making data available failed"))
		}

	}

	/// Get a set of all chunks we are waiting for.
	pub fn awaited_chunks(&self) -> Option<HashSet<AwaitedFrontierEntry>> {
		self.inner.awaited_chunks()
	}

	/// Adds an erasure chunk to storage.
	///
	/// The chunk should be checked for validity against the root of encoding
	/// and its proof prior to calling this.
	///
	/// This method will send the chunk to the background worker, allowing the caller to
	/// asynchronously wait for the result.
	pub async fn add_erasure_chunk(
		&self,
		candidate: AbridgedCandidateReceipt,
		n_validators: u32,
		chunk: ErasureChunk,
	) -> io::Result<()> {
		self.add_erasure_chunks(candidate, n_validators, std::iter::once(chunk)).await
	}

	/// Adds a set of erasure chunks to storage.
	///
	/// The chunks should be checked for validity against the root of encoding
	/// and its proof prior to calling this.
	///
	/// This method will send the chunks to the background worker, allowing the caller to
	/// asynchronously wait for the result.
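	///
	/// # Example
	///
	/// A sketch, assuming the chunks have already been verified against the
	/// erasure root of `candidate`:
	///
	/// ```ignore
	/// store.add_erasure_chunks(candidate, n_validators, verified_chunks).await?;
	/// ```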
	pub async fn add_erasure_chunks<I>(
		&self,
		candidate: AbridgedCandidateReceipt,
		n_validators: u32,
		chunks: I,
	) -> io::Result<()>
		where I: IntoIterator<Item = ErasureChunk>
	{
		let candidate_hash = candidate.hash();

		self.add_candidate(candidate).await?;

		let (s, r) = oneshot::channel();
		let chunks = chunks.into_iter().collect();

		let msg = WorkerMsg::Chunks(Chunks {
			candidate_hash,
			chunks,
			n_validators,
			result: s,
		});

		let _ = self.to_worker.unbounded_send(msg);

		if let Ok(Ok(())) = r.await {
			Ok(())
		} else {
			Err(io::Error::new(io::ErrorKind::Other, "adding erasure chunks failed"))
		}
	}

	/// Queries an erasure chunk by the candidate hash and validator index.
	pub fn get_erasure_chunk(
		&self,
		candidate_hash: &Hash,
		validator_index: usize,
	) -> Option<ErasureChunk> {
		self.inner.get_erasure_chunk(candidate_hash, validator_index)
	}

	/// Note a validator's index and a number of validators at a relay parent in the
	/// store.
	///
	/// This should be done before adding erasure chunks with this relay parent.
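	///
	/// # Example
	///
	/// A sketch of the expected call order, with all values assumed to come from
	/// the caller's session information:
	///
	/// ```ignore
	/// store.note_validator_index_and_n_validators(&relay_parent, our_index, n_validators)?;
	/// // Only afterwards add erasure chunks anchored at `relay_parent`.
	/// store.add_erasure_chunks(candidate, n_validators, chunks).await?;
	/// ```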
	pub fn note_validator_index_and_n_validators(
		&self,
		relay_parent: &Hash,
		validator_index: u32,
		n_validators: u32,
	) -> io::Result<()> {
		self.inner.note_validator_index_and_n_validators(
			relay_parent,
			validator_index,
			n_validators,
		)
	}

	// Stores a candidate receipt.
	async fn add_candidate(
		&self,
		candidate: AbridgedCandidateReceipt,
	) -> io::Result<()> {
		let (s, r) = oneshot::channel();

		let msg = WorkerMsg::IncludedParachainBlocks(IncludedParachainBlocks {
			blocks: vec![crate::worker::IncludedParachainBlock {
				candidate,
				available_data: None,
			}],
			result: s,
		});

		let _ = self.to_worker.unbounded_send(msg);

		if let Ok(Ok(())) = r.await {
			Ok(())
		} else {
			Err(io::Error::new(io::ErrorKind::Other, "adding candidate receipt failed"))
		}
	}

	/// Queries a candidate receipt by its hash.
	pub fn get_candidate(&self, candidate_hash: &Hash)
		-> Option<AbridgedCandidateReceipt>
	{
		self.inner.get_candidate(candidate_hash)
	}

	/// Queries execution data by candidate hash.
	pub fn execution_data(&self, candidate_hash: &Hash)
		-> Option<ExecutionData>
	{
		self.inner.execution_data(candidate_hash)
	}
}