lib.rs 22.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Propagation and agreement of candidates.
//!
//! Authorities are split into groups by parachain, and each authority might come
//! up its own candidate for their parachain. Within groups, authorities pass around
//! their candidates and produce statements of validity.
//!
//! Any candidate that receives majority approval by the authorities in a group
//! may be subject to inclusion, unless any authorities flag that candidate as invalid.
//!
//! Wrongly flagging as invalid should be strongly disincentivized, so that in the
//! equilibrium state it is not expected to happen. Likewise with the submission
//! of invalid blocks.
//!
//! Groups themselves may be compromised by malicious authorities.

extern crate parking_lot;
33
extern crate polkadot_availability_store as extrinsic_store;
34
extern crate polkadot_statement_table as table;
35
extern crate polkadot_parachain as parachain;
36
extern crate polkadot_runtime;
Gav Wood's avatar
Gav Wood committed
37
extern crate polkadot_primitives;
38

39
extern crate parity_codec as codec;
40
extern crate substrate_primitives as primitives;
41
42
extern crate srml_support as runtime_support;
extern crate sr_primitives as runtime_primitives;
43
extern crate substrate_client as client;
44
extern crate substrate_trie as trie;
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
45

46
extern crate exit_future;
47
extern crate tokio;
48
extern crate substrate_consensus_common as consensus;
49
extern crate substrate_consensus_aura_primitives as aura_primitives;
50
51
extern crate substrate_finality_grandpa as grandpa;
extern crate substrate_transaction_pool as transaction_pool;
52

53
54
#[macro_use]
extern crate error_chain;
55
56
57
58

#[macro_use]
extern crate futures;

59
60
61
#[macro_use]
extern crate log;

62
63
64
#[cfg(test)]
extern crate substrate_keyring;

65
66
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
67
use std::time::{self, Duration, Instant};
68

69
use client::{BlockchainEvents, ChainHead, BlockBody};
70
use client::blockchain::HeaderBackend;
71
72
73
use client::block_builder::api::BlockBuilder as BlockBuilderApi;
use client::runtime_api::Core;
use codec::Encode;
74
use extrinsic_store::Store as ExtrinsicStore;
75
use parking_lot::Mutex;
76
use polkadot_primitives::{
Gav Wood's avatar
Gav Wood committed
77
	Hash, Block, BlockId, BlockNumber, Header, SessionKey, InherentData
78
};
79
use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt, CandidateSignature};
80
use polkadot_primitives::parachain::{AttestedCandidate, ParachainHost, Statement as PrimitiveStatement};
Gav Wood's avatar
Gav Wood committed
81
use primitives::{Ed25519AuthorityId as AuthorityId, ed25519};
82
use runtime_primitives::traits::ProvideRuntimeApi;
83
84
use tokio::runtime::TaskExecutor;
use tokio::timer::{Delay, Interval};
85
use transaction_pool::txpool::{Pool, ChainApi as PoolChainApi};
86
use aura_primitives::AuraConsensusData;
87

88
use attestation_service::ServiceHandle;
89
use futures::prelude::*;
90
use futures::future::{self, Either};
91
92
use collation::CollationFetch;
use dynamic_inclusion::DynamicInclusion;
93

94
pub use self::collation::{validate_collation, egress_trie_root, Collators};
95
pub use self::error::{ErrorKind, Error};
96
pub use self::shared_table::{SharedTable, ParachainWork, PrimedParachainWork, Validated, Statement, SignedStatement, GenericStatement};
97

98
mod attestation_service;
99
100
mod dynamic_inclusion;
mod evaluation;
101
mod error;
102
mod shared_table;
103

104
105
pub mod collation;

106
107
108
// block size limit.
const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;

109
/// A handle to a statement table router.
110
111
112
///
/// This is expected to be a lightweight, shared type like an `Arc`.
pub trait TableRouter: Clone {
113
114
115
116
117
	/// Errors when fetching data from the network.
	type Error;
	/// Future that resolves when candidate data is fetched.
	type FetchCandidate: IntoFuture<Item=BlockData,Error=Self::Error>;

118
119
120
	/// Call with local candidate data. This will make the data available on the network,
	/// and sign, import, and broadcast a statement about the candidate.
	fn local_candidate(&self, candidate: CandidateReceipt, block_data: BlockData, extrinsic: ParachainExtrinsic);
121
122
123
124
125

	/// Fetch block data for a specific candidate.
	fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate;
}

126
/// A long-lived network which can create parachain statement and BFT message routing processes on demand.
127
128
129
130
pub trait Network {
	/// The table router type. This should handle importing of any statements,
	/// routing statements to peers, and driving completion of any `StatementProducers`.
	type TableRouter: TableRouter;
131
132

	/// Instantiate a table router using the given shared table and task executor.
133
134
135
136
137
138
	fn communication_for(
		&self,
		validators: &[SessionKey],
		table: Arc<SharedTable>,
		task_executor: TaskExecutor
	) -> Self::TableRouter;
139
140
}

141
/// Information about a specific group.
142
#[derive(Debug, Clone, Default)]
143
144
pub struct GroupInfo {
	/// Authorities meant to check validity of candidates.
145
	pub validity_guarantors: HashSet<SessionKey>,
146
147
148
149
150
151
152
	/// Number of votes needed for validity.
	pub needed_validity: usize,
}

/// Sign a table statement against a parent hash.
/// The actual message signed is the encoded statement concatenated with the
/// parent hash.
153
pub fn sign_table_statement(statement: &Statement, key: &ed25519::Pair, parent_hash: &Hash) -> CandidateSignature {
154
155
156
157
	// we sign using the primitive statement type because that's what the runtime
	// expects. These types probably encode the same way so this clone could be optimized
	// out in the future.
	let mut encoded = PrimitiveStatement::from(statement.clone()).encode();
158
	encoded.extend(parent_hash.as_ref());
159
160
161

	key.sign(&encoded).into()
}
162

163
164
165
/// Check signature on table statement.
pub fn check_statement(statement: &Statement, signature: &CandidateSignature, signer: SessionKey, parent_hash: &Hash) -> bool {
	use runtime_primitives::traits::Verify;
166

167
	let mut encoded = PrimitiveStatement::from(statement.clone()).encode();
168
	encoded.extend(parent_hash.as_ref());
169

170
	signature.verify(&encoded[..], &signer.into())
171
172
}

173
fn make_group_info(roster: DutyRoster, authorities: &[AuthorityId], local_id: AuthorityId) -> Result<(HashMap<ParaId, GroupInfo>, LocalDuty), Error> {
174
175
176
177
	if roster.validator_duty.len() != authorities.len() {
		bail!(ErrorKind::InvalidDutyRosterLength(authorities.len(), roster.validator_duty.len()))
	}

178
	let mut local_validation = None;
179
180
	let mut map = HashMap::new();

181
182
	let duty_iter = authorities.iter().zip(&roster.validator_duty);
	for (authority, v_duty) in duty_iter {
183
184
185
		if authority == &local_id {
			local_validation = Some(v_duty.clone());
		}
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201

		match *v_duty {
			Chain::Relay => {}, // does nothing for now.
			Chain::Parachain(ref id) => {
				map.entry(id.clone()).or_insert_with(GroupInfo::default)
					.validity_guarantors
					.insert(authority.clone());
			}
		}
	}

	for live_group in map.values_mut() {
		let validity_len = live_group.validity_guarantors.len();
		live_group.needed_validity = validity_len / 2 + validity_len % 2;
	}

202
203
204
205
206
207
208
209
210
211
212
213
	match local_validation {
		Some(local_validation) => {
			let local_duty = LocalDuty {
				validation: local_validation,
			};

			Ok((map, local_duty))
		}
		None => bail!(ErrorKind::NotValidator(local_id)),
	}
}

214
215
/// Constructs parachain-agreement instances.
struct ParachainConsensus<C, N, P> {
216
	/// The client instance.
217
	client: Arc<P>,
218
	/// The backing network handle.
219
	network: N,
220
	/// Parachain collators.
221
	collators: C,
222
	/// handle to remote task executor
223
	handle: TaskExecutor,
224
	/// Store for extrinsic data.
225
226
227
	extrinsic_store: ExtrinsicStore,
	/// Live agreements.
	live_instances: Mutex<HashMap<Hash, Arc<AttestationTracker>>>,
228
229
}

230
231
232
233
impl<C, N, P> ParachainConsensus<C, N, P> where
	C: Collators + Send + 'static,
	N: Network,
	P: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
234
	P::Api: ParachainHost<Block> + BlockBuilderApi<Block, InherentData>,
235
236
	<C::Collation as IntoFuture>::Future: Send + 'static,
	N::TableRouter: Send + 'static,
237
{
238
239
240
241
242
	/// Get an attestation table for given parent hash.
	///
	/// This starts a parachain agreement process for given parent hash if
	/// one has not already started.
	fn get_or_instantiate(
243
		&self,
244
		parent_hash: Hash,
245
		authorities: &[AuthorityId],
246
		sign_with: Arc<ed25519::Pair>,
247
248
249
250
251
252
253
	)
		-> Result<Arc<AttestationTracker>, Error>
	{
		let mut live_instances = self.live_instances.lock();
		if let Some(tracker) = live_instances.get(&parent_hash) {
			return Ok(tracker.clone());
		}
254

Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
255
		let id = BlockId::hash(parent_hash);
256
		let duty_roster = self.client.runtime_api().duty_roster(&id)?;
257

258
259
260
		let (group_info, local_duty) = make_group_info(
			duty_roster,
			authorities,
261
			sign_with.public().into(),
262
263
		)?;

264
		info!("Starting parachain attestation session on top of parent {:?}. Local parachain duty is {:?}",
265
266
			parent_hash, local_duty.validation);

267
		let active_parachains = self.client.runtime_api().active_parachains(&id)?;
268

269
270
		debug!(target: "consensus", "Active parachains: {:?}", active_parachains);

271
		let table = Arc::new(SharedTable::new(group_info, sign_with.clone(), parent_hash, self.extrinsic_store.clone()));
272
		let router = self.network.communication_for(
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
			authorities,
			table.clone(),
			self.handle.clone()
		);

		let validation_para = match local_duty.validation {
			Chain::Relay => None,
			Chain::Parachain(id) => Some(id),
		};

		let collation_work = validation_para.map(|para| CollationFetch::new(
			para,
			id.clone(),
			parent_hash.clone(),
			self.collators.clone(),
			self.client.clone(),
		));
290

291
292
293
294
		let drop_signal = dispatch_collation_work(
			router.clone(),
			&self.handle,
			collation_work,
295
			self.extrinsic_store.clone(),
296
297
		);

298
299
		let tracker = Arc::new(AttestationTracker {
			table,
300
			started: Instant::now(),
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
			_drop_signal: drop_signal
		});

		live_instances.insert(parent_hash, tracker.clone());

		Ok(tracker)
	}

	/// Retain consensus sessions matching predicate.
	fn retain<F: FnMut(&Hash) -> bool>(&self, mut pred: F) {
		self.live_instances.lock().retain(|k, _| pred(k))
	}
}

/// Parachain consensus for a single block.
struct AttestationTracker {
	_drop_signal: exit_future::Signal,
	table: Arc<SharedTable>,
319
	started: Instant,
320
321
322
}

/// Polkadot proposer factory.
323
pub struct ProposerFactory<C, N, P, TxApi: PoolChainApi> {
324
325
	parachain_consensus: Arc<ParachainConsensus<C, N, P>>,
	transaction_pool: Arc<Pool<TxApi>>,
Gav Wood's avatar
Gav Wood committed
326
	key: Arc<ed25519::Pair>,
327
	_service_handle: ServiceHandle,
328
329
330
}

impl<C, N, P, TxApi> ProposerFactory<C, N, P, TxApi> where
331
332
333
334
335
336
337
	C: Collators + Send + Sync + 'static,
	<C::Collation as IntoFuture>::Future: Send + 'static,
	P: BlockchainEvents<Block> + ChainHead<Block> + BlockBody<Block>,
	P: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
	P::Api: ParachainHost<Block> + Core<Block> + BlockBuilderApi<Block, InherentData>,
	N: Network + Send + Sync + 'static,
	N::TableRouter: Send + 'static,
338
339
340
	TxApi: PoolChainApi,
{
	/// Create a new proposer factory.
341
342
343
344
	pub fn new(
		client: Arc<P>,
		network: N,
		collators: C,
345
		transaction_pool: Arc<Pool<TxApi>>,
346
347
348
		thread_pool: TaskExecutor,
		key: Arc<ed25519::Pair>,
		extrinsic_store: ExtrinsicStore,
349
	) -> Self {
350
351
352
353
354
355
356
357
358
359
360
361
362
		let parachain_consensus = Arc::new(ParachainConsensus {
			client: client.clone(),
			network,
			collators,
			handle: thread_pool.clone(),
			extrinsic_store: extrinsic_store.clone(),
			live_instances: Mutex::new(HashMap::new()),
		});

		let service_handle = ::attestation_service::start(
			client,
			parachain_consensus.clone(),
			thread_pool,
Gav Wood's avatar
Gav Wood committed
363
			key.clone(),
364
365
366
			extrinsic_store,
		);

367
368
369
		ProposerFactory {
			parachain_consensus,
			transaction_pool,
Gav Wood's avatar
Gav Wood committed
370
			key,
371
			_service_handle: service_handle,
372
373
374
375
		}
	}
}

376
impl<C, N, P, TxApi> consensus::Environment<Block, AuraConsensusData> for ProposerFactory<C, N, P, TxApi> where
377
378
379
380
	C: Collators + Send + 'static,
	N: Network,
	TxApi: PoolChainApi<Block=Block>,
	P: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
381
	P::Api: ParachainHost<Block> + BlockBuilderApi<Block, InherentData>,
382
383
384
385
386
387
388
389
390
391
392
393
394
	<C::Collation as IntoFuture>::Future: Send + 'static,
	N::TableRouter: Send + 'static,
{
	type Proposer = Proposer<P, TxApi>;
	type Error = Error;

	fn init(
		&self,
		parent_header: &Header,
		authorities: &[AuthorityId],
	) -> Result<Self::Proposer, Error> {
		let parent_hash = parent_header.hash();
		let parent_id = BlockId::hash(parent_hash);
Gav Wood's avatar
Gav Wood committed
395
		let sign_with = self.key.clone();
396
		let tracker = self.parachain_consensus.get_or_instantiate(
397
			parent_hash,
398
399
400
401
402
403
404
405
406
			authorities,
			sign_with,
		)?;

		Ok(Proposer {
			client: self.parachain_consensus.client.clone(),
			tracker,
			parent_hash,
			parent_id,
407
			parent_number: parent_header.number,
408
			transaction_pool: self.transaction_pool.clone(),
409
		})
410
411
412
	}
}

413
414
415
416
417
418
// dispatch collation work to be done in the background. returns a signal object
// that should fire when the collation work is no longer necessary (e.g. when the proposer object is dropped)
fn dispatch_collation_work<R, C, P>(
	router: R,
	handle: &TaskExecutor,
	work: Option<CollationFetch<C, P>>,
419
	extrinsic_store: ExtrinsicStore,
420
421
) -> exit_future::Signal where
	C: Collators + Send + 'static,
422
423
	P: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
	P::Api: ParachainHost<Block>,
424
425
426
	<C::Collation as IntoFuture>::Future: Send + 'static,
	R: TableRouter + Send + 'static,
{
427
428
	use extrinsic_store::Data;

429
	let (signal, exit) = exit_future::signal();
430
431
432
433
434
435
436

	let work = match work {
		Some(w) => w,
		None => return signal,
	};

	let relay_parent = work.relay_parent();
437
	let handled_work = work.then(move |result| match result {
438
439
440
441
442
443
444
445
446
447
		Ok((collation, extrinsic)) => {
			let res = extrinsic_store.make_available(Data {
				relay_parent,
				parachain_id: collation.receipt.parachain_index,
				candidate_hash: collation.receipt.hash(),
				block_data: collation.block_data.clone(),
				extrinsic: Some(extrinsic.clone()),
			});

			match res {
448
449
450
451
452
				Ok(()) => {
					// TODO: https://github.com/paritytech/polkadot/issues/51
					// Erasure-code and provide merkle branches.
					router.local_candidate(collation.receipt, collation.block_data, extrinsic)
				}
453
454
455
456
				Err(e) =>
					warn!(target: "consensus", "Failed to make collation data available: {:?}", e),
			}

457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
			Ok(())
		}
		Err(_e) => {
			warn!(target: "consensus", "Failed to collate candidate");
			Ok(())
		}
	});

	let cancellable_work = handled_work.select(exit).then(|_| Ok(()));

	// spawn onto thread pool.
	handle.spawn(cancellable_work);
	signal
}

472
473
struct LocalDuty {
	validation: Chain,
474
475
476
}

/// The Polkadot proposer logic.
477
478
479
pub struct Proposer<C: Send + Sync, TxApi: PoolChainApi> where
	C: ProvideRuntimeApi + HeaderBackend<Block>,
{
480
	client: Arc<C>,
Gav Wood's avatar
Gav Wood committed
481
	parent_hash: Hash,
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
482
	parent_id: BlockId,
483
	parent_number: BlockNumber,
484
485
	tracker: Arc<AttestationTracker>,
	transaction_pool: Arc<Pool<TxApi>>,
486
487
}

488
impl<C, TxApi> consensus::Proposer<Block, AuraConsensusData> for Proposer<C, TxApi> where
489
490
	TxApi: PoolChainApi<Block=Block>,
	C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync,
491
	C::Api: ParachainHost<Block> + BlockBuilderApi<Block, InherentData>,
492
{
493
	type Error = Error;
494
495
	type Create = Either<
		CreateProposal<C, TxApi>,
Gav Wood's avatar
Gav Wood committed
496
		future::FutureResult<Block, Error>,
497
	>;
498

499
	fn propose(&self, consensus_data: AuraConsensusData) -> Self::Create {
500
		const ATTEMPT_PROPOSE_EVERY: Duration = Duration::from_millis(100);
501
		const SLOT_DURATION_DENOMINATOR: u64 = 3; // wait up to 1/3 of the slot for candidates.
502

503
		let initial_included = self.tracker.table.includable_count();
504
		let now = Instant::now();
505
506
507
508
509
510
511
512

		let dynamic_inclusion = DynamicInclusion::new(
			self.tracker.table.num_parachains(),
			self.tracker.started,
			Duration::from_secs(consensus_data.slot_duration / SLOT_DURATION_DENOMINATOR),
		);

		let enough_candidates = dynamic_inclusion.acceptable_in(
513
			now,
514
			initial_included,
515
		).unwrap_or_else(|| now + Duration::from_millis(1));
516

517
518
519
520
		let believed_timestamp = consensus_data.timestamp;

		// set up delay until next allowed timestamp.
		let current_timestamp = current_timestamp();
Gav Wood's avatar
Gav Wood committed
521
		let delay_future = if current_timestamp >= believed_timestamp {
522
523
524
			None
		} else {
			Some(Delay::new(
Gav Wood's avatar
Gav Wood committed
525
				Instant::now() + Duration::from_secs(current_timestamp - believed_timestamp)
526
527
528
			))
		};

529
		let timing = ProposalTiming {
530
			minimum: delay_future,
531
532
			attempt_propose: Interval::new(now + ATTEMPT_PROPOSE_EVERY, ATTEMPT_PROPOSE_EVERY),
			enough_candidates: Delay::new(enough_candidates),
533
			dynamic_inclusion,
534
			last_included: initial_included,
535
		};
536

537
		Either::A(CreateProposal {
538
539
540
541
542
			parent_hash: self.parent_hash.clone(),
			parent_number: self.parent_number.clone(),
			parent_id: self.parent_id.clone(),
			client: self.client.clone(),
			transaction_pool: self.transaction_pool.clone(),
543
			table: self.tracker.table.clone(),
544
545
			believed_minimum_timestamp: believed_timestamp,
			consensus_data,
546
547
548
			timing,
		})
	}
549
}
550

Gav Wood's avatar
Gav Wood committed
551
fn current_timestamp() -> u64 {
552
553
554
	time::SystemTime::now().duration_since(time::UNIX_EPOCH)
		.expect("now always later than unix epoch; qed")
		.as_secs()
555
		.into()
556
}
557

558
struct ProposalTiming {
559
	minimum: Option<Delay>,
560
561
	attempt_propose: Interval,
	dynamic_inclusion: DynamicInclusion,
562
	enough_candidates: Delay,
563
564
	last_included: usize,
}
565

566
567
568
impl ProposalTiming {
	// whether it's time to attempt a proposal.
	// shouldn't be called outside of the context of a task.
569
	fn poll(&mut self, included: usize) -> Poll<(), ErrorKind> {
570
571
572
573
574
		// first drain from the interval so when the minimum delay is up
		// we don't have any notifications built up.
		//
		// this interval is just meant to produce periodic task wakeups
		// that lead to the `dynamic_inclusion` getting updated as necessary.
575
		while let Async::Ready(x) = self.attempt_propose.poll().map_err(ErrorKind::Timer)? {
576
577
			x.expect("timer still alive; intervals never end; qed");
		}
578

579
580
581
582
583
584
585
586
		// wait until the minimum time has passed.
		if let Some(mut minimum) = self.minimum.take() {
			if let Async::NotReady = minimum.poll().map_err(ErrorKind::Timer)? {
				self.minimum = Some(minimum);
				return Ok(Async::NotReady);
			}
		}

587
		if included == self.last_included {
588
			return self.enough_candidates.poll().map_err(ErrorKind::Timer);
589
590
591
592
		}

		// the amount of includable candidates has changed. schedule a wakeup
		// if it's not sufficient anymore.
593
594
		match self.dynamic_inclusion.acceptable_in(Instant::now(), included) {
			Some(instant) => {
595
				self.last_included = included;
596
597
				self.enough_candidates.reset(instant);
				self.enough_candidates.poll().map_err(ErrorKind::Timer)
598
			}
599
			None => Ok(Async::Ready(())),
600
		}
asynchronous rob's avatar
asynchronous rob committed
601
	}
602
}
603

604
/// Future which resolves upon the creation of a proposal.
605
pub struct CreateProposal<C: Send + Sync, TxApi: PoolChainApi> {
Gav Wood's avatar
Gav Wood committed
606
	parent_hash: Hash,
607
	parent_number: BlockNumber,
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
608
	parent_id: BlockId,
609
	client: Arc<C>,
610
	transaction_pool: Arc<Pool<TxApi>>,
611
612
	table: Arc<SharedTable>,
	timing: ProposalTiming,
613
614
	believed_minimum_timestamp: u64,
	consensus_data: AuraConsensusData,
615
616
}

617
618
619
impl<C, TxApi> CreateProposal<C, TxApi> where
	TxApi: PoolChainApi<Block=Block>,
	C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync,
620
	C::Api: ParachainHost<Block> + BlockBuilderApi<Block, InherentData>,
621
{
622
	fn propose_with(&self, candidates: Vec<AttestedCandidate>) -> Result<Block, Error> {
623
		use client::block_builder::BlockBuilder;
624
		use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
625

626
627
		const MAX_TRANSACTIONS: usize = 40;

628
		let inherent_data = InherentData {
629
			timestamp: self.believed_minimum_timestamp,
630
			parachains: candidates,
631
			aura_expected_slot: self.consensus_data.slot,
632
633
		};

634
635
		let runtime_api = self.client.runtime_api();

636
		let mut block_builder = BlockBuilder::at_block(&self.parent_id, &*self.client)?;
637
638

		{
Gav Wood's avatar
Gav Wood committed
639
			let inherents = runtime_api.inherent_extrinsics(&self.parent_id, inherent_data)?;
640
641
642
643
			for inherent in inherents {
				block_builder.push(inherent)?;
			}

644
			let mut unqueue_invalid = Vec::new();
645
646
647
			let mut pending_size = 0;

			let ready_iter = self.transaction_pool.ready();
648
			for ready in ready_iter.take(MAX_TRANSACTIONS) {
649
650
651
652
653
654
655
656
657
658
659
660
				let encoded_size = ready.data.encode().len();
				if pending_size + encoded_size >= MAX_TRANSACTIONS_SIZE {
					break
				}

				match block_builder.push(ready.data.clone()) {
					Ok(()) => {
						pending_size += encoded_size;
					}
					Err(e) => {
						trace!(target: "transaction-pool", "Invalid transaction: {}", e);
						unqueue_invalid.push(ready.hash.clone());
661
662
					}
				}
663
			}
664

665
			self.transaction_pool.remove_invalid(&unqueue_invalid);
666
667
		}

668
		let new_block = block_builder.bake()?;
669

670
		info!("Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics: [{}]]",
671
672
673
674
			new_block.header.number,
			Hash::from(new_block.header.hash()),
			new_block.header.parent_hash,
			new_block.extrinsics.iter()
Gav Wood's avatar
Gav Wood committed
675
				.map(|xt| format!("{}", BlakeTwo256::hash_of(xt)))
676
677
678
679
680
				.collect::<Vec<_>>()
				.join(", ")
		);

		// TODO: full re-evaluation
681
		let active_parachains = runtime_api.active_parachains(&self.parent_id)?;
682
		assert!(evaluation::evaluate_initial(
683
			&new_block,
Gav Wood's avatar
Gav Wood committed
684
			self.believed_minimum_timestamp,
685
686
687
688
			&self.parent_hash,
			self.parent_number,
			&active_parachains,
		).is_ok());
689

690
		Ok(new_block)
691
	}
692
693
}

694
695
696
impl<C, TxApi> Future for CreateProposal<C, TxApi> where
	TxApi: PoolChainApi<Block=Block>,
	C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync,
697
	C::Api: ParachainHost<Block> + BlockBuilderApi<Block, InherentData>,
698
{
Gav Wood's avatar
Gav Wood committed
699
	type Item = Block;
700
701
	type Error = Error;

Gav Wood's avatar
Gav Wood committed
702
	fn poll(&mut self) -> Poll<Block, Error> {
703
		// 1. try to propose if we have enough includable candidates and other
704
705
706
707
		// delays have concluded.
		let included = self.table.includable_count();
		try_ready!(self.timing.poll(included));

708
		// 2. propose
709
		let proposed_candidates = self.table.proposed_set();
710
711
712

		self.propose_with(proposed_candidates).map(Async::Ready)
	}
713
}
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731

#[cfg(test)]
mod tests {
	use super::*;
	use substrate_keyring::Keyring;

	#[test]
	fn sign_and_check_statement() {
		let statement: Statement = GenericStatement::Valid([1; 32].into());
		let parent_hash = [2; 32].into();

		let sig = sign_table_statement(&statement, &Keyring::Alice.pair(), &parent_hash);

		assert!(check_statement(&statement, &sig, Keyring::Alice.to_raw_public().into(), &parent_hash));
		assert!(!check_statement(&statement, &sig, Keyring::Alice.to_raw_public().into(), &[0xff; 32].into()));
		assert!(!check_statement(&statement, &sig, Keyring::Bob.to_raw_public().into(), &parent_hash));
	}
}