lib.rs 15.6 KB
Newer Older
Shawn Tabrizi's avatar
Shawn Tabrizi committed
1
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
Gav's avatar
Gav committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
//! Collation node logic.
Gav's avatar
Gav committed
18
19
20
21
22
23
24
25
26
27
28
29
30
//!
//! A collator node lives on a distinct parachain and submits a proposal for
//! a state transition, along with a proof for its validity
//! (what we might call a witness or block data).
//!
//! One of collators' other roles is to route messages between chains.
//! Each parachain produces a list of "egress" posts of messages for each other
//! parachain on each block, for a total of N^2 lists all together.
//!
//! We will refer to the egress list at relay chain block X of parachain A with
//! destination B as egress(X)[A -> B]
//!
//! On every block, each parachain will be intended to route messages from some
31
//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3)
Gav's avatar
Gav committed
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//!
//! Since the egress information is unique to every block, when routing from a
//! parachain a collator must gather all egress posts from that parachain
//! up to the last point in history that messages were successfully routed
//! from that parachain, accounting for relay chain blocks where no candidate
//! from the collator's parachain was produced.
//!
//! In the case that all parachains route to each other and a candidate for the
//! collator's parachain was included in the last relay chain block, the collator
//! only has to gather egress posts from other parachains one block back in relay
//! chain history.
//!
//! This crate defines traits which provide context necessary for collation logic
//! to be performed, as the collation logic itself.

47
use std::collections::HashSet;
48
use std::fmt;
49
use std::sync::Arc;
50
use std::time::Duration;
Gav's avatar
Gav committed
51

52
use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn};
53
use log::warn;
54
use sc_client::BlockchainEvents;
Gavin Wood's avatar
Gavin Wood committed
55
56
use sp_core::Pair;
use sp_runtime::traits::BlakeTwo256;
57
use polkadot_primitives::{
58
	BlockId, Hash, Block,
59
	parachain::{
Ashley's avatar
Ashley committed
60
		self, BlockData, DutyRoster, HeadData, Id as ParaId,
61
		PoVBlock, ValidatorId, CollatorPair, LocalValidationData
62
63
64
	}
};
use polkadot_cli::{
65
	ProvideRuntimeApi, AbstractService, ParachainHost, IsKusama,
66
	service::{self, Roles, SelectChain}
67
};
Ashley's avatar
Ashley committed
68
use polkadot_network::legacy::validation::ValidationNetwork;
69

70
pub use polkadot_cli::{VersionInfo, load_spec, service::Configuration};
71
72
pub use polkadot_validation::SignedStatement;
pub use polkadot_primitives::parachain::CollatorId;
73
pub use sc_network::PeerId;
74
pub use service::RuntimeApiCollection;
75

76
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
Gav's avatar
Gav committed
77

78
/// An abstraction over the `Network` with useful functions for a `Collator`.
79
pub trait Network: Send + Sync {
80
81
	/// Convert the given `CollatorId` to a `PeerId`.
	fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
82
		Box<dyn Future<Output=Option<PeerId>> + Send>;
83
84
85
86
87
88

	/// Create a `Stream` of checked statements for the given `relay_parent`.
	///
	/// The returned stream will not terminate, so it is required to make sure that the stream is
	/// dropped when it is not required anymore. Otherwise, it will stick around in memory
	/// infinitely.
89
	fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>>;
90
91
}

Gavin Wood's avatar
Gavin Wood committed
92
impl<P, SP> Network for ValidationNetwork<P, SP> where
93
	P: 'static + Send + Sync,
94
	SP: 'static + Spawn + Clone + Send + Sync,
95
96
{
	fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
97
		Box<dyn Future<Output=Option<PeerId>> + Send>
98
	{
99
		Box::new(Self::collator_id_to_peer_id(self, collator_id))
100
101
	}

102
	fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>> {
103
		Box::new(Self::checked_statements(self, relay_parent))
104
105
106
	}
}

107
108
109
110
111
112
/// Error to return when the head data was invalid.
#[derive(Clone, Copy, Debug)]
pub struct InvalidHead;

/// Collation errors.
#[derive(Debug)]
Ashley's avatar
Ashley committed
113
pub enum Error {
114
	/// Error on the relay-chain side of things.
Ashley's avatar
Ashley committed
115
	Polkadot(String),
116
117
118
119
	/// Error on the collator side of things.
	Collator(InvalidHead),
}

Ashley's avatar
Ashley committed
120
impl fmt::Display for Error {
121
122
123
124
125
126
127
128
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		match *self {
			Error::Polkadot(ref err) => write!(f, "Polkadot node error: {}", err),
			Error::Collator(_) => write!(f, "Collator node error: Invalid head data"),
		}
	}
}

129
/// The Polkadot client type.
130
pub type PolkadotClient<B, E, R> = sc_client::Client<B, E, Block, R>;
131

132
133
134
135
136
137
/// Something that can build a `ParachainContext`.
pub trait BuildParachainContext {
	/// The parachain context produced by the `build` function.
	type ParachainContext: self::ParachainContext;

	/// Build the `ParachainContext`.
138
	fn build<B, E, R, SP, Extrinsic>(
139
		self,
140
		client: Arc<PolkadotClient<B, E, R>>,
141
		spawner: SP,
142
143
144
		network: Arc<dyn Network>,
	) -> Result<Self::ParachainContext, ()>
		where
145
			PolkadotClient<B, E, R>: ProvideRuntimeApi<Block>,
146
			<PolkadotClient<B, E, R> as ProvideRuntimeApi<Block>>::Api: RuntimeApiCollection<Extrinsic>,
147
148
			// Rust bug: https://github.com/rust-lang/rust/issues/24159
			<<PolkadotClient<B, E, R> as ProvideRuntimeApi<Block>>::Api as sp_api::ApiExt<Block>>::StateBackend:
Gavin Wood's avatar
Gavin Wood committed
149
				sp_api::StateBackend<BlakeTwo256>,
150
151
			Extrinsic: codec::Codec + Send + Sync + 'static,
			E: sc_client::CallExecutor<Block> + Clone + Send + Sync + 'static,
152
153
154
155
			SP: Spawn + Clone + Send + Sync + 'static,
			R: Send + Sync + 'static,
			B: sc_client_api::Backend<Block> + 'static,
			// Rust bug: https://github.com/rust-lang/rust/issues/24159
Gavin Wood's avatar
Gavin Wood committed
156
			B::State: sp_api::StateBackend<BlakeTwo256>;
157
158
}

Gav's avatar
Gav committed
159
160
161
/// Parachain context needed for collation.
///
/// This can be implemented through an externally attached service or a stub.
162
163
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
Ashley's avatar
Ashley committed
164
	type ProduceCandidate: Future<Output = Result<(BlockData, HeadData), InvalidHead>>;
165

166
167
	/// Produce a candidate, given the relay parent hash, the latest ingress queue information
	/// and the last parachain head.
Ashley's avatar
Ashley committed
168
	fn produce_candidate(
169
		&mut self,
170
		relay_parent: Hash,
171
		status: LocalValidationData,
172
	) -> Self::ProduceCandidate;
Gav's avatar
Gav committed
173
174
}

175
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
Ashley's avatar
Ashley committed
176
pub async fn collate<P>(
177
	relay_parent: Hash,
178
	local_id: ParaId,
179
	local_validation_data: LocalValidationData,
180
	mut para_context: P,
181
	key: Arc<CollatorPair>,
182
)
Ashley's avatar
Ashley committed
183
	-> Result<parachain::Collation, Error>
Gav's avatar
Gav committed
184
	where
185
186
		P: ParachainContext,
		P::ProduceCandidate: Send,
Gav's avatar
Gav committed
187
{
Ashley's avatar
Ashley committed
188
	let (block_data, head_data) = para_context.produce_candidate(
189
		relay_parent,
190
		local_validation_data,
191
192
	).map_err(Error::Collator).await?;

193
194
195
196
197
198
199
200
201
202
203
	let pov_block = PoVBlock {
		block_data,
	};

	let pov_block_hash = pov_block.hash();
	let signature = key.sign(&parachain::collator_signature_payload(
		&relay_parent,
		&local_id,
		&pov_block_hash,
	));

204
	let info = parachain::CollationInfo {
205
		parachain_index: local_id,
206
		relay_parent,
207
208
		collator: key.public(),
		signature,
209
		head_data,
210
		pov_block_hash,
211
212
213
	};

	let collation = parachain::Collation {
214
		info,
215
		pov: pov_block,
216
217
	};

Ashley's avatar
Ashley committed
218
	Ok(collation)
219
220
}

221
fn build_collator_service<S, P, Extrinsic>(
222
	service: S,
223
	para_id: ParaId,
224
	key: Arc<CollatorPair>,
225
	build_parachain_context: P,
Gavin Wood's avatar
Gavin Wood committed
226
) -> Result<S, polkadot_service::Error>
227
228
	where
		S: AbstractService<Block = service::Block, NetworkSpecialization = service::PolkadotProtocol>,
229
230
		sc_client::Client<S::Backend, S::CallExecutor, service::Block, S::RuntimeApi>: ProvideRuntimeApi<Block>,
		<sc_client::Client<S::Backend, S::CallExecutor, service::Block, S::RuntimeApi> as ProvideRuntimeApi<Block>>::Api:
231
			RuntimeApiCollection<
232
233
234
235
				Extrinsic,
				Error = sp_blockchain::Error,
				StateBackend = sc_client_api::StateBackendFor<S::Backend, Block>
			>,
236
		// Rust bug: https://github.com/rust-lang/rust/issues/24159
237
		S::Backend: service::Backend<service::Block>,
238
		// Rust bug: https://github.com/rust-lang/rust/issues/24159
239
		<S::Backend as service::Backend<service::Block>>::State:
Gavin Wood's avatar
Gavin Wood committed
240
			sp_api::StateBackend<sp_runtime::traits::HashFor<Block>>,
241
242
		// Rust bug: https://github.com/rust-lang/rust/issues/24159
		S::CallExecutor: service::CallExecutor<service::Block>,
243
244
245
246
247
		// Rust bug: https://github.com/rust-lang/rust/issues/24159
		S::SelectChain: service::SelectChain<service::Block>,
		P: BuildParachainContext,
		P::ParachainContext: Send + 'static,
		<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
248
		Extrinsic: service::Codec + Send + Sync + 'static,
249
{
250
	let spawner = service.spawn_task_handle();
251
252
253
254
255
256
257

	let client = service.client();
	let network = service.network();
	let known_oracle = client.clone();
	let select_chain = if let Some(select_chain) = service.select_chain() {
		select_chain
	} else {
Gavin Wood's avatar
Gavin Wood committed
258
		return Err("The node cannot work because it can't select chain.".into())
259
	};
260

261
262
	let is_known = move |block_hash: &Hash| {
		use consensus_common::BlockStatus;
263
		use polkadot_network::legacy::gossip::Known;
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278

		match known_oracle.block_status(&BlockId::hash(*block_hash)) {
			Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
			Ok(BlockStatus::KnownBad) => Some(Known::Bad),
			Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) =>
				match select_chain.leaves() {
					Err(_) => None,
					Ok(leaves) => if leaves.contains(block_hash) {
						Some(Known::Leaf)
					} else {
						Some(Known::Old)
					},
				}
		}
	};
279

280
	let message_validator = polkadot_network::legacy::gossip::register_validator(
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
		network.clone(),
		(is_known, client.clone()),
		&spawner,
	);

	let validation_network = Arc::new(ValidationNetwork::new(
		message_validator,
		client.clone(),
		spawner.clone(),
	));

	let parachain_context = match build_parachain_context.build(
		client.clone(),
		spawner,
		validation_network.clone(),
	) {
		Ok(ctx) => ctx,
		Err(()) => {
Gavin Wood's avatar
Gavin Wood committed
299
			return Err("Could not build the parachain context!".into())
300
301
		}
	};
302

303
304
305
306
	let work = async move {
		let mut notification_stream = client.import_notification_stream();

		while let Some(notification) = notification_stream.next().await {
307
308
309
310
			macro_rules! try_fr {
				($e:expr) => {
					match $e {
						Ok(x) => x,
311
						Err(e) => return future::Either::Left(future::err(Error::Polkadot(
312
313
							format!("{:?}", e)
						))),
314
					}
315
				}
316
			}
317

318
319
320
321
322
323
324
325
			let relay_parent = notification.hash;
			let id = BlockId::hash(relay_parent);

			let network = network.clone();
			let client = client.clone();
			let key = key.clone();
			let parachain_context = parachain_context.clone();

326
			let work = future::lazy(move |_| {
327
				let api = client.runtime_api();
328
329
				let local_validation = match try_fr!(api.local_validation_data(&id, para_id)) {
					Some(local_validation) => local_validation,
330
					None => return future::Either::Left(future::ok(())),
331
332
333
334
335
336
337
338
339
340
				};

				let validators = try_fr!(api.validators(&id));

				let targets = compute_targets(
					para_id,
					validators.as_slice(),
					try_fr!(api.duty_roster(&id)),
				);

341
				let collation_work = collate(
342
343
					relay_parent,
					para_id,
344
					local_validation,
345
346
					parachain_context,
					key,
Ashley's avatar
Ashley committed
347
				).map_ok(move |collation| {
348
					network.with_spec(move |spec, ctx| {
349
						spec.add_local_collation(
350
351
352
353
354
							ctx,
							relay_parent,
							targets,
							collation,
						);
355
356
357
358
					})
				});

				future::Either::Right(collation_work)
359
			});
360

361
			let deadlined = future::select(
362
				work.then(|f| f).boxed(),
363
364
365
366
367
368
369
370
371
				futures_timer::Delay::new(COLLATION_TIMEOUT)
			);

			let silenced = deadlined
				.map(|either| {
					if let future::Either::Right(_) = either {
						warn!("Collation failure: timeout");
					}
				});
372

373
			let future = silenced.map(drop);
374

375
			tokio::spawn(future);
376
377
		}
	}.boxed();
378

Gavin Wood's avatar
Gavin Wood committed
379
	service.spawn_essential_task("collation", work);
380

Gavin Wood's avatar
Gavin Wood committed
381
	Ok(service)
382
383
}

384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
/// Async function that will run the collator node with the given `RelayChainContext` and `ParachainContext`
/// built by the given `BuildParachainContext` and arguments to the underlying polkadot node.
pub async fn start_collator<P>(
	build_parachain_context: P,
	para_id: ParaId,
	key: Arc<CollatorPair>,
	config: Configuration,
) -> Result<(), polkadot_service::Error>
where
	P: BuildParachainContext,
	P::ParachainContext: Send + 'static,
	<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
{
	match (config.expect_chain_spec().is_kusama(), config.roles) {
		(true, Roles::LIGHT) =>
			build_collator_service(
				service::kusama_new_light(config, Some((key.public(), para_id)))?,
				para_id,
				key,
				build_parachain_context,
			)?.await,
		(true, _) =>
			build_collator_service(
				service::kusama_new_full(config, Some((key.public(), para_id)), None, false, 6000)?,
				para_id,
				key,
				build_parachain_context,
			)?.await,
		(false, Roles::LIGHT) =>
			build_collator_service(
				service::polkadot_new_light(config, Some((key.public(), para_id)))?,
				para_id,
				key,
				build_parachain_context,
			)?.await,
		(false, _) =>
			build_collator_service(
				service::polkadot_new_full(config, Some((key.public(), para_id)), None, false, 6000)?,
				para_id,
				key,
				build_parachain_context,
			)?.await,
	}
}

429
fn compute_targets(para_id: ParaId, session_keys: &[ValidatorId], roster: DutyRoster) -> HashSet<ValidatorId> {
430
431
432
433
434
435
436
437
438
	use polkadot_primitives::parachain::Chain;

	roster.validator_duty.iter().enumerate()
		.filter(|&(_, c)| c == &Chain::Parachain(para_id))
		.filter_map(|(i, _)| session_keys.get(i))
		.cloned()
		.collect()
}

439
/// Run a collator node with the given `RelayChainContext` and `ParachainContext`
440
/// built by the given `BuildParachainContext` and arguments to the underlying polkadot node.
441
442
///
/// This function blocks until done.
Gavin Wood's avatar
Gavin Wood committed
443
pub fn run_collator<P>(
444
	build_parachain_context: P,
445
	para_id: ParaId,
446
	key: Arc<CollatorPair>,
Gavin Wood's avatar
Gavin Wood committed
447
	config: Configuration,
448
) -> polkadot_cli::Result<()> where
449
	P: BuildParachainContext,
450
	P::ParachainContext: Send + 'static,
451
	<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
452
{
Gavin Wood's avatar
Gavin Wood committed
453
	match (config.expect_chain_spec().is_kusama(), config.roles) {
454
		(true, Roles::LIGHT) =>
Gavin Wood's avatar
Gavin Wood committed
455
			sc_cli::run_service_until_exit(config, |config| {
456
				build_collator_service(
Gavin Wood's avatar
Gavin Wood committed
457
458
459
460
461
462
					service::kusama_new_light(config, Some((key.public(), para_id)))?,
					para_id,
					key,
					build_parachain_context,
				)
			}),
463
		(true, _) =>
Gavin Wood's avatar
Gavin Wood committed
464
			sc_cli::run_service_until_exit(config, |config| {
465
				build_collator_service(
Gavin Wood's avatar
Gavin Wood committed
466
467
468
469
470
471
					service::kusama_new_full(config, Some((key.public(), para_id)), None, false, 6000)?,
					para_id,
					key,
					build_parachain_context,
				)
			}),
472
		(false, Roles::LIGHT) =>
Gavin Wood's avatar
Gavin Wood committed
473
			sc_cli::run_service_until_exit(config, |config| {
474
				build_collator_service(
Gavin Wood's avatar
Gavin Wood committed
475
476
477
478
479
480
					service::polkadot_new_light(config, Some((key.public(), para_id)))?,
					para_id,
					key,
					build_parachain_context,
				)
			}),
481
		(false, _) =>
Gavin Wood's avatar
Gavin Wood committed
482
			sc_cli::run_service_until_exit(config, |config| {
483
				build_collator_service(
Gavin Wood's avatar
Gavin Wood committed
484
485
486
487
488
489
					service::polkadot_new_full(config, Some((key.public(), para_id)), None, false, 6000)?,
					para_id,
					key,
					build_parachain_context,
				)
			}),
490
	}
491
492
}

Gav's avatar
Gav committed
493
494
495
496
#[cfg(test)]
mod tests {
	use super::*;

497
498
499
500
	#[derive(Clone)]
	struct DummyParachainContext;

	impl ParachainContext for DummyParachainContext {
Ashley's avatar
Ashley committed
501
		type ProduceCandidate = future::Ready<Result<(BlockData, HeadData), InvalidHead>>;
502

Ashley's avatar
Ashley committed
503
		fn produce_candidate(
504
			&mut self,
505
			_relay_parent: Hash,
506
			_local_validation: LocalValidationData,
507
		) -> Self::ProduceCandidate {
508
			// send messages right back.
509
			future::ok((
510
511
512
				BlockData(vec![1, 2, 3, 4, 5,]),
				HeadData(vec![9, 9, 9]),
			))
Gav's avatar
Gav committed
513
514
515
		}
	}
}