lib.rs 14.3 KB
Newer Older
Shawn Tabrizi's avatar
Shawn Tabrizi committed
1
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
Gav's avatar
Gav committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
//! Collation node logic.
Gav's avatar
Gav committed
18
19
20
21
22
23
24
25
26
27
28
29
30
//!
//! A collator node lives on a distinct parachain and submits a proposal for
//! a state transition, along with a proof for its validity
//! (what we might call a witness or block data).
//!
//! One of collators' other roles is to route messages between chains.
//! Each parachain produces a list of "egress" posts of messages for each other
//! parachain on each block, for a total of N^2 lists all together.
//!
//! We will refer to the egress list at relay chain block X of parachain A with
//! destination B as egress(X)[A -> B]
//!
//! On every block, each parachain is expected to route messages from some
31
//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3)
Gav's avatar
Gav committed
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//!
//! Since the egress information is unique to every block, when routing from a
//! parachain a collator must gather all egress posts from that parachain
//! up to the last point in history that messages were successfully routed
//! from that parachain, accounting for relay chain blocks where no candidate
//! from the collator's parachain was produced.
//!
//! In the case that all parachains route to each other and a candidate for the
//! collator's parachain was included in the last relay chain block, the collator
//! only has to gather egress posts from other parachains one block back in relay
//! chain history.
//!
//! This crate defines traits which provide the context necessary for collation logic
//! to be performed, as well as the collation logic itself.

47
use std::collections::HashSet;
48
use std::fmt;
49
use std::sync::Arc;
50
use std::time::Duration;
51
use std::pin::Pin;
Gav's avatar
Gav committed
52

53
use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn};
54
use log::warn;
55
use sc_client::BlockchainEvents;
Gavin Wood's avatar
Gavin Wood committed
56
57
use sp_core::Pair;
use sp_runtime::traits::BlakeTwo256;
58
use polkadot_primitives::{
59
	BlockId, Hash, Block,
60
	parachain::{
Ashley's avatar
Ashley committed
61
		self, BlockData, DutyRoster, HeadData, Id as ParaId,
62
		PoVBlock, ValidatorId, CollatorPair, LocalValidationData
63
64
65
	}
};
use polkadot_cli::{
66
	ProvideRuntimeApi, AbstractService, ParachainHost, IsKusama,
67
	service::{self, Roles}
68
};
69
pub use polkadot_cli::{VersionInfo, load_spec, service::Configuration};
70
71
pub use polkadot_validation::SignedStatement;
pub use polkadot_primitives::parachain::CollatorId;
72
pub use sc_network::PeerId;
73
pub use service::RuntimeApiCollection;
74

75
/// Deadline for a single collation attempt (producing a collation for one imported
/// relay-chain block and handing it to the network). If this delay elapses before the
/// attempt finishes, a timeout warning is logged.
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
Gav's avatar
Gav committed
76

77
/// An abstraction over the `Network` with useful functions for a `Collator`.
78
pub trait Network: Send + Sync {
79
80
81
82
83
	/// Create a `Stream` of checked statements for the given `relay_parent`.
	///
	/// The returned stream will not terminate, so it is required to make sure that the stream is
	/// dropped when it is not required anymore. Otherwise, it will stick around in memory
	/// infinitely.
84
	fn checked_statements(&self, relay_parent: Hash) -> Pin<Box<dyn Stream<Item=SignedStatement>>>;
85
86
}

87
88
89
impl Network for polkadot_network::protocol::Service {
	/// Forwards to the `checked_statements` function that path lookup finds on the service type.
	fn checked_statements(&self, relay_parent: Hash) -> Pin<Box<dyn Stream<Item = SignedStatement>>> {
		// NOTE(review): `Self::...` resolves exactly like the fully spelled-out type path.
		// Presumably this reaches an inherent method of the service rather than recursing
		// into this trait method -- confirm against `polkadot_network`.
		Self::checked_statements(self, relay_parent)
	}
}

93
94
95
96
97
98
/// Error to return when the head data was invalid.
#[derive(Clone, Copy, Debug)]
pub struct InvalidHead;

/// Collation errors.
///
/// Implements `Display` for human-readable messages and `std::error::Error` so that it can be
/// boxed as `dyn Error` or converted by generic error-handling code.
#[derive(Debug)]
pub enum Error {
	/// Error on the relay-chain side of things. Carries a textual description of the failure.
	Polkadot(String),
	/// Error on the collator side of things.
	Collator(InvalidHead),
}

impl fmt::Display for Error {
	/// Writes a one-line description that names which side (relay chain or collator) failed.
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		match self {
			Error::Polkadot(err) => write!(f, "Polkadot node error: {}", err),
			Error::Collator(InvalidHead) => write!(f, "Collator node error: Invalid head data"),
		}
	}
}

// Neither variant wraps another error value, so the default `source()` (`None`) is correct.
impl std::error::Error for Error {}

115
/// The Polkadot client type.
116
pub type PolkadotClient<B, E, R> = sc_client::Client<B, E, Block, R>;
117

118
119
120
121
122
123
/// Something that can build a `ParachainContext`.
pub trait BuildParachainContext {
	/// The parachain context produced by the `build` function.
	type ParachainContext: self::ParachainContext;

	/// Build the `ParachainContext`.
124
	fn build<B, E, R, SP, Extrinsic>(
125
		self,
126
		client: Arc<PolkadotClient<B, E, R>>,
127
		spawner: SP,
128
		network: impl Network + Clone + 'static,
129
130
	) -> Result<Self::ParachainContext, ()>
		where
131
			PolkadotClient<B, E, R>: ProvideRuntimeApi<Block>,
132
			<PolkadotClient<B, E, R> as ProvideRuntimeApi<Block>>::Api: RuntimeApiCollection<Extrinsic>,
133
134
			// Rust bug: https://github.com/rust-lang/rust/issues/24159
			<<PolkadotClient<B, E, R> as ProvideRuntimeApi<Block>>::Api as sp_api::ApiExt<Block>>::StateBackend:
Gavin Wood's avatar
Gavin Wood committed
135
				sp_api::StateBackend<BlakeTwo256>,
136
137
			Extrinsic: codec::Codec + Send + Sync + 'static,
			E: sc_client::CallExecutor<Block> + Clone + Send + Sync + 'static,
138
139
140
141
			SP: Spawn + Clone + Send + Sync + 'static,
			R: Send + Sync + 'static,
			B: sc_client_api::Backend<Block> + 'static,
			// Rust bug: https://github.com/rust-lang/rust/issues/24159
Gavin Wood's avatar
Gavin Wood committed
142
			B::State: sp_api::StateBackend<BlakeTwo256>;
143
144
}

Gav's avatar
Gav committed
145
146
147
/// Parachain context needed for collation.
///
/// This can be implemented through an externally attached service or a stub.
148
149
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
Ashley's avatar
Ashley committed
150
	type ProduceCandidate: Future<Output = Result<(BlockData, HeadData), InvalidHead>>;
151

152
153
	/// Produce a candidate, given the relay parent hash, the latest ingress queue information
	/// and the last parachain head.
Ashley's avatar
Ashley committed
154
	fn produce_candidate(
155
		&mut self,
156
		relay_parent: Hash,
157
		status: LocalValidationData,
158
	) -> Self::ProduceCandidate;
Gav's avatar
Gav committed
159
160
}

161
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
Ashley's avatar
Ashley committed
162
pub async fn collate<P>(
163
	relay_parent: Hash,
164
	local_id: ParaId,
165
	local_validation_data: LocalValidationData,
166
	mut para_context: P,
167
	key: Arc<CollatorPair>,
168
)
Ashley's avatar
Ashley committed
169
	-> Result<parachain::Collation, Error>
Gav's avatar
Gav committed
170
	where
171
172
		P: ParachainContext,
		P::ProduceCandidate: Send,
Gav's avatar
Gav committed
173
{
Ashley's avatar
Ashley committed
174
	let (block_data, head_data) = para_context.produce_candidate(
175
		relay_parent,
176
		local_validation_data,
177
178
	).map_err(Error::Collator).await?;

179
180
181
182
183
184
185
186
187
188
189
	let pov_block = PoVBlock {
		block_data,
	};

	let pov_block_hash = pov_block.hash();
	let signature = key.sign(&parachain::collator_signature_payload(
		&relay_parent,
		&local_id,
		&pov_block_hash,
	));

190
	let info = parachain::CollationInfo {
191
		parachain_index: local_id,
192
		relay_parent,
193
194
		collator: key.public(),
		signature,
195
		head_data,
196
		pov_block_hash,
197
198
199
	};

	let collation = parachain::Collation {
200
		info,
201
		pov: pov_block,
202
203
	};

Ashley's avatar
Ashley committed
204
	Ok(collation)
205
206
}

207
fn build_collator_service<S, P, Extrinsic>(
208
	service: (S, polkadot_service::FullNodeHandles),
209
	para_id: ParaId,
210
	key: Arc<CollatorPair>,
211
	build_parachain_context: P,
Gavin Wood's avatar
Gavin Wood committed
212
) -> Result<S, polkadot_service::Error>
213
	where
214
		S: AbstractService<Block = service::Block>,
215
216
		sc_client::Client<S::Backend, S::CallExecutor, service::Block, S::RuntimeApi>: ProvideRuntimeApi<Block>,
		<sc_client::Client<S::Backend, S::CallExecutor, service::Block, S::RuntimeApi> as ProvideRuntimeApi<Block>>::Api:
217
			RuntimeApiCollection<
218
219
220
221
				Extrinsic,
				Error = sp_blockchain::Error,
				StateBackend = sc_client_api::StateBackendFor<S::Backend, Block>
			>,
222
		// Rust bug: https://github.com/rust-lang/rust/issues/24159
223
		S::Backend: service::Backend<service::Block>,
224
		// Rust bug: https://github.com/rust-lang/rust/issues/24159
225
		<S::Backend as service::Backend<service::Block>>::State:
Gavin Wood's avatar
Gavin Wood committed
226
			sp_api::StateBackend<sp_runtime::traits::HashFor<Block>>,
227
228
		// Rust bug: https://github.com/rust-lang/rust/issues/24159
		S::CallExecutor: service::CallExecutor<service::Block>,
229
230
231
232
233
		// Rust bug: https://github.com/rust-lang/rust/issues/24159
		S::SelectChain: service::SelectChain<service::Block>,
		P: BuildParachainContext,
		P::ParachainContext: Send + 'static,
		<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
234
		Extrinsic: service::Codec + Send + Sync + 'static,
235
{
236
	let (service, handles) = service;
237
	let spawner = service.spawn_task_handle();
238

239
240
241
242
243
	let polkadot_network = match handles.polkadot_network {
		None => return Err(
			"Collator cannot run when Polkadot-specific networking has not been started".into()
		),
		Some(n) => n,
244
	};
245

246
	let client = service.client();
247
248
249
250

	let parachain_context = match build_parachain_context.build(
		client.clone(),
		spawner,
251
		polkadot_network.clone(),
252
253
254
	) {
		Ok(ctx) => ctx,
		Err(()) => {
Gavin Wood's avatar
Gavin Wood committed
255
			return Err("Could not build the parachain context!".into())
256
257
		}
	};
258

259
260
261
262
	let work = async move {
		let mut notification_stream = client.import_notification_stream();

		while let Some(notification) = notification_stream.next().await {
263
264
265
266
			macro_rules! try_fr {
				($e:expr) => {
					match $e {
						Ok(x) => x,
267
						Err(e) => return future::Either::Left(future::err(Error::Polkadot(
268
269
							format!("{:?}", e)
						))),
270
					}
271
				}
272
			}
273

274
275
276
			let relay_parent = notification.hash;
			let id = BlockId::hash(relay_parent);

277
			let network = polkadot_network.clone();
278
279
280
281
			let client = client.clone();
			let key = key.clone();
			let parachain_context = parachain_context.clone();

282
			let work = future::lazy(move |_| {
283
				let api = client.runtime_api();
284
285
				let local_validation = match try_fr!(api.local_validation_data(&id, para_id)) {
					Some(local_validation) => local_validation,
286
					None => return future::Either::Left(future::ok(())),
287
288
289
290
291
292
293
294
295
296
				};

				let validators = try_fr!(api.validators(&id));

				let targets = compute_targets(
					para_id,
					validators.as_slice(),
					try_fr!(api.duty_roster(&id)),
				);

297
				let collation_work = collate(
298
299
					relay_parent,
					para_id,
300
					local_validation,
301
302
					parachain_context,
					key,
Ashley's avatar
Ashley committed
303
				).map_ok(move |collation| {
304
					network.distribute_collation(targets, collation)
305
306
307
				});

				future::Either::Right(collation_work)
308
			});
309

310
			let deadlined = future::select(
311
				work.then(|f| f).boxed(),
312
313
314
315
316
317
318
319
320
				futures_timer::Delay::new(COLLATION_TIMEOUT)
			);

			let silenced = deadlined
				.map(|either| {
					if let future::Either::Right(_) = either {
						warn!("Collation failure: timeout");
					}
				});
321

322
			let future = silenced.map(drop);
323

324
			tokio::spawn(future);
325
326
		}
	}.boxed();
327

Gavin Wood's avatar
Gavin Wood committed
328
	service.spawn_essential_task("collation", work);
329

Gavin Wood's avatar
Gavin Wood committed
330
	Ok(service)
331
332
}

333
334
335
336
337
338
339
340
341
342
343
344
345
/// Async function that will run the collator node with the given `RelayChainContext` and `ParachainContext`
/// built by the given `BuildParachainContext` and arguments to the underlying polkadot node.
pub async fn start_collator<P>(
	build_parachain_context: P,
	para_id: ParaId,
	key: Arc<CollatorPair>,
	config: Configuration,
) -> Result<(), polkadot_service::Error>
where
	P: BuildParachainContext,
	P::ParachainContext: Send + 'static,
	<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
{
346
347
	let is_kusama = config.expect_chain_spec().is_kusama();
	match (is_kusama, config.roles) {
348
349
350
		(_, Roles::LIGHT) => return Err(
			polkadot_service::Error::Other("light nodes are unsupported as collator".into())
		).into(),
351
352
		(true, _) =>
			build_collator_service(
353
				service::kusama_new_full(config, Some((key.public(), para_id)), None, false, 6000, None)?,
354
355
356
357
358
359
				para_id,
				key,
				build_parachain_context,
			)?.await,
		(false, _) =>
			build_collator_service(
360
				service::polkadot_new_full(config, Some((key.public(), para_id)), None, false, 6000, None)?,
361
362
363
364
365
366
367
				para_id,
				key,
				build_parachain_context,
			)?.await,
	}
}

368
/// Collects the keys of all validators whose duty in `roster` is the parachain `para_id`.
///
/// Entry `i` of `roster.validator_duty` is paired with entry `i` of `session_keys`; duty
/// entries without a corresponding key are ignored.
fn compute_targets(para_id: ParaId, session_keys: &[ValidatorId], roster: DutyRoster) -> HashSet<ValidatorId> {
	use polkadot_primitives::parachain::Chain;

	let wanted = Chain::Parachain(para_id);
	let mut targets = HashSet::new();

	// `zip` stops at the shorter side, which skips duties that have no session key.
	for (duty, validator) in roster.validator_duty.iter().zip(session_keys) {
		if duty == &wanted {
			targets.insert(validator.clone());
		}
	}

	targets
}

378
/// Run a collator node with the given `RelayChainContext` and `ParachainContext`
379
/// built by the given `BuildParachainContext` and arguments to the underlying polkadot node.
380
381
///
/// This function blocks until done.
Gavin Wood's avatar
Gavin Wood committed
382
pub fn run_collator<P>(
383
	build_parachain_context: P,
384
	para_id: ParaId,
385
	key: Arc<CollatorPair>,
Gavin Wood's avatar
Gavin Wood committed
386
	config: Configuration,
387
) -> polkadot_cli::Result<()> where
388
	P: BuildParachainContext,
389
	P::ParachainContext: Send + 'static,
390
	<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
391
{
Gavin Wood's avatar
Gavin Wood committed
392
	match (config.expect_chain_spec().is_kusama(), config.roles) {
393
394
395
		(_, Roles::LIGHT) => return Err(
			polkadot_cli::Error::Input("light nodes are unsupported as collator".into())
		).into(),
396
		(true, _) =>
Gavin Wood's avatar
Gavin Wood committed
397
			sc_cli::run_service_until_exit(config, |config| {
398
				build_collator_service(
399
					service::kusama_new_full(config, Some((key.public(), para_id)), None, false, 6000, None)?,
Gavin Wood's avatar
Gavin Wood committed
400
401
402
403
404
					para_id,
					key,
					build_parachain_context,
				)
			}),
405
		(false, _) =>
Gavin Wood's avatar
Gavin Wood committed
406
			sc_cli::run_service_until_exit(config, |config| {
407
				build_collator_service(
408
					service::polkadot_new_full(config, Some((key.public(), para_id)), None, false, 6000, None)?,
Gavin Wood's avatar
Gavin Wood committed
409
410
411
412
413
					para_id,
					key,
					build_parachain_context,
				)
			}),
414
	}
415
416
}

Gav's avatar
Gav committed
417
418
419
420
#[cfg(test)]
mod tests {
	use super::*;

	/// Parachain context stub that immediately yields a fixed candidate.
	#[derive(Clone)]
	struct DummyParachainContext;

	impl ParachainContext for DummyParachainContext {
		type ProduceCandidate = future::Ready<Result<(BlockData, HeadData), InvalidHead>>;

		fn produce_candidate(
			&mut self,
			_relay_parent: Hash,
			_local_validation: LocalValidationData,
		) -> Self::ProduceCandidate {
			// Both inputs are ignored; the same block data and head data are returned every time.
			let block_data = BlockData(vec![1, 2, 3, 4, 5]);
			let head_data = HeadData(vec![9, 9, 9]);

			future::ok((block_data, head_data))
		}
	}

	/// Builder stub that always succeeds with a `DummyParachainContext`.
	struct BuildDummyParachainContext;

	impl BuildParachainContext for BuildDummyParachainContext {
		type ParachainContext = DummyParachainContext;

		fn build<B, E, R, SP, Extrinsic>(
			self,
			_: Arc<PolkadotClient<B, E, R>>,
			_: SP,
			_: impl Network + Clone + 'static,
		) -> Result<Self::ParachainContext, ()> {
			Ok(DummyParachainContext)
		}
	}

	// Make sure that the future returned by `start_collator` implements `Send`.
	#[test]
	fn start_collator_is_send() {
		fn check_send<T: Send>(_: T) {}

		let collator_key = Arc::new(CollatorPair::generate().0);
		let collator = start_collator(
			BuildDummyParachainContext,
			0.into(),
			collator_key,
			Default::default(),
		);

		// Compile-time check only: the future is never polled.
		check_send(collator);
	}
}