lib.rs 14.4 KB
Newer Older
Gav's avatar
Gav committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
//! Collation node logic.
Gav's avatar
Gav committed
18
19
20
21
22
23
24
25
26
27
28
29
30
//!
//! A collator node lives on a distinct parachain and submits a proposal for
//! a state transition, along with a proof for its validity
//! (what we might call a witness or block data).
//!
//! One of collators' other roles is to route messages between chains.
//! Each parachain produces a list of "egress" posts of messages for each other
//! parachain on each block, for a total of N^2 lists all together.
//!
//! We will refer to the egress list at relay chain block X of parachain A with
//! destination B as egress(X)[A -> B]
//!
//! On every block, each parachain will be intended to route messages from some
31
//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3)
Gav's avatar
Gav committed
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//!
//! Since the egress information is unique to every block, when routing from a
//! parachain a collator must gather all egress posts from that parachain
//! up to the last point in history that messages were successfully routed
//! from that parachain, accounting for relay chain blocks where no candidate
//! from the collator's parachain was produced.
//!
//! In the case that all parachains route to each other and a candidate for the
//! collator's parachain was included in the last relay chain block, the collator
//! only has to gather egress posts from other parachains one block back in relay
//! chain history.
//!
//! This crate defines traits which provide context necessary for collation logic
//! to be performed, as the collation logic itself.

47
use std::collections::HashSet;
48
use std::fmt;
49
use std::sync::Arc;
50
use std::time::Duration;
Gav's avatar
Gav committed
51

52
use futures::{future, Stream, Future, IntoFuture};
53
use log::{info, warn};
54
use client::BlockchainEvents;
Gav Wood's avatar
Gav Wood committed
55
use primitives::{ed25519, Pair};
56
use polkadot_primitives::{BlockId, SessionKey, Hash, Block};
57
58
59
60
use polkadot_primitives::parachain::{
	self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic,
	PoVBlock,
};
61
use polkadot_cli::{PolkadotService, CustomConfiguration, ParachainHost};
62
63
64
use polkadot_cli::{Worker, IntoExit, ProvideRuntimeApi, TaskExecutor};
use polkadot_network::validation::{ValidationNetwork, SessionParams};
use polkadot_network::NetworkService;
65
use tokio::timer::Timeout;
thiolliere's avatar
thiolliere committed
66
use consensus_common::SelectChain;
67
use aura::AuraApi;
68

69
pub use polkadot_cli::VersionInfo;
70
pub use polkadot_network::validation::Incoming;
71

72
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
Gav's avatar
Gav committed
73

74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/// Error to return when the head data was invalid.
#[derive(Clone, Copy, Debug)]
pub struct InvalidHead;

/// Collation errors.
#[derive(Debug)]
pub enum Error<R> {
	/// Error on the relay-chain side of things.
	Polkadot(R),
	/// Error on the collator side of things.
	Collator(InvalidHead),
}

impl<R: fmt::Display> fmt::Display for Error<R> {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		match *self {
			Error::Polkadot(ref err) => write!(f, "Polkadot node error: {}", err),
			Error::Collator(_) => write!(f, "Collator node error: Invalid head data"),
		}
	}
}

Gav's avatar
Gav committed
96
97
98
/// Parachain context needed for collation.
///
/// This can be implemented through an externally attached service or a stub.
99
100
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
101
102
	type ProduceCandidate: IntoFuture<Item=(BlockData, HeadData, Extrinsic), Error=InvalidHead>;

103
	/// Produce a candidate, given the latest ingress queue information and the last parachain head.
Gav's avatar
Gav committed
104
105
	fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
		&self,
106
		last_head: HeadData,
Gav's avatar
Gav committed
107
		ingress: I,
108
	) -> Self::ProduceCandidate;
Gav's avatar
Gav committed
109
110
111
112
113
114
}

/// Relay chain context needed to collate.
/// This encapsulates a network and local database which may store
/// some of the input.
pub trait RelayChainContext {
115
	type Error: ::std::fmt::Debug;
Gav's avatar
Gav committed
116
117
118

	/// Future that resolves to the un-routed egress queues of a parachain.
	/// The first item is the oldest.
119
	type FutureEgress: IntoFuture<Item=ConsolidatedIngress, Error=Self::Error>;
Gav's avatar
Gav committed
120
121

	/// Get un-routed egress queues from a parachain to the local parachain.
122
	fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress;
Gav's avatar
Gav committed
123
124
}

125
126
127
128
129
130
131
132
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
pub fn collate<'a, R, P>(
	local_id: ParaId,
	last_head: HeadData,
	relay_context: R,
	para_context: P,
	key: Arc<ed25519::Pair>,
)
133
	-> impl Future<Item=parachain::Collation, Error=Error<R::Error>> + 'a
Gav's avatar
Gav committed
134
	where
135
		R: RelayChainContext,
136
		R::Error: 'a,
Gav's avatar
Gav committed
137
138
		R::FutureEgress: 'a,
		P: ParachainContext + 'a,
139
		<P::ProduceCandidate as IntoFuture>::Future: Send,
Gav's avatar
Gav committed
140
{
141
	let ingress = relay_context.unrouted_egress(local_id).into_future().map_err(Error::Polkadot);
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
	ingress
		.and_then(move |ingress| {
			para_context.produce_candidate(
				last_head,
				ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
			)
				.into_future()
				.map(move |x| (ingress, x))
				.map_err(Error::Collator)
		})
		.and_then(move |(ingress, (block_data, head_data, mut extrinsic))| {
			let block_data_hash = block_data.hash();
			let signature = key.sign(block_data_hash.as_ref()).into();
			let egress_queue_roots =
				polkadot_validation::egress_roots(&mut extrinsic.outgoing_messages);

			let receipt = parachain::CandidateReceipt {
				parachain_index: local_id,
				collator: key.public(),
				signature,
				head_data,
				egress_queue_roots,
				fees: 0,
				block_data_hash,
				upward_messages: Vec::new(),
			};

			Ok(parachain::Collation {
				receipt,
				pov: PoVBlock {
					block_data,
					ingress,
				},
			})
176
		})
Gav's avatar
Gav committed
177
178
}

179
/// Polkadot-api context.
180
181
182
183
184
struct ApiContext<P, E> {
	network: ValidationNetwork<P, E, NetworkService, TaskExecutor>,
	parent_hash: Hash,
	authorities: Vec<SessionKey>,
}
185

186
187
188
189
190
191
impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
	P: ProvideRuntimeApi + Send + Sync,
	P::Api: ParachainHost<Block>,
	E: Future<Item=(),Error=()> + Clone + Send + Sync + 'static,
{
	type Error = String;
192
	type FutureEgress = Box<dyn Future<Item=ConsolidatedIngress, Error=String> + Send>;
193

194
195
196
197
198
	fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
		// TODO: https://github.com/paritytech/polkadot/issues/253
		//
		// Fetch ingress and accumulate all unrounted egress
		let _session = self.network.instantiate_session(SessionParams {
199
200
201
202
203
			local_session_key: None,
			parent_hash: self.parent_hash,
			authorities: self.authorities.clone(),
		}).map_err(|e| format!("unable to instantiate validation session: {:?}", e));

204
		Box::new(future::ok(ConsolidatedIngress(Vec::new())))
205
206
207
208
209
210
211
212
213
214
	}
}

struct CollationNode<P, E> {
	parachain_context: P,
	exit: E,
	para_id: ParaId,
	key: Arc<ed25519::Pair>,
}

215
216
217
218
219
220
221
222
223
224
impl<P, E> IntoExit for CollationNode<P, E> where
	P: ParachainContext + Send + 'static,
	E: Future<Item=(),Error=()> + Send + 'static
{
	type Exit = E;
	fn into_exit(self) -> Self::Exit {
		self.exit
	}
}

225
impl<P, E> Worker for CollationNode<P, E> where
226
	P: ParachainContext + Send + 'static,
227
228
	E: Future<Item=(),Error=()> + Clone + Send + Sync + 'static,
	<P::ProduceCandidate as IntoFuture>::Future: Send + 'static,
229
{
230
	type Work = Box<dyn Future<Item=(),Error=()> + Send>;
231

232
233
234
	fn configuration(&self) -> CustomConfiguration {
		let mut config = CustomConfiguration::default();
		config.collating_for = Some((
Gav Wood's avatar
Gav Wood committed
235
			self.key.public(),
236
237
238
239
240
			self.para_id.clone(),
		));
		config
	}

241
	fn work<S>(self, service: &S, task_executor: TaskExecutor) -> Self::Work
242
243
		where S: PolkadotService,
	{
244
245
		let CollationNode { parachain_context, exit, para_id, key } = self;
		let client = service.client();
246
		let network = service.network();
247
		let known_oracle = client.clone();
thiolliere's avatar
thiolliere committed
248
249
250
251
252
253
		let select_chain = if let Some(select_chain) = service.select_chain() {
			select_chain
		} else {
			info!("The node cannot work because it can't select chain.");
			return Box::new(future::err(()));
		};
254
255

		let message_validator = polkadot_network::gossip::register_validator(
256
			network.clone(),
257
			move |block_hash: &Hash| {
thiolliere's avatar
thiolliere committed
258
				use client::BlockStatus;
259
260
261
262
263
				use polkadot_network::gossip::Known;

				match known_oracle.block_status(&BlockId::hash(*block_hash)) {
					Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
					Ok(BlockStatus::KnownBad) => Some(Known::Bad),
264
					Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) =>
thiolliere's avatar
thiolliere committed
265
						match select_chain.leaves() {
266
267
268
269
270
271
272
							Err(_) => None,
							Ok(leaves) => if leaves.contains(block_hash) {
								Some(Known::Leaf)
							} else {
								Some(Known::Old)
							},
						}
273
274
275
276
277
278
279
280
281
282
283
				}
			},
		);

		let validation_network = ValidationNetwork::new(
			network.clone(),
			exit.clone(),
			message_validator,
			client.clone(),
			task_executor,
		);
284

285
		let inner_exit = exit.clone();
286
		let work = client.import_notification_stream()
287
288
289
290
291
			.for_each(move |notification| {
				macro_rules! try_fr {
					($e:expr) => {
						match $e {
							Ok(x) => x,
292
293
294
							Err(e) => return future::Either::A(future::err(Error::Polkadot(
								format!("{:?}", e)
							))),
295
						}
296
					}
297
298
299
300
301
302
				}

				let relay_parent = notification.hash;
				let id = BlockId::hash(relay_parent);

				let network = network.clone();
303
				let client = client.clone();
304
305
				let key = key.clone();
				let parachain_context = parachain_context.clone();
306
				let validation_network = validation_network.clone();
307
308

				let work = future::lazy(move || {
309
					let api = client.runtime_api();
Gav Wood's avatar
Gav Wood committed
310
					let last_head = match try_fr!(api.parachain_head(&id, para_id)) {
311
312
313
314
						Some(last_head) => last_head,
						None => return future::Either::A(future::ok(())),
					};

315
316
					let authorities = try_fr!(api.authorities(&id));

317
318
					let targets = compute_targets(
						para_id,
319
						authorities.as_slice(),
320
321
322
						try_fr!(api.duty_roster(&id)),
					);

323
324
325
326
327
328
					let context = ApiContext {
						network: validation_network,
						parent_hash: relay_parent,
						authorities,
					};

329
330
331
					let collation_work = collate(
						para_id,
						HeadData(last_head),
332
						context,
333
334
335
						parachain_context,
						key,
					).map(move |collation| {
336
						network.with_spec(move |spec, ctx| spec.add_local_collation(
337
338
339
340
341
342
343
344
345
							ctx,
							relay_parent,
							targets,
							collation,
						));
					});

					future::Either::B(collation_work)
				});
346
				let deadlined = Timeout::new(work, COLLATION_TIMEOUT);
347
348
				let silenced = deadlined.then(|res| match res {
					Ok(()) => Ok(()),
349
350
					Err(_) => {
						warn!("Collation failure: timeout");
351
						Ok(())
352
					}
353
354
				});

355
				tokio::spawn(silenced.select(inner_exit.clone()).then(|_| Ok(())));
356
357
358
359
360
361
362
363
				Ok(())
			});

		let work_and_exit = work.select(exit).then(|_| Ok(()));
		Box::new(work_and_exit) as Box<_>
	}
}

364
365
366
367
368
369
370
371
372
373
fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRoster) -> HashSet<SessionKey> {
	use polkadot_primitives::parachain::Chain;

	roster.validator_duty.iter().enumerate()
		.filter(|&(_, c)| c == &Chain::Parachain(para_id))
		.filter_map(|(i, _)| session_keys.get(i))
		.cloned()
		.collect()
}

374
375
376
377
378
/// Run a collator node with the given `RelayChainContext` and `ParachainContext` and
/// arguments to the underlying polkadot node.
///
/// Provide a future which resolves when the node should exit.
/// This function blocks until done.
379
pub fn run_collator<P, E, I, ArgT>(
380
381
382
383
	parachain_context: P,
	para_id: ParaId,
	exit: E,
	key: Arc<ed25519::Pair>,
384
	args: I,
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
385
	version: VersionInfo,
386
) -> polkadot_cli::error::Result<()> where
387
	P: ParachainContext + Send + 'static,
388
	<P::ProduceCandidate as IntoFuture>::Future: Send + 'static,
389
	E: IntoFuture<Item=(),Error=()>,
390
	E::Future: Send + Clone + Sync + 'static,
391
392
	I: IntoIterator<Item=ArgT>,
	ArgT: Into<std::ffi::OsString> + Clone,
393
394
{
	let node_logic = CollationNode { parachain_context, exit: exit.into_future(), para_id, key };
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
395
	polkadot_cli::run(args, node_logic, version)
396
397
}

Gav's avatar
Gav committed
398
399
#[cfg(test)]
mod tests {
400
401
402
	use std::collections::HashMap;
	use polkadot_primitives::parachain::OutgoingMessage;
	use keyring::AuthorityKeyring;
Gav's avatar
Gav committed
403
404
	use super::*;

405
406
407
	#[derive(Default, Clone)]
	struct DummyRelayChainContext {
		ingress: HashMap<ParaId, ConsolidatedIngress>
Gav's avatar
Gav committed
408
409
	}

410
	impl RelayChainContext for DummyRelayChainContext {
Gav's avatar
Gav committed
411
		type Error = ();
412
		type FutureEgress = Box<dyn Future<Item=ConsolidatedIngress,Error=()>>;
Gav's avatar
Gav committed
413

414
415
416
417
418
		fn unrouted_egress(&self, para_id: ParaId) -> Self::FutureEgress {
			match self.ingress.get(&para_id) {
				Some(ingress) => Box::new(future::ok(ingress.clone())),
				None => Box::new(future::empty()),
			}
Gav's avatar
Gav committed
419
		}
420
	}
Gav's avatar
Gav committed
421

422
423
424
425
	#[derive(Clone)]
	struct DummyParachainContext;

	impl ParachainContext for DummyParachainContext {
426
427
		type ProduceCandidate = Result<(BlockData, HeadData, Extrinsic), InvalidHead>;

428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
		fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
			&self,
			_last_head: HeadData,
			ingress: I,
		) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead> {
			// send messages right back.
			Ok((
				BlockData(vec![1, 2, 3, 4, 5,]),
				HeadData(vec![9, 9, 9]),
				Extrinsic {
					outgoing_messages: ingress.into_iter().map(|(id, msg)| OutgoingMessage {
						target: id,
						data: msg.0,
					}).collect(),
				}
			))
Gav's avatar
Gav committed
444
445
446
		}
	}

447
	#[test]
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
	fn collates_correct_queue_roots() {
		let mut context = DummyRelayChainContext::default();

		let id = ParaId::from(100);

		let a = ParaId::from(123);
		let b = ParaId::from(456);

		let messages_from_a = vec![
			Message(vec![1, 1, 1]),
			Message(b"helloworld".to_vec()),
		];
		let messages_from_b = vec![
			Message(b"dogglesworth".to_vec()),
			Message(b"buy_1_chili_con_carne_here_is_my_cash".to_vec()),
		];

		let root_a = ::polkadot_validation::message_queue_root(
			messages_from_a.iter().map(|msg| &msg.0)
		);

		let root_b = ::polkadot_validation::message_queue_root(
			messages_from_b.iter().map(|msg| &msg.0)
		);

		context.ingress.insert(id, ConsolidatedIngress(vec![
			(b, messages_from_b),
			(a, messages_from_a),
		]));

		let collation = collate(
			id,
			HeadData(vec![5]),
			context.clone(),
			DummyParachainContext,
			AuthorityKeyring::Alice.pair().into(),
		).wait().unwrap();

		// ascending order by root.
		assert_eq!(collation.receipt.egress_queue_roots, vec![(a, root_a), (b, root_b)]);
Gav's avatar
Gav committed
488
489
	}
}
490