lib.rs 14 KB
Newer Older
Gav's avatar
Gav committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
//! Collation node logic.
Gav's avatar
Gav committed
18
19
20
21
22
23
24
25
26
27
28
29
30
//!
//! A collator node lives on a distinct parachain and submits a proposal for
//! a state transition, along with a proof for its validity
//! (what we might call a witness or block data).
//!
//! One of collators' other roles is to route messages between chains.
//! Each parachain produces a list of "egress" posts of messages for each other
//! parachain on each block, for a total of N^2 lists all together.
//!
//! We will refer to the egress list at relay chain block X of parachain A with
//! destination B as egress(X)[A -> B]
//!
//! On every block, each parachain will be intended to route messages from some
31
//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3)
Gav's avatar
Gav committed
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//!
//! Since the egress information is unique to every block, when routing from a
//! parachain a collator must gather all egress posts from that parachain
//! up to the last point in history that messages were successfully routed
//! from that parachain, accounting for relay chain blocks where no candidate
//! from the collator's parachain was produced.
//!
//! In the case that all parachains route to each other and a candidate for the
//! collator's parachain was included in the last relay chain block, the collator
//! only has to gather egress posts from other parachains one block back in relay
//! chain history.
//!
//! This crate defines traits which provide context necessary for collation logic
//! to be performed, as the collation logic itself.

47
use std::collections::HashSet;
48
use std::fmt;
49
use std::sync::Arc;
50
use std::time::Duration;
Gav's avatar
Gav committed
51

52
use futures::{future, Stream, Future, IntoFuture};
53
use log::{info, warn};
54
use client::BlockchainEvents;
Gav Wood's avatar
Gav Wood committed
55
use primitives::{ed25519, Pair};
56
use polkadot_primitives::{BlockId, SessionKey, Hash, Block};
57
58
59
60
use polkadot_primitives::parachain::{
	self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic,
	PoVBlock,
};
61
use polkadot_cli::{PolkadotService, CustomConfiguration, ParachainHost};
62
63
64
use polkadot_cli::{Worker, IntoExit, ProvideRuntimeApi, TaskExecutor};
use polkadot_network::validation::{ValidationNetwork, SessionParams};
use polkadot_network::NetworkService;
65
use tokio::timer::Timeout;
66
use consensus_authorities::AuthoritiesApi;
thiolliere's avatar
thiolliere committed
67
use consensus_common::SelectChain;
68

69
pub use polkadot_cli::VersionInfo;
70
pub use polkadot_network::validation::Incoming;
71

72
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
Gav's avatar
Gav committed
73

74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/// Error to return when the head data was invalid.
#[derive(Clone, Copy, Debug)]
pub struct InvalidHead;

/// Collation errors.
#[derive(Debug)]
pub enum Error<R> {
	/// Error on the relay-chain side of things.
	Polkadot(R),
	/// Error on the collator side of things.
	Collator(InvalidHead),
}

impl<R: fmt::Display> fmt::Display for Error<R> {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		match *self {
			Error::Polkadot(ref err) => write!(f, "Polkadot node error: {}", err),
			Error::Collator(_) => write!(f, "Collator node error: Invalid head data"),
		}
	}
}

Gav's avatar
Gav committed
96
97
98
/// Parachain context needed for collation.
///
/// This can be implemented through an externally attached service or a stub.
99
100
101
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
	/// Produce a candidate, given the latest ingress queue information and the last parachain head.
Gav's avatar
Gav committed
102
103
	fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
		&self,
104
		last_head: HeadData,
Gav's avatar
Gav committed
105
		ingress: I,
106
	) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead>;
Gav's avatar
Gav committed
107
108
109
110
111
112
}

/// Relay chain context needed to collate.
/// This encapsulates a network and local database which may store
/// some of the input.
pub trait RelayChainContext {
113
	type Error: ::std::fmt::Debug;
Gav's avatar
Gav committed
114
115
116

	/// Future that resolves to the un-routed egress queues of a parachain.
	/// The first item is the oldest.
117
	type FutureEgress: IntoFuture<Item=ConsolidatedIngress, Error=Self::Error>;
Gav's avatar
Gav committed
118
119

	/// Get un-routed egress queues from a parachain to the local parachain.
120
	fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress;
Gav's avatar
Gav committed
121
122
}

123
124
125
126
127
128
129
130
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
pub fn collate<'a, R, P>(
	local_id: ParaId,
	last_head: HeadData,
	relay_context: R,
	para_context: P,
	key: Arc<ed25519::Pair>,
)
131
	-> impl Future<Item=parachain::Collation, Error=Error<R::Error>> + 'a
Gav's avatar
Gav committed
132
	where
133
		R: RelayChainContext,
134
		R::Error: 'a,
Gav's avatar
Gav committed
135
136
137
		R::FutureEgress: 'a,
		P: ParachainContext + 'a,
{
138
	let ingress = relay_context.unrouted_egress(local_id).into_future().map_err(Error::Polkadot);
139
	ingress.and_then(move |ingress| {
140
		let (block_data, head_data, mut extrinsic) = para_context.produce_candidate(
141
			last_head,
142
			ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
143
		).map_err(Error::Collator)?;
Gav's avatar
Gav committed
144

145
		let block_data_hash = block_data.hash();
146
		let signature = key.sign(block_data_hash.as_ref()).into();
147
148
		let egress_queue_roots =
			::polkadot_validation::egress_roots(&mut extrinsic.outgoing_messages);
149
150

		let receipt = parachain::CandidateReceipt {
Gav's avatar
Gav committed
151
			parachain_index: local_id,
Gav Wood's avatar
Gav Wood committed
152
			collator: key.public(),
153
154
155
			signature,
			head_data,
			balance_uploads: Vec::new(),
156
			egress_queue_roots,
157
			fees: 0,
158
			block_data_hash,
159
160
		};

161
		Ok(parachain::Collation {
162
			receipt,
163
164
165
166
			pov: PoVBlock {
				block_data,
				ingress,
			},
167
		})
168
	})
Gav's avatar
Gav committed
169
170
}

171
/// Polkadot-api context.
172
173
174
175
176
struct ApiContext<P, E> {
	network: ValidationNetwork<P, E, NetworkService, TaskExecutor>,
	parent_hash: Hash,
	authorities: Vec<SessionKey>,
}
177

178
179
180
181
182
183
184
185
impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
	P: ProvideRuntimeApi + Send + Sync,
	P::Api: ParachainHost<Block>,
	E: Future<Item=(),Error=()> + Clone + Send + Sync + 'static,
{
	type Error = String;
	type FutureEgress = Box<Future<Item=ConsolidatedIngress, Error=String> + Send>;

186
187
188
189
190
	fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
		// TODO: https://github.com/paritytech/polkadot/issues/253
		//
		// Fetch ingress and accumulate all unrounted egress
		let _session = self.network.instantiate_session(SessionParams {
191
192
193
194
195
			local_session_key: None,
			parent_hash: self.parent_hash,
			authorities: self.authorities.clone(),
		}).map_err(|e| format!("unable to instantiate validation session: {:?}", e));

196
		Box::new(future::ok(ConsolidatedIngress(Vec::new())))
197
198
199
200
201
202
203
204
205
206
	}
}

struct CollationNode<P, E> {
	parachain_context: P,
	exit: E,
	para_id: ParaId,
	key: Arc<ed25519::Pair>,
}

207
208
209
210
211
212
213
214
215
216
impl<P, E> IntoExit for CollationNode<P, E> where
	P: ParachainContext + Send + 'static,
	E: Future<Item=(),Error=()> + Send + 'static
{
	type Exit = E;
	fn into_exit(self) -> Self::Exit {
		self.exit
	}
}

217
impl<P, E> Worker for CollationNode<P, E> where
218
	P: ParachainContext + Send + 'static,
219
	E: Future<Item=(),Error=()> + Clone + Send + Sync + 'static
220
{
221
	type Work = Box<Future<Item=(),Error=()> + Send>;
222

223
224
225
	fn configuration(&self) -> CustomConfiguration {
		let mut config = CustomConfiguration::default();
		config.collating_for = Some((
Gav Wood's avatar
Gav Wood committed
226
			self.key.public(),
227
228
229
230
231
			self.para_id.clone(),
		));
		config
	}

232
	fn work<S>(self, service: &S, task_executor: TaskExecutor) -> Self::Work
233
234
		where S: PolkadotService,
	{
235
236
		let CollationNode { parachain_context, exit, para_id, key } = self;
		let client = service.client();
237
		let network = service.network();
238
		let known_oracle = client.clone();
thiolliere's avatar
thiolliere committed
239
240
241
242
243
244
		let select_chain = if let Some(select_chain) = service.select_chain() {
			select_chain
		} else {
			info!("The node cannot work because it can't select chain.");
			return Box::new(future::err(()));
		};
245
246

		let message_validator = polkadot_network::gossip::register_validator(
247
			network.clone(),
248
			move |block_hash: &Hash| {
thiolliere's avatar
thiolliere committed
249
				use client::BlockStatus;
250
251
252
253
254
				use polkadot_network::gossip::Known;

				match known_oracle.block_status(&BlockId::hash(*block_hash)) {
					Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
					Ok(BlockStatus::KnownBad) => Some(Known::Bad),
255
					Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) =>
thiolliere's avatar
thiolliere committed
256
						match select_chain.leaves() {
257
258
259
260
261
262
263
							Err(_) => None,
							Ok(leaves) => if leaves.contains(block_hash) {
								Some(Known::Leaf)
							} else {
								Some(Known::Old)
							},
						}
264
265
266
267
268
269
270
271
272
273
274
				}
			},
		);

		let validation_network = ValidationNetwork::new(
			network.clone(),
			exit.clone(),
			message_validator,
			client.clone(),
			task_executor,
		);
275

276
		let inner_exit = exit.clone();
277
		let work = client.import_notification_stream()
278
279
280
281
282
			.for_each(move |notification| {
				macro_rules! try_fr {
					($e:expr) => {
						match $e {
							Ok(x) => x,
283
284
285
							Err(e) => return future::Either::A(future::err(Error::Polkadot(
								format!("{:?}", e)
							))),
286
						}
287
					}
288
289
290
291
292
293
				}

				let relay_parent = notification.hash;
				let id = BlockId::hash(relay_parent);

				let network = network.clone();
294
				let client = client.clone();
295
296
				let key = key.clone();
				let parachain_context = parachain_context.clone();
297
				let validation_network = validation_network.clone();
298
299

				let work = future::lazy(move || {
300
					let api = client.runtime_api();
Gav Wood's avatar
Gav Wood committed
301
					let last_head = match try_fr!(api.parachain_head(&id, para_id)) {
302
303
304
305
						Some(last_head) => last_head,
						None => return future::Either::A(future::ok(())),
					};

306
307
					let authorities = try_fr!(api.authorities(&id));

308
309
					let targets = compute_targets(
						para_id,
310
						authorities.as_slice(),
311
312
313
						try_fr!(api.duty_roster(&id)),
					);

314
315
316
317
318
319
					let context = ApiContext {
						network: validation_network,
						parent_hash: relay_parent,
						authorities,
					};

320
321
322
					let collation_work = collate(
						para_id,
						HeadData(last_head),
323
						context,
324
325
326
						parachain_context,
						key,
					).map(move |collation| {
327
						network.with_spec(move |spec, ctx| spec.add_local_collation(
328
329
330
331
332
333
334
335
336
							ctx,
							relay_parent,
							targets,
							collation,
						));
					});

					future::Either::B(collation_work)
				});
337
				let deadlined = Timeout::new(work, COLLATION_TIMEOUT);
338
339
				let silenced = deadlined.then(|res| match res {
					Ok(()) => Ok(()),
340
341
					Err(_) => {
						warn!("Collation failure: timeout");
342
						Ok(())
343
					}
344
345
				});

346
				tokio::spawn(silenced.select(inner_exit.clone()).then(|_| Ok(())));
347
348
349
350
351
352
353
354
				Ok(())
			});

		let work_and_exit = work.select(exit).then(|_| Ok(()));
		Box::new(work_and_exit) as Box<_>
	}
}

355
356
357
358
359
360
361
362
363
364
fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRoster) -> HashSet<SessionKey> {
	use polkadot_primitives::parachain::Chain;

	roster.validator_duty.iter().enumerate()
		.filter(|&(_, c)| c == &Chain::Parachain(para_id))
		.filter_map(|(i, _)| session_keys.get(i))
		.cloned()
		.collect()
}

365
366
367
368
369
/// Run a collator node with the given `RelayChainContext` and `ParachainContext` and
/// arguments to the underlying polkadot node.
///
/// Provide a future which resolves when the node should exit.
/// This function blocks until done.
370
pub fn run_collator<P, E, I, ArgT>(
371
372
373
374
	parachain_context: P,
	para_id: ParaId,
	exit: E,
	key: Arc<ed25519::Pair>,
375
	args: I,
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
376
	version: VersionInfo,
377
) -> polkadot_cli::error::Result<()> where
378
	P: ParachainContext + Send + 'static,
379
	E: IntoFuture<Item=(),Error=()>,
380
	E::Future: Send + Clone + Sync + 'static,
381
382
	I: IntoIterator<Item=ArgT>,
	ArgT: Into<std::ffi::OsString> + Clone,
383
384
{
	let node_logic = CollationNode { parachain_context, exit: exit.into_future(), para_id, key };
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
385
	polkadot_cli::run(args, node_logic, version)
386
387
}

Gav's avatar
Gav committed
388
389
#[cfg(test)]
mod tests {
390
391
392
	use std::collections::HashMap;
	use polkadot_primitives::parachain::OutgoingMessage;
	use keyring::AuthorityKeyring;
Gav's avatar
Gav committed
393
394
	use super::*;

395
396
397
	#[derive(Default, Clone)]
	struct DummyRelayChainContext {
		ingress: HashMap<ParaId, ConsolidatedIngress>
Gav's avatar
Gav committed
398
399
	}

400
	impl RelayChainContext for DummyRelayChainContext {
Gav's avatar
Gav committed
401
		type Error = ();
402
		type FutureEgress = Box<Future<Item=ConsolidatedIngress,Error=()>>;
Gav's avatar
Gav committed
403

404
405
406
407
408
		fn unrouted_egress(&self, para_id: ParaId) -> Self::FutureEgress {
			match self.ingress.get(&para_id) {
				Some(ingress) => Box::new(future::ok(ingress.clone())),
				None => Box::new(future::empty()),
			}
Gav's avatar
Gav committed
409
		}
410
	}
Gav's avatar
Gav committed
411

412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
	#[derive(Clone)]
	struct DummyParachainContext;

	impl ParachainContext for DummyParachainContext {
		fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
			&self,
			_last_head: HeadData,
			ingress: I,
		) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead> {
			// send messages right back.
			Ok((
				BlockData(vec![1, 2, 3, 4, 5,]),
				HeadData(vec![9, 9, 9]),
				Extrinsic {
					outgoing_messages: ingress.into_iter().map(|(id, msg)| OutgoingMessage {
						target: id,
						data: msg.0,
					}).collect(),
				}
			))
Gav's avatar
Gav committed
432
433
434
		}
	}

435
	#[test]
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
	fn collates_correct_queue_roots() {
		let mut context = DummyRelayChainContext::default();

		let id = ParaId::from(100);

		let a = ParaId::from(123);
		let b = ParaId::from(456);

		let messages_from_a = vec![
			Message(vec![1, 1, 1]),
			Message(b"helloworld".to_vec()),
		];
		let messages_from_b = vec![
			Message(b"dogglesworth".to_vec()),
			Message(b"buy_1_chili_con_carne_here_is_my_cash".to_vec()),
		];

		let root_a = ::polkadot_validation::message_queue_root(
			messages_from_a.iter().map(|msg| &msg.0)
		);

		let root_b = ::polkadot_validation::message_queue_root(
			messages_from_b.iter().map(|msg| &msg.0)
		);

		context.ingress.insert(id, ConsolidatedIngress(vec![
			(b, messages_from_b),
			(a, messages_from_a),
		]));

		let collation = collate(
			id,
			HeadData(vec![5]),
			context.clone(),
			DummyParachainContext,
			AuthorityKeyring::Alice.pair().into(),
		).wait().unwrap();

		// ascending order by root.
		assert_eq!(collation.receipt.egress_queue_roots, vec![(a, root_a), (b, root_b)]);
Gav's avatar
Gav committed
476
477
	}
}
478