lib.rs 15.3 KB
Newer Older
Shawn Tabrizi's avatar
Shawn Tabrizi committed
1
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
Gav's avatar
Gav committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
//! Collation node logic.
Gav's avatar
Gav committed
18
19
20
21
22
23
24
25
26
27
28
29
30
//!
//! A collator node lives on a distinct parachain and submits a proposal for
//! a state transition, along with a proof for its validity
//! (what we might call a witness or block data).
//!
//! One of collators' other roles is to route messages between chains.
//! Each parachain produces a list of "egress" posts of messages for each other
//! parachain on each block, for a total of N^2 lists all together.
//!
//! We will refer to the egress list at relay chain block X of parachain A with
//! destination B as egress(X)[A -> B]
//!
//! On every block, each parachain will be intended to route messages from some
31
//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3)
Gav's avatar
Gav committed
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//!
//! Since the egress information is unique to every block, when routing from a
//! parachain a collator must gather all egress posts from that parachain
//! up to the last point in history that messages were successfully routed
//! from that parachain, accounting for relay chain blocks where no candidate
//! from the collator's parachain was produced.
//!
//! In the case that all parachains route to each other and a candidate for the
//! collator's parachain was included in the last relay chain block, the collator
//! only has to gather egress posts from other parachains one block back in relay
//! chain history.
//!
//! This crate defines traits which provide context necessary for collation logic
//! to be performed, as the collation logic itself.

47
use std::collections::HashSet;
48
use std::fmt;
49
use std::sync::Arc;
50
use std::time::Duration;
51
use std::pin::Pin;
Gav's avatar
Gav committed
52

53
use futures::{future, Future, Stream, FutureExt, StreamExt};
54
use log::warn;
55
use sc_client_api::{StateBackend, BlockchainEvents};
56
use sp_blockchain::HeaderBackend;
Gavin Wood's avatar
Gavin Wood committed
57
use sp_core::Pair;
asynchronous rob's avatar
asynchronous rob committed
58
use polkadot_primitives::v0::{
59
	BlockId, Hash, Block, DownwardMessage,
asynchronous rob's avatar
asynchronous rob committed
60
61
62
	BlockData, DutyRoster, HeadData, Id as ParaId,
	PoVBlock, ValidatorId, CollatorPair, LocalValidationData, GlobalValidationSchedule,
	Collation, CollationInfo, collator_signature_payload,
63
64
};
use polkadot_cli::{
65
	ProvideRuntimeApi, ParachainHost, IdentifyVariant,
66
	service::{self, Role}
67
};
68
69
pub use polkadot_cli::service::Configuration;
pub use polkadot_cli::Cli;
70
pub use polkadot_validation::SignedStatement;
asynchronous rob's avatar
asynchronous rob committed
71
pub use polkadot_primitives::v0::CollatorId;
72
pub use sc_network::PeerId;
73
pub use service::RuntimeApiCollection;
74
pub use sc_cli::SubstrateCli;
Seun Lanlege's avatar
Seun Lanlege committed
75
use sp_api::{ConstructRuntimeApi, ApiExt, HashFor};
76
77
78
79
80
81
82
#[cfg(not(feature = "service-rewr"))]
use polkadot_service::{FullNodeHandles, PolkadotClient};
#[cfg(feature = "service-rewr")]
use polkadot_service_new::{
	self as polkadot_service,
	Error as ServiceError, FullNodeHandles, PolkadotClient,
};
83
use sc_service::SpawnTaskHandle;
84
use sp_core::traits::SpawnNamed;
85

86
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
Gav's avatar
Gav committed
87

88
/// An abstraction over the `Network` with useful functions for a `Collator`.
89
pub trait Network: Send + Sync {
90
91
92
93
94
	/// Create a `Stream` of checked statements for the given `relay_parent`.
	///
	/// The returned stream will not terminate, so it is required to make sure that the stream is
	/// dropped when it is not required anymore. Otherwise, it will stick around in memory
	/// infinitely.
95
	fn checked_statements(&self, relay_parent: Hash) -> Pin<Box<dyn Stream<Item=SignedStatement> + Send>>;
96
97
}

98
impl Network for polkadot_network::protocol::Service {
99
100
	fn checked_statements(&self, relay_parent: Hash) -> Pin<Box<dyn Stream<Item=SignedStatement> + Send>> {
		polkadot_network::protocol::Service::checked_statements(self, relay_parent).boxed()
101
102
103
	}
}

104
105
/// Collation errors.
#[derive(Debug)]
Ashley's avatar
Ashley committed
106
pub enum Error {
107
	/// Error on the relay-chain side of things.
Ashley's avatar
Ashley committed
108
	Polkadot(String),
109
110
}

Ashley's avatar
Ashley committed
111
impl fmt::Display for Error {
112
113
114
115
116
117
118
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		match *self {
			Error::Polkadot(ref err) => write!(f, "Polkadot node error: {}", err),
		}
	}
}

119
120
121
122
123
124
/// Something that can build a `ParachainContext`.
pub trait BuildParachainContext {
	/// The parachain context produced by the `build` function.
	type ParachainContext: self::ParachainContext;

	/// Build the `ParachainContext`.
Seun Lanlege's avatar
Seun Lanlege committed
125
	fn build<Client, SP, Extrinsic>(
126
		self,
Seun Lanlege's avatar
Seun Lanlege committed
127
		client: Arc<Client>,
128
		spawner: SP,
129
		network: impl Network + Clone + 'static,
130
131
	) -> Result<Self::ParachainContext, ()>
		where
132
			Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockchainEvents<Block> + Send + Sync + 'static,
Seun Lanlege's avatar
Seun Lanlege committed
133
134
			Client::Api: RuntimeApiCollection<Extrinsic>,
			<Client::Api as ApiExt<Block>>::StateBackend: StateBackend<HashFor<Block>>,
135
			Extrinsic: codec::Codec + Send + Sync + 'static,
136
			SP: SpawnNamed + Clone + Send + Sync + 'static;
137
138
}

Gav's avatar
Gav committed
139
140
141
/// Parachain context needed for collation.
///
/// This can be implemented through an externally attached service or a stub.
142
143
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
144
	type ProduceCandidate: Future<Output = Option<(BlockData, HeadData)>>;
145

146
147
	/// Produce a candidate, given the relay parent hash, the latest ingress queue information
	/// and the last parachain head.
Ashley's avatar
Ashley committed
148
	fn produce_candidate(
149
		&mut self,
150
		relay_parent: Hash,
151
152
		global_validation: GlobalValidationSchedule,
		local_validation: LocalValidationData,
153
		downward_messages: Vec<DownwardMessage>,
154
	) -> Self::ProduceCandidate;
Gav's avatar
Gav committed
155
156
}

157
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
Ashley's avatar
Ashley committed
158
pub async fn collate<P>(
159
	relay_parent: Hash,
160
	local_id: ParaId,
161
	global_validation: GlobalValidationSchedule,
162
	local_validation_data: LocalValidationData,
163
	downward_messages: Vec<DownwardMessage>,
164
	mut para_context: P,
165
	key: Arc<CollatorPair>,
asynchronous rob's avatar
asynchronous rob committed
166
) -> Option<Collation>
Gav's avatar
Gav committed
167
	where
168
169
		P: ParachainContext,
		P::ProduceCandidate: Send,
Gav's avatar
Gav committed
170
{
Ashley's avatar
Ashley committed
171
	let (block_data, head_data) = para_context.produce_candidate(
172
		relay_parent,
173
		global_validation,
174
		local_validation_data,
175
		downward_messages,
176
	).await?;
177

178
179
180
181
182
	let pov_block = PoVBlock {
		block_data,
	};

	let pov_block_hash = pov_block.hash();
asynchronous rob's avatar
asynchronous rob committed
183
	let signature = key.sign(&collator_signature_payload(
184
185
186
187
188
		&relay_parent,
		&local_id,
		&pov_block_hash,
	));

asynchronous rob's avatar
asynchronous rob committed
189
	let info = CollationInfo {
190
		parachain_index: local_id,
191
		relay_parent,
192
193
		collator: key.public(),
		signature,
194
		head_data,
195
		pov_block_hash,
196
197
	};

asynchronous rob's avatar
asynchronous rob committed
198
	let collation = Collation {
199
		info,
200
		pov: pov_block,
201
202
	};

203
	Some(collation)
204
205
}

206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
#[cfg(feature = "service-rewr")]
fn build_collator_service<SP, P, C, R, Extrinsic>(
	_spawner: SP,
	_handles: FullNodeHandles,
	_client: Arc<C>,
	_para_id: ParaId,
	_key: Arc<CollatorPair>,
	_build_parachain_context: P,
) -> Result<future::Ready<()>, polkadot_service::Error>
	where
		C: PolkadotClient<
			service::Block,
			service::TFullBackend<service::Block>,
			R
		> + 'static,
		R: ConstructRuntimeApi<service::Block, C> + Sync + Send,
		<R as ConstructRuntimeApi<service::Block, C>>::RuntimeApi:
			sp_api::ApiExt<
				service::Block,
				StateBackend = <service::TFullBackend<service::Block> as service::Backend<service::Block>>::State,
			>
			+ RuntimeApiCollection<
				Extrinsic,
				StateBackend = <service::TFullBackend<service::Block> as service::Backend<service::Block>>::State,
			>
			+ Sync + Send,
		P: BuildParachainContext,
		P::ParachainContext: Send + 'static,
		<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
		Extrinsic: service::Codec + Send + Sync + 'static,
236
		SP: SpawnNamed + Clone + Send + Sync + 'static,
237
238
239
240
241
242
{
	Err("Collator is not functional with the new service yet".into())
}


#[cfg(not(feature = "service-rewr"))]
243
244
fn build_collator_service<P, C, R, Extrinsic>(
	spawner: SpawnTaskHandle,
245
	handles: FullNodeHandles,
Seun Lanlege's avatar
Seun Lanlege committed
246
	client: Arc<C>,
247
	para_id: ParaId,
248
	key: Arc<CollatorPair>,
249
	build_parachain_context: P,
Seun Lanlege's avatar
Seun Lanlege committed
250
) -> Result<impl Future<Output = ()> + Send + 'static, polkadot_service::Error>
251
	where
252
		C: PolkadotClient<
Seun Lanlege's avatar
Seun Lanlege committed
253
254
255
256
257
258
259
260
261
262
263
			service::Block,
			service::TFullBackend<service::Block>,
			R
		> + 'static,
		R: ConstructRuntimeApi<service::Block, C> + Sync + Send,
		<R as ConstructRuntimeApi<service::Block, C>>::RuntimeApi:
			sp_api::ApiExt<
				service::Block,
				StateBackend = <service::TFullBackend<service::Block> as service::Backend<service::Block>>::State,
			>
			+ RuntimeApiCollection<
264
				Extrinsic,
Seun Lanlege's avatar
Seun Lanlege committed
265
266
267
				StateBackend = <service::TFullBackend<service::Block> as service::Backend<service::Block>>::State,
			>
			+ Sync + Send,
268
269
270
		P: BuildParachainContext,
		P::ParachainContext: Send + 'static,
		<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
271
		Extrinsic: service::Codec + Send + Sync + 'static,
272
{
273
274
275
276
277
278
279
280
	let polkadot_network = handles.polkadot_network
		.ok_or_else(|| "Collator cannot run when Polkadot-specific networking has not been started")?;

	// We don't require this here, but we need to make sure that the validation service is started.
	// This service makes sure the collator is joining the correct gossip topics and receives the appropiate
	// messages.
	handles.validation_service_handle
		.ok_or_else(|| "Collator cannot run when validation networking has not been started")?;
281

282
283
	let parachain_context = match build_parachain_context.build(
		client.clone(),
284
		spawner.clone(),
285
		polkadot_network.clone(),
286
287
288
	) {
		Ok(ctx) => ctx,
		Err(()) => {
Gavin Wood's avatar
Gavin Wood committed
289
			return Err("Could not build the parachain context!".into())
290
291
		}
	};
292

293
294
295
296
	let work = async move {
		let mut notification_stream = client.import_notification_stream();

		while let Some(notification) = notification_stream.next().await {
297
298
299
300
			macro_rules! try_fr {
				($e:expr) => {
					match $e {
						Ok(x) => x,
301
						Err(e) => return future::Either::Left(future::err(Error::Polkadot(
302
303
							format!("{:?}", e)
						))),
304
					}
305
				}
306
			}
307

308
309
310
			let relay_parent = notification.hash;
			let id = BlockId::hash(relay_parent);

311
			let network = polkadot_network.clone();
312
313
314
315
			let client = client.clone();
			let key = key.clone();
			let parachain_context = parachain_context.clone();

316
			let work = future::lazy(move |_| {
317
				let api = client.runtime_api();
318
				let global_validation = try_fr!(api.global_validation_schedule(&id));
319
320
				let local_validation = match try_fr!(api.local_validation_data(&id, para_id)) {
					Some(local_validation) => local_validation,
321
					None => return future::Either::Left(future::ok(())),
322
				};
323
				let downward_messages = try_fr!(api.downward_messages(&id, para_id));
324
325
326
327
328
329
330
331
332

				let validators = try_fr!(api.validators(&id));

				let targets = compute_targets(
					para_id,
					validators.as_slice(),
					try_fr!(api.duty_roster(&id)),
				);

333
				let collation_work = collate(
334
335
					relay_parent,
					para_id,
336
					global_validation,
337
					local_validation,
338
					downward_messages,
339
340
					parachain_context,
					key,
341
342
343
344
345
346
347
				).map(move |collation| {
					match collation {
						Some(collation) => network.distribute_collation(targets, collation),
						None => log::trace!("Skipping collation as `collate` returned `None`"),
					}

					Ok(())
348
349
350
				});

				future::Either::Right(collation_work)
351
			});
352

353
			let deadlined = future::select(
354
				work.then(|f| f).boxed(),
355
356
357
358
359
360
361
362
363
				futures_timer::Delay::new(COLLATION_TIMEOUT)
			);

			let silenced = deadlined
				.map(|either| {
					if let future::Either::Right(_) = either {
						warn!("Collation failure: timeout");
					}
				});
364

365
			let future = silenced.map(drop);
366

367
			spawner.spawn("collation-work", future);
368
369
		}
	}.boxed();
370

Seun Lanlege's avatar
Seun Lanlege committed
371
	Ok(work)
372
373
}

374
375
/// Async function that will run the collator node with the given `RelayChainContext` and `ParachainContext`
/// built by the given `BuildParachainContext` and arguments to the underlying polkadot node.
376
pub fn start_collator<P>(
377
378
379
380
	build_parachain_context: P,
	para_id: ParaId,
	key: Arc<CollatorPair>,
	config: Configuration,
381
382
383
384
) -> Result<
	(Pin<Box<dyn Future<Output = ()> + Send>>, sc_service::TaskManager),
	polkadot_service::Error
>
385
where
Seun Lanlege's avatar
Seun Lanlege committed
386
	P: 'static + BuildParachainContext,
387
388
389
	P::ParachainContext: Send + 'static,
	<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
{
390
391
	if matches!(config.role, Role::Light) {
		return Err(
392
			polkadot_service::Error::Other("light nodes are unsupported as collator".into())
393
394
395
396
		.into());
	}

	if config.chain_spec.is_kusama() {
397
		let (task_manager, client, handlers) = service::kusama_new_full(
398
399
400
401
402
403
404
			config,
			Some((key.public(), para_id)),
			None,
			false,
			6000,
			None,
		)?;
405
		let spawn_handle = task_manager.spawn_handle();
406
		let future = build_collator_service(
407
408
409
410
411
412
			spawn_handle,
			handlers,
			client,
			para_id,
			key,
			build_parachain_context
413
414
		)?;
		Ok((future.boxed(), task_manager))
415
	} else if config.chain_spec.is_westend() {
416
		let (task_manager, client, handlers) = service::westend_new_full(
417
418
419
420
421
422
423
			config,
			Some((key.public(), para_id)),
			None,
			false,
			6000,
			None,
		)?;
424
		let spawn_handle = task_manager.spawn_handle();
425
		let future = build_collator_service(
426
427
428
429
430
431
			spawn_handle,
			handlers,
			client,
			para_id,
			key,
			build_parachain_context
432
433
		)?;
		Ok((future.boxed(), task_manager))
434
	} else {
435
		let (task_manager, client, handles) = service::polkadot_new_full(
436
437
438
439
440
441
442
			config,
			Some((key.public(), para_id)),
			None,
			false,
			6000,
			None,
		)?;
443
		let spawn_handle = task_manager.spawn_handle();
444
		let future = build_collator_service(
445
446
447
448
449
			spawn_handle,
			handles,
			client,
			para_id,
			key,
450
451
452
			build_parachain_context
		)?;
		Ok((future.boxed(), task_manager))
453
454
455
	}
}

456
#[cfg(not(feature = "service-rewr"))]
457
fn compute_targets(para_id: ParaId, session_keys: &[ValidatorId], roster: DutyRoster) -> HashSet<ValidatorId> {
asynchronous rob's avatar
asynchronous rob committed
458
	use polkadot_primitives::v0::Chain;
459
460
461
462
463
464
465
466

	roster.validator_duty.iter().enumerate()
		.filter(|&(_, c)| c == &Chain::Parachain(para_id))
		.filter_map(|(i, _)| session_keys.get(i))
		.cloned()
		.collect()
}

Gav's avatar
Gav committed
467
468
469
470
#[cfg(test)]
mod tests {
	use super::*;

471
472
473
474
	#[derive(Clone)]
	struct DummyParachainContext;

	impl ParachainContext for DummyParachainContext {
475
		type ProduceCandidate = future::Ready<Option<(BlockData, HeadData)>>;
476

Ashley's avatar
Ashley committed
477
		fn produce_candidate(
478
			&mut self,
479
			_relay_parent: Hash,
480
			_global: GlobalValidationSchedule,
481
			_local_validation: LocalValidationData,
482
			_: Vec<DownwardMessage>,
483
		) -> Self::ProduceCandidate {
484
			// send messages right back.
485
			future::ready(Some((
486
487
				BlockData(vec![1, 2, 3, 4, 5,]),
				HeadData(vec![9, 9, 9]),
488
			)))
Gav's avatar
Gav committed
489
490
		}
	}
491
492
493
494
495
496

	struct BuildDummyParachainContext;

	impl BuildParachainContext for BuildDummyParachainContext {
		type ParachainContext = DummyParachainContext;

Seun Lanlege's avatar
Seun Lanlege committed
497
		fn build<C, SP, Extrinsic>(
498
			self,
Seun Lanlege's avatar
Seun Lanlege committed
499
			_: Arc<C>,
500
501
502
503
504
505
506
			_: SP,
			_: impl Network + Clone + 'static,
		) -> Result<Self::ParachainContext, ()> {
			Ok(DummyParachainContext)
		}
	}

507
	// Make sure that the future returned by `start_collator` implements `Send`.
508
509
510
511
	#[test]
	fn start_collator_is_send() {
		fn check_send<T: Send>(_: T) {}

512
		let cli = Cli::from_iter(&["-dev"]);
513
		let task_executor = |_, _| {};
514
		let config = cli.create_configuration(&cli.run.base, task_executor.into()).unwrap();
515

516
517
518
519
		check_send(start_collator(
			BuildDummyParachainContext,
			0.into(),
			Arc::new(CollatorPair::generate().0),
520
			config,
521
522
		));
	}
Gav's avatar
Gav committed
523
}