lib.rs 15.6 KB
Newer Older
Shawn Tabrizi's avatar
Shawn Tabrizi committed
1
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
Gav's avatar
Gav committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
//! Collation node logic.
Gav's avatar
Gav committed
18
19
20
21
22
23
24
25
26
27
28
29
30
//!
//! A collator node lives on a distinct parachain and submits a proposal for
//! a state transition, along with a proof for its validity
//! (what we might call a witness or block data).
//!
//! One of collators' other roles is to route messages between chains.
//! Each parachain produces a list of "egress" posts of messages for each other
//! parachain on each block, for a total of N^2 lists all together.
//!
//! We will refer to the egress list at relay chain block X of parachain A with
//! destination B as egress(X)[A -> B]
//!
//! On every block, each parachain will be intended to route messages from some
31
//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3)
Gav's avatar
Gav committed
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//!
//! Since the egress information is unique to every block, when routing from a
//! parachain a collator must gather all egress posts from that parachain
//! up to the last point in history that messages were successfully routed
//! from that parachain, accounting for relay chain blocks where no candidate
//! from the collator's parachain was produced.
//!
//! In the case that all parachains route to each other and a candidate for the
//! collator's parachain was included in the last relay chain block, the collator
//! only has to gather egress posts from other parachains one block back in relay
//! chain history.
//!
//! This crate defines traits which provide context necessary for collation logic
//! to be performed, as the collation logic itself.

47
use std::collections::HashSet;
48
use std::fmt;
49
use std::sync::Arc;
50
use std::time::Duration;
Gav's avatar
Gav committed
51

52
use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn};
53
use log::warn;
54
55
use sc_client::BlockchainEvents;
use sp_core::{Pair, Blake2Hasher};
56
use polkadot_primitives::{
57
	BlockId, Hash, Block,
58
	parachain::{
Ashley's avatar
Ashley committed
59
		self, BlockData, DutyRoster, HeadData, Id as ParaId,
60
		PoVBlock, ValidatorId, CollatorPair, LocalValidationData
61
62
63
	}
};
use polkadot_cli::{
64
	ProvideRuntimeApi, AbstractService, ParachainHost, IsKusama,
65
	service::{self, Roles, SelectChain}
66
};
Ashley's avatar
Ashley committed
67
use polkadot_network::legacy::validation::ValidationNetwork;
68

69
pub use polkadot_cli::{VersionInfo, load_spec, service::Configuration};
70
71
pub use polkadot_validation::SignedStatement;
pub use polkadot_primitives::parachain::CollatorId;
72
pub use sc_network::PeerId;
73
pub use service::RuntimeApiCollection;
74

75
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
Gav's avatar
Gav committed
76

77
/// An abstraction over the `Network` with useful functions for a `Collator`.
78
pub trait Network: Send + Sync {
79
80
	/// Convert the given `CollatorId` to a `PeerId`.
	fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
81
		Box<dyn Future<Output=Option<PeerId>> + Send>;
82
83
84
85
86
87

	/// Create a `Stream` of checked statements for the given `relay_parent`.
	///
	/// The returned stream will not terminate, so it is required to make sure that the stream is
	/// dropped when it is not required anymore. Otherwise, it will stick around in memory
	/// infinitely.
88
	fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>>;
89
90
}

Gavin Wood's avatar
Gavin Wood committed
91
impl<P, SP> Network for ValidationNetwork<P, SP> where
92
	P: 'static + Send + Sync,
93
	SP: 'static + Spawn + Clone + Send + Sync,
94
95
{
	fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
96
		Box<dyn Future<Output=Option<PeerId>> + Send>
97
	{
98
		Box::new(Self::collator_id_to_peer_id(self, collator_id))
99
100
	}

101
	fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>> {
102
		Box::new(Self::checked_statements(self, relay_parent))
103
104
105
	}
}

106
107
108
109
110
111
/// Error to return when the head data was invalid.
#[derive(Clone, Copy, Debug)]
pub struct InvalidHead;

/// Collation errors.
#[derive(Debug)]
Ashley's avatar
Ashley committed
112
pub enum Error {
113
	/// Error on the relay-chain side of things.
Ashley's avatar
Ashley committed
114
	Polkadot(String),
115
116
117
118
	/// Error on the collator side of things.
	Collator(InvalidHead),
}

Ashley's avatar
Ashley committed
119
impl fmt::Display for Error {
120
121
122
123
124
125
126
127
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		match *self {
			Error::Polkadot(ref err) => write!(f, "Polkadot node error: {}", err),
			Error::Collator(_) => write!(f, "Collator node error: Invalid head data"),
		}
	}
}

128
/// The Polkadot client type.
129
pub type PolkadotClient<B, E, R> = sc_client::Client<B, E, Block, R>;
130

131
132
133
134
135
136
/// Something that can build a `ParachainContext`.
pub trait BuildParachainContext {
	/// The parachain context produced by the `build` function.
	type ParachainContext: self::ParachainContext;

	/// Build the `ParachainContext`.
137
	fn build<B, E, R, SP, Extrinsic>(
138
		self,
139
		client: Arc<PolkadotClient<B, E, R>>,
140
		spawner: SP,
141
142
143
		network: Arc<dyn Network>,
	) -> Result<Self::ParachainContext, ()>
		where
144
			PolkadotClient<B, E, R>: ProvideRuntimeApi<Block>,
145
			<PolkadotClient<B, E, R> as ProvideRuntimeApi<Block>>::Api: RuntimeApiCollection<Extrinsic>,
146
147
148
149
150
			// Rust bug: https://github.com/rust-lang/rust/issues/24159
			<<PolkadotClient<B, E, R> as ProvideRuntimeApi<Block>>::Api as sp_api::ApiExt<Block>>::StateBackend:
				sp_api::StateBackend<Blake2Hasher>,
			Extrinsic: codec::Codec + Send + Sync + 'static,
			E: sc_client::CallExecutor<Block> + Clone + Send + Sync + 'static,
151
152
153
154
155
			SP: Spawn + Clone + Send + Sync + 'static,
			R: Send + Sync + 'static,
			B: sc_client_api::Backend<Block> + 'static,
			// Rust bug: https://github.com/rust-lang/rust/issues/24159
			B::State: sp_api::StateBackend<Blake2Hasher>;
156
157
}

Gav's avatar
Gav committed
158
159
160
/// Parachain context needed for collation.
///
/// This can be implemented through an externally attached service or a stub.
161
162
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
Ashley's avatar
Ashley committed
163
	type ProduceCandidate: Future<Output = Result<(BlockData, HeadData), InvalidHead>>;
164

165
166
	/// Produce a candidate, given the relay parent hash, the latest ingress queue information
	/// and the last parachain head.
Ashley's avatar
Ashley committed
167
	fn produce_candidate(
168
		&mut self,
169
		relay_parent: Hash,
170
		status: LocalValidationData,
171
	) -> Self::ProduceCandidate;
Gav's avatar
Gav committed
172
173
}

174
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
Ashley's avatar
Ashley committed
175
pub async fn collate<P>(
176
	relay_parent: Hash,
177
	local_id: ParaId,
178
	local_validation_data: LocalValidationData,
179
	mut para_context: P,
180
	key: Arc<CollatorPair>,
181
)
Ashley's avatar
Ashley committed
182
	-> Result<parachain::Collation, Error>
Gav's avatar
Gav committed
183
	where
184
185
		P: ParachainContext,
		P::ProduceCandidate: Send,
Gav's avatar
Gav committed
186
{
Ashley's avatar
Ashley committed
187
	let (block_data, head_data) = para_context.produce_candidate(
188
		relay_parent,
189
		local_validation_data,
190
191
	).map_err(Error::Collator).await?;

192
193
194
195
196
197
198
199
200
201
202
	let pov_block = PoVBlock {
		block_data,
	};

	let pov_block_hash = pov_block.hash();
	let signature = key.sign(&parachain::collator_signature_payload(
		&relay_parent,
		&local_id,
		&pov_block_hash,
	));

203
	let info = parachain::CollationInfo {
204
		parachain_index: local_id,
205
		relay_parent,
206
207
		collator: key.public(),
		signature,
208
		head_data,
209
		pov_block_hash,
210
211
212
	};

	let collation = parachain::Collation {
213
		info,
214
		pov: pov_block,
215
216
	};

Ashley's avatar
Ashley committed
217
	Ok(collation)
218
219
}

220
fn build_collator_service<S, P, Extrinsic>(
221
	service: S,
222
	para_id: ParaId,
223
	key: Arc<CollatorPair>,
224
	build_parachain_context: P,
Gavin Wood's avatar
Gavin Wood committed
225
) -> Result<S, polkadot_service::Error>
226
227
	where
		S: AbstractService<Block = service::Block, NetworkSpecialization = service::PolkadotProtocol>,
228
229
		sc_client::Client<S::Backend, S::CallExecutor, service::Block, S::RuntimeApi>: ProvideRuntimeApi<Block>,
		<sc_client::Client<S::Backend, S::CallExecutor, service::Block, S::RuntimeApi> as ProvideRuntimeApi<Block>>::Api:
230
			RuntimeApiCollection<
231
232
233
234
				Extrinsic,
				Error = sp_blockchain::Error,
				StateBackend = sc_client_api::StateBackendFor<S::Backend, Block>
			>,
235
		// Rust bug: https://github.com/rust-lang/rust/issues/24159
236
		S::Backend: service::Backend<service::Block>,
237
		// Rust bug: https://github.com/rust-lang/rust/issues/24159
238
239
240
241
		<S::Backend as service::Backend<service::Block>>::State:
			sp_api::StateBackend<sp_runtime::traits::HasherFor<Block>>,
		// Rust bug: https://github.com/rust-lang/rust/issues/24159
		S::CallExecutor: service::CallExecutor<service::Block>,
242
243
244
245
246
		// Rust bug: https://github.com/rust-lang/rust/issues/24159
		S::SelectChain: service::SelectChain<service::Block>,
		P: BuildParachainContext,
		P::ParachainContext: Send + 'static,
		<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
247
		Extrinsic: service::Codec + Send + Sync + 'static,
248
{
249
	let spawner = service.spawn_task_handle();
250
251
252
253
254
255
256

	let client = service.client();
	let network = service.network();
	let known_oracle = client.clone();
	let select_chain = if let Some(select_chain) = service.select_chain() {
		select_chain
	} else {
Gavin Wood's avatar
Gavin Wood committed
257
		return Err("The node cannot work because it can't select chain.".into())
258
	};
259

260
261
	let is_known = move |block_hash: &Hash| {
		use consensus_common::BlockStatus;
262
		use polkadot_network::legacy::gossip::Known;
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277

		match known_oracle.block_status(&BlockId::hash(*block_hash)) {
			Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
			Ok(BlockStatus::KnownBad) => Some(Known::Bad),
			Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) =>
				match select_chain.leaves() {
					Err(_) => None,
					Ok(leaves) => if leaves.contains(block_hash) {
						Some(Known::Leaf)
					} else {
						Some(Known::Old)
					},
				}
		}
	};
278

279
	let message_validator = polkadot_network::legacy::gossip::register_validator(
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
		network.clone(),
		(is_known, client.clone()),
		&spawner,
	);

	let validation_network = Arc::new(ValidationNetwork::new(
		message_validator,
		client.clone(),
		spawner.clone(),
	));

	let parachain_context = match build_parachain_context.build(
		client.clone(),
		spawner,
		validation_network.clone(),
	) {
		Ok(ctx) => ctx,
		Err(()) => {
Gavin Wood's avatar
Gavin Wood committed
298
			return Err("Could not build the parachain context!".into())
299
300
		}
	};
301

302
303
304
305
	let work = async move {
		let mut notification_stream = client.import_notification_stream();

		while let Some(notification) = notification_stream.next().await {
306
307
308
309
			macro_rules! try_fr {
				($e:expr) => {
					match $e {
						Ok(x) => x,
310
						Err(e) => return future::Either::Left(future::err(Error::Polkadot(
311
312
							format!("{:?}", e)
						))),
313
					}
314
				}
315
			}
316

317
318
319
320
321
322
323
324
			let relay_parent = notification.hash;
			let id = BlockId::hash(relay_parent);

			let network = network.clone();
			let client = client.clone();
			let key = key.clone();
			let parachain_context = parachain_context.clone();

325
			let work = future::lazy(move |_| {
326
				let api = client.runtime_api();
327
328
				let local_validation = match try_fr!(api.local_validation_data(&id, para_id)) {
					Some(local_validation) => local_validation,
329
					None => return future::Either::Left(future::ok(())),
330
331
332
333
334
335
336
337
338
339
				};

				let validators = try_fr!(api.validators(&id));

				let targets = compute_targets(
					para_id,
					validators.as_slice(),
					try_fr!(api.duty_roster(&id)),
				);

340
				let collation_work = collate(
341
342
					relay_parent,
					para_id,
343
					local_validation,
344
345
					parachain_context,
					key,
Ashley's avatar
Ashley committed
346
				).map_ok(move |collation| {
347
					network.with_spec(move |spec, ctx| {
348
						spec.add_local_collation(
349
350
351
352
353
							ctx,
							relay_parent,
							targets,
							collation,
						);
354
355
356
357
					})
				});

				future::Either::Right(collation_work)
358
			});
359

360
			let deadlined = future::select(
361
				work.then(|f| f).boxed(),
362
363
364
365
366
367
368
369
370
				futures_timer::Delay::new(COLLATION_TIMEOUT)
			);

			let silenced = deadlined
				.map(|either| {
					if let future::Either::Right(_) = either {
						warn!("Collation failure: timeout");
					}
				});
371

372
			let future = silenced.map(drop);
373

374
			tokio::spawn(future);
375
376
		}
	}.boxed();
377

Gavin Wood's avatar
Gavin Wood committed
378
	service.spawn_essential_task("collation", work);
379

Gavin Wood's avatar
Gavin Wood committed
380
	Ok(service)
381
382
}

383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
/// Async function that will run the collator node with the given `RelayChainContext` and `ParachainContext`
/// built by the given `BuildParachainContext` and arguments to the underlying polkadot node.
pub async fn start_collator<P>(
	build_parachain_context: P,
	para_id: ParaId,
	key: Arc<CollatorPair>,
	config: Configuration,
) -> Result<(), polkadot_service::Error>
where
	P: BuildParachainContext,
	P::ParachainContext: Send + 'static,
	<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
{
	match (config.expect_chain_spec().is_kusama(), config.roles) {
		(true, Roles::LIGHT) =>
			build_collator_service(
				service::kusama_new_light(config, Some((key.public(), para_id)))?,
				para_id,
				key,
				build_parachain_context,
			)?.await,
		(true, _) =>
			build_collator_service(
				service::kusama_new_full(config, Some((key.public(), para_id)), None, false, 6000)?,
				para_id,
				key,
				build_parachain_context,
			)?.await,
		(false, Roles::LIGHT) =>
			build_collator_service(
				service::polkadot_new_light(config, Some((key.public(), para_id)))?,
				para_id,
				key,
				build_parachain_context,
			)?.await,
		(false, _) =>
			build_collator_service(
				service::polkadot_new_full(config, Some((key.public(), para_id)), None, false, 6000)?,
				para_id,
				key,
				build_parachain_context,
			)?.await,
	}
}

428
fn compute_targets(para_id: ParaId, session_keys: &[ValidatorId], roster: DutyRoster) -> HashSet<ValidatorId> {
429
430
431
432
433
434
435
436
437
	use polkadot_primitives::parachain::Chain;

	roster.validator_duty.iter().enumerate()
		.filter(|&(_, c)| c == &Chain::Parachain(para_id))
		.filter_map(|(i, _)| session_keys.get(i))
		.cloned()
		.collect()
}

438
/// Run a collator node with the given `RelayChainContext` and `ParachainContext`
439
/// built by the given `BuildParachainContext` and arguments to the underlying polkadot node.
440
441
///
/// This function blocks until done.
Gavin Wood's avatar
Gavin Wood committed
442
pub fn run_collator<P>(
443
	build_parachain_context: P,
444
	para_id: ParaId,
445
	key: Arc<CollatorPair>,
Gavin Wood's avatar
Gavin Wood committed
446
	config: Configuration,
447
) -> polkadot_cli::Result<()> where
448
	P: BuildParachainContext,
449
	P::ParachainContext: Send + 'static,
450
	<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
451
{
Gavin Wood's avatar
Gavin Wood committed
452
	match (config.expect_chain_spec().is_kusama(), config.roles) {
453
		(true, Roles::LIGHT) =>
Gavin Wood's avatar
Gavin Wood committed
454
			sc_cli::run_service_until_exit(config, |config| {
455
				build_collator_service(
Gavin Wood's avatar
Gavin Wood committed
456
457
458
459
460
461
					service::kusama_new_light(config, Some((key.public(), para_id)))?,
					para_id,
					key,
					build_parachain_context,
				)
			}),
462
		(true, _) =>
Gavin Wood's avatar
Gavin Wood committed
463
			sc_cli::run_service_until_exit(config, |config| {
464
				build_collator_service(
Gavin Wood's avatar
Gavin Wood committed
465
466
467
468
469
470
					service::kusama_new_full(config, Some((key.public(), para_id)), None, false, 6000)?,
					para_id,
					key,
					build_parachain_context,
				)
			}),
471
		(false, Roles::LIGHT) =>
Gavin Wood's avatar
Gavin Wood committed
472
			sc_cli::run_service_until_exit(config, |config| {
473
				build_collator_service(
Gavin Wood's avatar
Gavin Wood committed
474
475
476
477
478
479
					service::polkadot_new_light(config, Some((key.public(), para_id)))?,
					para_id,
					key,
					build_parachain_context,
				)
			}),
480
		(false, _) =>
Gavin Wood's avatar
Gavin Wood committed
481
			sc_cli::run_service_until_exit(config, |config| {
482
				build_collator_service(
Gavin Wood's avatar
Gavin Wood committed
483
484
485
486
487
488
					service::polkadot_new_full(config, Some((key.public(), para_id)), None, false, 6000)?,
					para_id,
					key,
					build_parachain_context,
				)
			}),
489
	}
490
491
}

Gav's avatar
Gav committed
492
493
494
495
#[cfg(test)]
mod tests {
	use super::*;

496
497
498
499
	#[derive(Clone)]
	struct DummyParachainContext;

	impl ParachainContext for DummyParachainContext {
Ashley's avatar
Ashley committed
500
		type ProduceCandidate = future::Ready<Result<(BlockData, HeadData), InvalidHead>>;
501

Ashley's avatar
Ashley committed
502
		fn produce_candidate(
503
			&mut self,
504
			_relay_parent: Hash,
505
			_local_validation: LocalValidationData,
506
		) -> Self::ProduceCandidate {
507
			// send messages right back.
508
			future::ok((
509
510
511
				BlockData(vec![1, 2, 3, 4, 5,]),
				HeadData(vec![9, 9, 9]),
			))
Gav's avatar
Gav committed
512
513
514
		}
	}
}