// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Collation node logic.
//!
//! A collator node lives on a distinct parachain and submits a proposal for
//! a state transition, along with a proof for its validity
//! (what we might call a witness or block data).
//!
//! One of collators' other roles is to route messages between chains.
//! Each parachain produces a list of "egress" posts of messages for each other
//! parachain on each block, for a total of N^2 lists all together.
//!
//! We will refer to the egress list at relay chain block X of parachain A with
//! destination B as egress(X)[A -> B]
//!
//! On every block, each parachain will be intended to route messages from some
//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3)
//!
//! Since the egress information is unique to every block, when routing from a
//! parachain a collator must gather all egress posts from that parachain
//! up to the last point in history that messages were successfully routed
//! from that parachain, accounting for relay chain blocks where no candidate
//! from the collator's parachain was produced.
//!
//! In the case that all parachains route to each other and a candidate for the
//! collator's parachain was included in the last relay chain block, the collator
//! only has to gather egress posts from other parachains one block back in relay
//! chain history.
//!
//! This crate defines traits which provide context necessary for collation logic
//! to be performed, as well as the collation logic itself.

extern crate futures;
48
extern crate substrate_client as client;
49
extern crate parity_codec as codec;
Gav's avatar
Gav committed
50
extern crate substrate_primitives as primitives;
51
extern crate tokio;
52
53

extern crate polkadot_cli;
Gav Wood's avatar
Gav Wood committed
54
extern crate polkadot_runtime;
Gav's avatar
Gav committed
55
extern crate polkadot_primitives;
56
57
extern crate polkadot_network;
extern crate polkadot_validation;
Gav's avatar
Gav committed
58

59
60
61
#[macro_use]
extern crate log;

62
63
64
65
#[cfg(test)]
extern crate substrate_keyring as keyring;

use std::collections::HashSet;
66
use std::fmt;
67
use std::sync::Arc;
68
use std::time::Duration;
Gav's avatar
Gav committed
69

70
use futures::{future, Stream, Future, IntoFuture};
71
use client::BlockchainEvents;
Gav Wood's avatar
Gav Wood committed
72
use primitives::{ed25519, Pair};
73
74
use polkadot_primitives::{BlockId, SessionKey, Hash, Block};
use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic};
75
use polkadot_cli::{PolkadotService, CustomConfiguration, CoreApi, ParachainHost};
76
77
78
use polkadot_cli::{Worker, IntoExit, ProvideRuntimeApi, TaskExecutor};
use polkadot_network::validation::{ValidationNetwork, SessionParams};
use polkadot_network::NetworkService;
79
use tokio::timer::Timeout;
80

81
pub use polkadot_cli::VersionInfo;
82
pub use polkadot_network::validation::Incoming;
83

84
/// Upper bound on how long a single collation attempt may run.
///
/// The collation worker wraps every attempt in a `Timeout` of this duration.
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
Gav's avatar
Gav committed
85

86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/// Error to return when the head data was invalid.
///
/// This is the error half of `ParachainContext::produce_candidate` and is carried
/// by `Error::Collator`. It implements `Display` and `std::error::Error` so it can
/// be formatted and propagated like any other error value.
#[derive(Clone, Copy, Debug)]
pub struct InvalidHead;

impl fmt::Display for InvalidHead {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		f.write_str("Invalid head data")
	}
}

impl ::std::error::Error for InvalidHead {}

/// Collation errors.
///
/// `R` is the error type of the relay-chain context: `collate` returns
/// `Error<R::Error>` for its `R: RelayChainContext`.
#[derive(Debug)]
pub enum Error<R> {
	/// Error on the relay-chain side of things.
	///
	/// `collate` produces this when fetching the un-routed egress fails.
	Polkadot(R),
	/// Error on the collator side of things.
	///
	/// `collate` produces this when the parachain context fails to produce a candidate.
	Collator(InvalidHead),
}

impl<R: fmt::Display> fmt::Display for Error<R> {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		match *self {
			Error::Polkadot(ref err) => write!(f, "Polkadot node error: {}", err),
			Error::Collator(_) => write!(f, "Collator node error: Invalid head data"),
		}
	}
}

Gav's avatar
Gav committed
108
109
110
/// Parachain context needed for collation.
///
/// This can be implemented through an externally attached service or a stub.
111
112
113
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
	/// Produce a candidate, given the latest ingress queue information and the last parachain head.
Gav's avatar
Gav committed
114
115
	fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
		&self,
116
		last_head: HeadData,
Gav's avatar
Gav committed
117
		ingress: I,
118
	) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead>;
Gav's avatar
Gav committed
119
120
121
122
123
124
}

/// Relay chain context needed to collate.
/// This encapsulates a network and local database which may store
/// some of the input.
pub trait RelayChainContext {
125
	type Error: ::std::fmt::Debug;
Gav's avatar
Gav committed
126
127
128

	/// Future that resolves to the un-routed egress queues of a parachain.
	/// The first item is the oldest.
129
	type FutureEgress: IntoFuture<Item=ConsolidatedIngress, Error=Self::Error>;
Gav's avatar
Gav committed
130
131
132
133
134

	/// Get un-routed egress queues from a parachain to the local parachain.
	fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress;
}

135
136
137
138
139
140
141
142
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
pub fn collate<'a, R, P>(
	local_id: ParaId,
	last_head: HeadData,
	relay_context: R,
	para_context: P,
	key: Arc<ed25519::Pair>,
)
143
	-> impl Future<Item=parachain::Collation, Error=Error<R::Error>> + 'a
Gav's avatar
Gav committed
144
	where
145
		R: RelayChainContext,
146
		R::Error: 'a,
Gav's avatar
Gav committed
147
148
149
		R::FutureEgress: 'a,
		P: ParachainContext + 'a,
{
150
151
152
	let ingress = relay_context.unrouted_egress(local_id).into_future().map_err(Error::Polkadot);
	ingress.and_then(move |ConsolidatedIngress(ingress)| {
		let (block_data, head_data, mut extrinsic) = para_context.produce_candidate(
153
			last_head,
154
			ingress.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
155
		).map_err(Error::Collator)?;
Gav's avatar
Gav committed
156

157
		let block_data_hash = block_data.hash();
158
		let signature = key.sign(block_data_hash.as_ref()).into();
159
160
		let egress_queue_roots =
			::polkadot_validation::egress_roots(&mut extrinsic.outgoing_messages);
161
162

		let receipt = parachain::CandidateReceipt {
Gav's avatar
Gav committed
163
			parachain_index: local_id,
Gav Wood's avatar
Gav Wood committed
164
			collator: key.public(),
165
166
167
			signature,
			head_data,
			balance_uploads: Vec::new(),
168
			egress_queue_roots,
169
			fees: 0,
170
			block_data_hash,
171
172
		};

173
		// not necessary to send extrinsic because it is recomputed from execution.
174
		Ok(parachain::Collation {
175
176
			receipt,
			block_data,
177
		})
178
	})
Gav's avatar
Gav committed
179
180
}

181
/// Polkadot-api context.
182
183
184
185
186
struct ApiContext<P, E> {
	network: ValidationNetwork<P, E, NetworkService, TaskExecutor>,
	parent_hash: Hash,
	authorities: Vec<SessionKey>,
}
187

188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
	P: ProvideRuntimeApi + Send + Sync,
	P::Api: ParachainHost<Block>,
	E: Future<Item=(),Error=()> + Clone + Send + Sync + 'static,
{
	/// Failures are reported as formatted strings.
	type Error = String;
	type FutureEgress = Box<Future<Item=ConsolidatedIngress, Error=String> + Send>;

	/// Instantiate a validation session for `parent_hash` without a local session key
	/// and fetch the incoming messages for parachain `id` through it.
	fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress {
		let session = self.network.instantiate_session(SessionParams {
			local_session_key: None,
			parent_hash: self.parent_hash,
			authorities: self.authorities.clone(),
		}).map_err(|e| format!("unable to instantiate validation session: {:?}", e));

		let fetch_incoming = session
			.and_then(move |session| session.fetch_incoming(id).map_err(|e|
				format!("unable to fetch incoming data: {:?}", e)
			))
			.map(ConsolidatedIngress);

		Box::new(fetch_incoming)
	}
}

/// State of a collator node, handed to `polkadot_cli::run` as the node logic.
struct CollationNode<P, E> {
	/// Parachain-specific logic used to produce candidates.
	parachain_context: P,
	/// Exit signal: the worker's future is selected against it, and it is
	/// returned as-is by `IntoExit::into_exit`.
	exit: E,
	/// ID of the parachain being collated for.
	para_id: ParaId,
	/// Key used to sign collations; its public half is advertised in the node configuration.
	key: Arc<ed25519::Pair>,
}

220
221
222
223
224
225
226
227
228
229
impl<P, E> IntoExit for CollationNode<P, E> where
	P: ParachainContext + Send + 'static,
	E: Future<Item=(),Error=()> + Send + 'static
{
	type Exit = E;
	fn into_exit(self) -> Self::Exit {
		self.exit
	}
}

230
impl<P, E> Worker for CollationNode<P, E> where
231
	P: ParachainContext + Send + 'static,
232
	E: Future<Item=(),Error=()> + Clone + Send + Sync + 'static
233
{
234
	type Work = Box<Future<Item=(),Error=()> + Send>;
235

236
237
238
	fn configuration(&self) -> CustomConfiguration {
		let mut config = CustomConfiguration::default();
		config.collating_for = Some((
Gav Wood's avatar
Gav Wood committed
239
			self.key.public(),
240
241
242
243
244
			self.para_id.clone(),
		));
		config
	}

245
	fn work<S>(self, service: &S, task_executor: TaskExecutor) -> Self::Work
246
247
		where S: PolkadotService,
	{
248
249
		let CollationNode { parachain_context, exit, para_id, key } = self;
		let client = service.client();
250
		let network = service.network();
251
252
253
254
255
256
257
258
259
260
261
		let known_oracle = client.clone();

		let message_validator = polkadot_network::gossip::register_validator(
			&*network,
			move |block_hash: &Hash| {
				use client::{BlockStatus, ChainHead};
				use polkadot_network::gossip::Known;

				match known_oracle.block_status(&BlockId::hash(*block_hash)) {
					Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
					Ok(BlockStatus::KnownBad) => Some(Known::Bad),
262
263
264
265
266
267
268
269
270
					Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) =>
						match known_oracle.leaves() {
							Err(_) => None,
							Ok(leaves) => if leaves.contains(block_hash) {
								Some(Known::Leaf)
							} else {
								Some(Known::Old)
							},
						}
271
272
273
274
275
276
277
278
279
280
281
				}
			},
		);

		let validation_network = ValidationNetwork::new(
			network.clone(),
			exit.clone(),
			message_validator,
			client.clone(),
			task_executor,
		);
282

283
		let inner_exit = exit.clone();
284
		let work = client.import_notification_stream()
285
286
287
288
289
			.for_each(move |notification| {
				macro_rules! try_fr {
					($e:expr) => {
						match $e {
							Ok(x) => x,
290
291
292
							Err(e) => return future::Either::A(future::err(Error::Polkadot(
								format!("{:?}", e)
							))),
293
						}
294
					}
295
296
297
298
299
300
				}

				let relay_parent = notification.hash;
				let id = BlockId::hash(relay_parent);

				let network = network.clone();
301
				let client = client.clone();
302
303
				let key = key.clone();
				let parachain_context = parachain_context.clone();
304
				let validation_network = validation_network.clone();
305
306

				let work = future::lazy(move || {
307
					let api = client.runtime_api();
Gav Wood's avatar
Gav Wood committed
308
					let last_head = match try_fr!(api.parachain_head(&id, para_id)) {
309
310
311
312
						Some(last_head) => last_head,
						None => return future::Either::A(future::ok(())),
					};

313
314
					let authorities = try_fr!(api.authorities(&id));

315
316
					let targets = compute_targets(
						para_id,
317
						authorities.as_slice(),
318
319
320
						try_fr!(api.duty_roster(&id)),
					);

321
322
323
324
325
326
					let context = ApiContext {
						network: validation_network,
						parent_hash: relay_parent,
						authorities,
					};

327
328
329
					let collation_work = collate(
						para_id,
						HeadData(last_head),
330
						context,
331
332
333
						parachain_context,
						key,
					).map(move |collation| {
334
						network.with_spec(move |spec, ctx| spec.add_local_collation(
335
336
337
338
339
340
341
342
343
							ctx,
							relay_parent,
							targets,
							collation,
						));
					});

					future::Either::B(collation_work)
				});
344
				let deadlined = Timeout::new(work, COLLATION_TIMEOUT);
345
346
				let silenced = deadlined.then(|res| match res {
					Ok(()) => Ok(()),
347
348
					Err(_) => {
						warn!("Collation failure: timeout");
349
						Ok(())
350
					}
351
352
				});

353
				tokio::spawn(silenced.select(inner_exit.clone()).then(|_| Ok(())));
354
355
356
357
358
359
360
361
				Ok(())
			});

		let work_and_exit = work.select(exit).then(|_| Ok(()));
		Box::new(work_and_exit) as Box<_>
	}
}

362
363
364
365
366
367
368
369
370
371
/// Collect the session keys of the validators whose duty is the given parachain.
///
/// The roster's `validator_duty` entries are paired positionally with `session_keys`;
/// entries without a corresponding key are ignored.
fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRoster) -> HashSet<SessionKey> {
	use polkadot_primitives::parachain::Chain;

	let assigned = Chain::Parachain(para_id);
	let mut targets = HashSet::new();

	// `zip` stops at the shorter side, which matches looking each index up with `get`.
	for (duty, session_key) in roster.validator_duty.iter().zip(session_keys) {
		if duty == &assigned {
			targets.insert(session_key.clone());
		}
	}

	targets
}

372
373
374
375
376
/// Run a collator node with the given `RelayChainContext` and `ParachainContext` and
/// arguments to the underlying polkadot node.
///
/// Provide a future which resolves when the node should exit.
/// This function blocks until done.
377
pub fn run_collator<P, E, I, ArgT>(
378
379
380
381
	parachain_context: P,
	para_id: ParaId,
	exit: E,
	key: Arc<ed25519::Pair>,
382
	args: I,
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
383
	version: VersionInfo,
384
) -> polkadot_cli::error::Result<()> where
385
	P: ParachainContext + Send + 'static,
386
	E: IntoFuture<Item=(),Error=()>,
387
	E::Future: Send + Clone + Sync + 'static,
388
389
	I: IntoIterator<Item=ArgT>,
	ArgT: Into<std::ffi::OsString> + Clone,
390
391
{
	let node_logic = CollationNode { parachain_context, exit: exit.into_future(), para_id, key };
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
392
	polkadot_cli::run(args, node_logic, version)
393
394
}

Gav's avatar
Gav committed
395
396
#[cfg(test)]
mod tests {
	use std::collections::HashMap;
	use polkadot_primitives::parachain::OutgoingMessage;
	use keyring::AuthorityKeyring;
	use super::*;

	/// Relay-chain stub that serves pre-recorded ingress per parachain.
	#[derive(Default, Clone)]
	struct DummyRelayChainContext {
		ingress: HashMap<ParaId, ConsolidatedIngress>
	}

	impl RelayChainContext for DummyRelayChainContext {
		type Error = ();
		type FutureEgress = Box<Future<Item=ConsolidatedIngress,Error=()>>;

		fn unrouted_egress(&self, para_id: ParaId) -> Self::FutureEgress {
			match self.ingress.get(&para_id) {
				Some(ingress) => Box::new(future::ok(ingress.clone())),
				// Unknown parachain: a future that never resolves.
				None => Box::new(future::empty()),
			}
		}
	}

	/// Parachain stub that echoes every ingress message back to its sender.
	#[derive(Clone)]
	struct DummyParachainContext;

	impl ParachainContext for DummyParachainContext {
		fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
			&self,
			_last_head: HeadData,
			ingress: I,
		) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead> {
			// send messages right back.
			Ok((
				BlockData(vec![1, 2, 3, 4, 5,]),
				HeadData(vec![9, 9, 9]),
				Extrinsic {
					outgoing_messages: ingress.into_iter().map(|(id, msg)| OutgoingMessage {
						target: id,
						data: msg.0,
					}).collect(),
				}
			))
		}
	}

	#[test]
	fn collates_correct_queue_roots() {
		let mut context = DummyRelayChainContext::default();

		let id = ParaId::from(100);

		let a = ParaId::from(123);
		let b = ParaId::from(456);

		let messages_from_a = vec![
			Message(vec![1, 1, 1]),
			Message(b"helloworld".to_vec()),
		];
		let messages_from_b = vec![
			Message(b"dogglesworth".to_vec()),
			Message(b"buy_1_chili_con_carne_here_is_my_cash".to_vec()),
		];

		let root_a = ::polkadot_validation::message_queue_root(
			messages_from_a.iter().map(|msg| &msg.0)
		);

		let root_b = ::polkadot_validation::message_queue_root(
			messages_from_b.iter().map(|msg| &msg.0)
		);

		// Deliberately inserted as (b, a) so the assertion below checks the output ordering.
		context.ingress.insert(id, ConsolidatedIngress(vec![
			(b, messages_from_b),
			(a, messages_from_a),
		]));

		let collation = collate(
			id,
			HeadData(vec![5]),
			context.clone(),
			DummyParachainContext,
			AuthorityKeyring::Alice.pair().into(),
		).wait().unwrap();

		// ascending order by root.
		assert_eq!(collation.receipt.egress_queue_roots, vec![(a, root_a), (b, root_b)]);
	}
}
485