lib.rs 17.7 KB
Newer Older
Gav's avatar
Gav committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
//! Collation node logic.
Gav's avatar
Gav committed
18
19
20
21
22
23
24
25
26
27
28
29
30
//!
//! A collator node lives on a distinct parachain and submits a proposal for
//! a state transition, along with a proof for its validity
//! (what we might call a witness or block data).
//!
//! One of collators' other roles is to route messages between chains.
//! Each parachain produces a list of "egress" posts of messages for each other
//! parachain on each block, for a total of N^2 lists all together.
//!
//! We will refer to the egress list at relay chain block X of parachain A with
//! destination B as egress(X)[A -> B]
//!
//! On every block, each parachain will be intended to route messages from some
31
//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3)
Gav's avatar
Gav committed
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//!
//! Since the egress information is unique to every block, when routing from a
//! parachain a collator must gather all egress posts from that parachain
//! up to the last point in history that messages were successfully routed
//! from that parachain, accounting for relay chain blocks where no candidate
//! from the collator's parachain was produced.
//!
//! In the case that all parachains route to each other and a candidate for the
//! collator's parachain was included in the last relay chain block, the collator
//! only has to gather egress posts from other parachains one block back in relay
//! chain history.
//!
//! This crate defines traits which provide context necessary for collation logic
//! to be performed, as the collation logic itself.

47
use std::collections::HashSet;
48
use std::fmt;
49
use std::sync::Arc;
50
use std::time::Duration;
51
use std::pin::Pin;
Gav's avatar
Gav committed
52

53
use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn};
54
use log::{warn, error};
55
use client::BlockchainEvents;
56
use primitives::{Pair, Blake2Hasher};
57
use polkadot_primitives::{
58
	BlockId, Hash, Block,
59
	parachain::{
60
61
		self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId,
		OutgoingMessages, PoVBlock, Status as ParachainStatus, ValidatorId, CollatorPair,
62
63
64
	}
};
use polkadot_cli::{
65
	Worker, IntoExit, ProvideRuntimeApi, AbstractService, CustomConfiguration, ParachainHost,
66
};
67
use polkadot_network::validation::{LeafWorkParams, ValidationNetwork};
68
use polkadot_network::PolkadotProtocol;
69
use polkadot_runtime::RuntimeApi;
70

71
pub use polkadot_cli::VersionInfo;
72
pub use polkadot_network::validation::Incoming;
73
74
pub use polkadot_validation::SignedStatement;
pub use polkadot_primitives::parachain::CollatorId;
75
pub use sc_network::PeerId;
76

77
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
Gav's avatar
Gav committed
78

79
/// An abstraction over the `Network` with useful functions for a `Collator`.
80
pub trait Network: Send + Sync {
81
82
	/// Convert the given `CollatorId` to a `PeerId`.
	fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
83
		Box<dyn Future<Output=Option<PeerId>> + Send>;
84
85
86
87
88
89

	/// Create a `Stream` of checked statements for the given `relay_parent`.
	///
	/// The returned stream will not terminate, so it is required to make sure that the stream is
	/// dropped when it is not required anymore. Otherwise, it will stick around in memory
	/// infinitely.
90
	fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>>;
91
92
}

93
impl<P, E, SP> Network for ValidationNetwork<P, E, SP> where
94
95
	P: 'static + Send + Sync,
	E: 'static + Send + Sync,
96
	SP: 'static + Spawn + Clone + Send + Sync,
97
98
{
	fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
99
		Box<dyn Future<Output=Option<PeerId>> + Send>
100
	{
101
		Box::new(Self::collator_id_to_peer_id(self, collator_id))
102
103
	}

104
	fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>> {
105
		Box::new(Self::checked_statements(self, relay_parent))
106
107
108
	}
}

109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/// Error to return when the head data was invalid.
#[derive(Clone, Copy, Debug)]
pub struct InvalidHead;

/// Collation errors.
#[derive(Debug)]
pub enum Error<R> {
	/// Error on the relay-chain side of things.
	Polkadot(R),
	/// Error on the collator side of things.
	Collator(InvalidHead),
}

impl<R: fmt::Display> fmt::Display for Error<R> {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		match *self {
			Error::Polkadot(ref err) => write!(f, "Polkadot node error: {}", err),
			Error::Collator(_) => write!(f, "Collator node error: Invalid head data"),
		}
	}
}

131
132
133
/// The Polkadot client type.
pub type PolkadotClient<B, E> = client::Client<B, E, Block, RuntimeApi>;

134
135
136
137
138
139
/// Something that can build a `ParachainContext`.
pub trait BuildParachainContext {
	/// The parachain context produced by the `build` function.
	type ParachainContext: self::ParachainContext;

	/// Build the `ParachainContext`.
140
	fn build<B, E, SP>(
141
142
		self,
		client: Arc<PolkadotClient<B, E>>,
143
		spawner: SP,
144
145
146
		network: Arc<dyn Network>,
	) -> Result<Self::ParachainContext, ()>
		where
147
			B: client_api::backend::Backend<Block, Blake2Hasher> + 'static,
148
149
			E: client::CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync + 'static,
			SP: Spawn + Clone + Send + Sync + 'static;
150
151
}

Gav's avatar
Gav committed
152
153
154
/// Parachain context needed for collation.
///
/// This can be implemented through an externally attached service or a stub.
155
156
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
157
	type ProduceCandidate: Future<Output = Result<(BlockData, HeadData, OutgoingMessages), InvalidHead>>;
158

159
160
	/// Produce a candidate, given the relay parent hash, the latest ingress queue information
	/// and the last parachain head.
Gav's avatar
Gav committed
161
	fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
162
		&mut self,
163
		relay_parent: Hash,
164
		status: ParachainStatus,
Gav's avatar
Gav committed
165
		ingress: I,
166
	) -> Self::ProduceCandidate;
Gav's avatar
Gav committed
167
168
169
170
171
172
}

/// Relay chain context needed to collate.
/// This encapsulates a network and local database which may store
/// some of the input.
pub trait RelayChainContext {
173
	type Error: std::fmt::Debug;
Gav's avatar
Gav committed
174
175
176

	/// Future that resolves to the un-routed egress queues of a parachain.
	/// The first item is the oldest.
177
	type FutureEgress: Future<Output = Result<ConsolidatedIngress, Self::Error>>;
Gav's avatar
Gav committed
178
179

	/// Get un-routed egress queues from a parachain to the local parachain.
180
	fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress;
Gav's avatar
Gav committed
181
182
}

183
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
184
pub async fn collate<R, P>(
185
	relay_parent: Hash,
186
	local_id: ParaId,
187
	parachain_status: ParachainStatus,
188
	relay_context: R,
189
	mut para_context: P,
190
	key: Arc<CollatorPair>,
191
)
192
	-> Result<(parachain::Collation, OutgoingMessages), Error<R::Error>>
Gav's avatar
Gav committed
193
	where
194
		R: RelayChainContext,
195
196
		P: ParachainContext,
		P::ProduceCandidate: Send,
Gav's avatar
Gav committed
197
{
198
199
200
201
202
203
204
205
206
207
208
209
210
	let ingress = relay_context.unrouted_egress(local_id).await.map_err(Error::Polkadot)?;

	let (block_data, head_data, mut outgoing) = para_context.produce_candidate(
		relay_parent,
		parachain_status,
		ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
	).map_err(Error::Collator).await?;

	let block_data_hash = block_data.hash();
	let signature = key.sign(block_data_hash.as_ref());
	let egress_queue_roots =
		polkadot_validation::egress_roots(&mut outgoing.outgoing_messages);

211
	let info = parachain::CollationInfo {
212
213
214
215
		parachain_index: local_id,
		collator: key.public(),
		signature,
		egress_queue_roots,
216
		head_data,
217
218
219
220
221
		block_data_hash,
		upward_messages: Vec::new(),
	};

	let collation = parachain::Collation {
222
		info,
223
224
225
226
227
228
229
		pov: PoVBlock {
			block_data,
			ingress,
		},
	};

	Ok((collation, outgoing))
Gav's avatar
Gav committed
230
231
}

232
/// Polkadot-api context.
233
struct ApiContext<P, E, SP> {
234
	network: Arc<ValidationNetwork<P, E, SP>>,
235
	parent_hash: Hash,
236
	validators: Vec<ValidatorId>,
237
}
238

239
impl<P: 'static, E: 'static, SP: 'static> RelayChainContext for ApiContext<P, E, SP> where
240
241
	P: ProvideRuntimeApi + Send + Sync,
	P::Api: ParachainHost<Block>,
Gavin Wood's avatar
Gavin Wood committed
242
	E: futures::Future<Output=()> + Clone + Send + Sync + 'static,
243
	SP: Spawn + Clone + Send + Sync
244
245
{
	type Error = String;
246
	type FutureEgress = Pin<Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Send>>;
247

248
	fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
		let network = self.network.clone();
		let parent_hash = self.parent_hash;
		let authorities = self.validators.clone();

		async move {
			// TODO: https://github.com/paritytech/polkadot/issues/253
			//
			// Fetch ingress and accumulate all unrounted egress
			let _session = network.instantiate_leaf_work(LeafWorkParams {
				local_session_key: None,
				parent_hash,
				authorities,
			})
				.map_err(|e| format!("unable to instantiate validation session: {:?}", e));

			Ok(ConsolidatedIngress(Vec::new()))
		}.boxed()
266
267
268
269
	}
}

struct CollationNode<P, E> {
270
	build_parachain_context: P,
271
272
	exit: E,
	para_id: ParaId,
273
	key: Arc<CollatorPair>,
274
275
}

276
impl<P, E> IntoExit for CollationNode<P, E> where
Gavin Wood's avatar
Gavin Wood committed
277
	E: futures::Future<Output=()> + Unpin + Send + 'static
278
{
Gavin Wood's avatar
Gavin Wood committed
279
	type Exit = E;
280
	fn into_exit(self) -> Self::Exit {
Gavin Wood's avatar
Gavin Wood committed
281
		self.exit
282
283
284
	}
}

285
impl<P, E> Worker for CollationNode<P, E> where
286
287
	P: BuildParachainContext + Send + 'static,
	P::ParachainContext: Send + 'static,
288
	<P::ParachainContext as ParachainContext>::ProduceCandidate: Send + 'static,
Gavin Wood's avatar
Gavin Wood committed
289
	E: futures::Future<Output=()> + Clone + Unpin + Send + Sync + 'static,
290
{
291
	type Work = Box<dyn Future<Output=()> + Unpin + Send>;
292

293
294
295
	fn configuration(&self) -> CustomConfiguration {
		let mut config = CustomConfiguration::default();
		config.collating_for = Some((
Gav Wood's avatar
Gav Wood committed
296
			self.key.public(),
297
			self.para_id,
298
299
300
301
		));
		config
	}

302
	fn work<S, SC, B, CE, SP>(self, service: &S, spawner: SP) -> Self::Work
303
304
305
306
307
308
309
310
311
312
	where
		S: AbstractService<
			Block = Block,
			RuntimeApi = RuntimeApi,
			Backend = B,
			SelectChain = SC,
			NetworkSpecialization = PolkadotProtocol,
			CallExecutor = CE,
		>,
		SC: polkadot_service::SelectChain<Block> + 'static,
313
		B: client_api::backend::Backend<Block, Blake2Hasher> + 'static,
314
315
		CE: client::CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync + 'static,
		SP: Spawn + Clone + Send + Sync + 'static,
316
	{
317
		let CollationNode { build_parachain_context, exit, para_id, key } = self;
318
		let client = service.client();
319
		let network = service.network();
320
		let known_oracle = client.clone();
thiolliere's avatar
thiolliere committed
321
322
323
		let select_chain = if let Some(select_chain) = service.select_chain() {
			select_chain
		} else {
324
			error!("The node cannot work because it can't select chain.");
325
			return Box::new(future::ready(()));
thiolliere's avatar
thiolliere committed
326
		};
327

328
		let is_known = move |block_hash: &Hash| {
329
			use consensus_common::BlockStatus;
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
			use polkadot_network::gossip::Known;

			match known_oracle.block_status(&BlockId::hash(*block_hash)) {
				Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
				Ok(BlockStatus::KnownBad) => Some(Known::Bad),
				Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) =>
					match select_chain.leaves() {
						Err(_) => None,
						Ok(leaves) => if leaves.contains(block_hash) {
							Some(Known::Leaf)
						} else {
							Some(Known::Old)
						},
					}
			}
		};

347
		let message_validator = polkadot_network::gossip::register_validator(
348
			network.clone(),
349
			(is_known, client.clone()),
350
			&spawner
351
352
		);

353
		let validation_network = Arc::new(ValidationNetwork::new(
354
			message_validator,
355
			exit.clone(),
356
			client.clone(),
357
			spawner.clone(),
358
		));
359

360
361
		let parachain_context = match build_parachain_context.build(
			client.clone(),
362
			spawner,
363
364
365
366
367
			validation_network.clone(),
		) {
			Ok(ctx) => ctx,
			Err(()) => {
				error!("Could not build the parachain context!");
368
				return Box::new(future::ready(()))
369
370
371
			}
		};

372
		let inner_exit = exit.clone();
373
		let work = client.import_notification_stream()
374
375
376
377
378
			.for_each(move |notification| {
				macro_rules! try_fr {
					($e:expr) => {
						match $e {
							Ok(x) => x,
379
							Err(e) => return future::Either::Left(future::err(Error::Polkadot(
380
381
								format!("{:?}", e)
							))),
382
						}
383
					}
384
385
386
387
388
389
				}

				let relay_parent = notification.hash;
				let id = BlockId::hash(relay_parent);

				let network = network.clone();
390
				let client = client.clone();
391
392
				let key = key.clone();
				let parachain_context = parachain_context.clone();
393
				let validation_network = validation_network.clone();
394
				let inner_exit_2 = inner_exit.clone();
395

396
				let work = future::lazy(move |_| {
397
					let api = client.runtime_api();
398
399
					let status = match try_fr!(api.parachain_status(&id, para_id)) {
						Some(status) => status,
400
						None => return future::Either::Left(future::ok(())),
401
402
					};

403
					let validators = try_fr!(api.validators(&id));
404

405
406
					let targets = compute_targets(
						para_id,
407
						validators.as_slice(),
408
409
410
						try_fr!(api.duty_roster(&id)),
					);

411
412
413
					let context = ApiContext {
						network: validation_network,
						parent_hash: relay_parent,
414
						validators,
415
416
					};

417
					let collation_work = collate(
418
						relay_parent,
419
						para_id,
420
						status,
421
						context,
422
423
						parachain_context,
						key,
424
					).map_ok(move |(collation, outgoing)| {
425
426
427
428
429
430
431
432
433
						network.with_spec(move |spec, ctx| {
							let res = spec.add_local_collation(
								ctx,
								relay_parent,
								targets,
								collation,
								outgoing,
							);

434
							let exit = inner_exit_2.clone();
435
							tokio::spawn(future::select(res.boxed(), exit).map(drop));
436
						})
437
438
					});

439
					future::Either::Right(collation_work)
440
				});
441

442
443
444
445
446
447
448
449
450
451
452
453
454
455
				let deadlined = future::select(
					work,
					futures_timer::Delay::new(COLLATION_TIMEOUT)
				);

				let silenced = deadlined
					.map(|either| {
						if let future::Either::Right(_) = either {
							warn!("Collation failure: timeout");
						}
					});

				let future = future::select(
					silenced,
Gavin Wood's avatar
Gavin Wood committed
456
					inner_exit.clone()
457
				).map(drop);
458
459
460

				tokio::spawn(future);
				future::ready(())
461
462
			});

Gavin Wood's avatar
Gavin Wood committed
463
464
		let work_and_exit = future::select(work, exit)
			.map(|_| ());
465
466

		Box::new(work_and_exit)
467
468
469
	}
}

470
fn compute_targets(para_id: ParaId, session_keys: &[ValidatorId], roster: DutyRoster) -> HashSet<ValidatorId> {
471
472
473
474
475
476
477
478
479
	use polkadot_primitives::parachain::Chain;

	roster.validator_duty.iter().enumerate()
		.filter(|&(_, c)| c == &Chain::Parachain(para_id))
		.filter_map(|(i, _)| session_keys.get(i))
		.cloned()
		.collect()
}

480
481
/// Run a collator node with the given `RelayChainContext` and `ParachainContext`
/// build by the given `BuildParachainContext` and arguments to the underlying polkadot node.
482
483
484
///
/// Provide a future which resolves when the node should exit.
/// This function blocks until done.
485
pub fn run_collator<P, E>(
486
	build_parachain_context: P,
487
488
	para_id: ParaId,
	exit: E,
489
	key: Arc<CollatorPair>,
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
490
	version: VersionInfo,
491
) -> polkadot_cli::error::Result<()> where
492
493
	P: BuildParachainContext + Send + 'static,
	P::ParachainContext: Send + 'static,
494
	<P::ParachainContext as ParachainContext>::ProduceCandidate: Send + 'static,
Gavin Wood's avatar
Gavin Wood committed
495
	E: futures::Future<Output = ()> + Unpin + Send + Clone + Sync + 'static,
496
{
497
	let node_logic = CollationNode { build_parachain_context, exit, para_id, key };
498
	polkadot_cli::run(node_logic, version)
499
500
}

Gav's avatar
Gav committed
501
502
#[cfg(test)]
mod tests {
503
	use std::collections::HashMap;
504
	use polkadot_primitives::parachain::{TargetedMessage, FeeSchedule};
505
	use keyring::Sr25519Keyring;
Gav's avatar
Gav committed
506
507
	use super::*;

508
509
510
	#[derive(Default, Clone)]
	struct DummyRelayChainContext {
		ingress: HashMap<ParaId, ConsolidatedIngress>
Gav's avatar
Gav committed
511
512
	}

513
	impl RelayChainContext for DummyRelayChainContext {
Gav's avatar
Gav committed
514
		type Error = ();
515
		type FutureEgress = Box<dyn Future<Output=Result<ConsolidatedIngress,()>> + Unpin>;
Gav's avatar
Gav committed
516

517
518
519
		fn unrouted_egress(&self, para_id: ParaId) -> Self::FutureEgress {
			match self.ingress.get(&para_id) {
				Some(ingress) => Box::new(future::ok(ingress.clone())),
520
				None => Box::new(future::pending()),
521
			}
Gav's avatar
Gav committed
522
		}
523
	}
Gav's avatar
Gav committed
524

525
526
527
528
	#[derive(Clone)]
	struct DummyParachainContext;

	impl ParachainContext for DummyParachainContext {
529
		type ProduceCandidate = future::Ready<Result<(BlockData, HeadData, OutgoingMessages), InvalidHead>>;
530

531
		fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
532
			&mut self,
533
			_relay_parent: Hash,
534
			_status: ParachainStatus,
535
			ingress: I,
536
		) -> Self::ProduceCandidate {
537
			// send messages right back.
538
			future::ok((
539
540
				BlockData(vec![1, 2, 3, 4, 5,]),
				HeadData(vec![9, 9, 9]),
541
542
				OutgoingMessages {
					outgoing_messages: ingress.into_iter().map(|(id, msg)| TargetedMessage {
543
544
545
546
547
						target: id,
						data: msg.0,
					}).collect(),
				}
			))
Gav's avatar
Gav committed
548
549
550
		}
	}

551
	#[test]
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
	fn collates_correct_queue_roots() {
		let mut context = DummyRelayChainContext::default();

		let id = ParaId::from(100);

		let a = ParaId::from(123);
		let b = ParaId::from(456);

		let messages_from_a = vec![
			Message(vec![1, 1, 1]),
			Message(b"helloworld".to_vec()),
		];
		let messages_from_b = vec![
			Message(b"dogglesworth".to_vec()),
			Message(b"buy_1_chili_con_carne_here_is_my_cash".to_vec()),
		];

		let root_a = ::polkadot_validation::message_queue_root(
			messages_from_a.iter().map(|msg| &msg.0)
		);

		let root_b = ::polkadot_validation::message_queue_root(
			messages_from_b.iter().map(|msg| &msg.0)
		);

		context.ingress.insert(id, ConsolidatedIngress(vec![
			(b, messages_from_b),
			(a, messages_from_a),
		]));

582
		let future = collate(
583
			Default::default(),
584
			id,
585
586
587
588
589
590
591
592
			ParachainStatus {
				head_data: HeadData(vec![5]),
				balance: 10,
				fee_schedule: FeeSchedule {
					base: 0,
					per_byte: 1,
				},
			},
593
594
			context.clone(),
			DummyParachainContext,
595
			Arc::new(Sr25519Keyring::Alice.pair().into()),
596
597
598
		);

		let collation = futures::executor::block_on(future).unwrap().0;
599
600

		// ascending order by root.
601
		assert_eq!(collation.info.egress_queue_roots, vec![(a, root_a), (b, root_b)]);
Gav's avatar
Gav committed
602
603
	}
}
604