lib.rs 58.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements a `CandidateBackingSubsystem`.

19
20
#![deny(unused_crate_dependencies)]

21
22
23
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::pin::Pin;
asynchronous rob's avatar
asynchronous rob committed
24
use std::sync::Arc;
25
26
27

use bitvec::vec::BitVec;
use futures::{
28
29
	channel::{mpsc, oneshot},
	Future, FutureExt, SinkExt, StreamExt,
30
31
};

32
use sp_keystore::SyncCryptoStorePtr;
asynchronous rob's avatar
asynchronous rob committed
33
use polkadot_primitives::v1::{
34
	CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId,
35
	ValidatorIndex, SigningContext, PoV,
36
	CandidateDescriptor, AvailableData, ValidatorSignature, Hash, CandidateReceipt,
37
	CandidateCommitments, CoreState, CoreIndex, CollatorId, ValidationOutputs,
38
39
};
use polkadot_node_primitives::{
40
	FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
41
42
};
use polkadot_subsystem::{
43
44
45
46
	messages::{
		AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
		CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData,
		ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed,
47
		RuntimeApiRequest,
48
	},
49
50
51
52
53
54
55
56
};
use polkadot_node_subsystem_util::{
	self as util,
	request_session_index_for_child,
	request_validator_groups,
	request_validators,
	request_from_runtime,
	Validator,
57
	delegated_subsystem,
58
	metrics::{self, prometheus},
59
60
61
};
use statement_table::{
	generic::AttestedCandidate as TableAttestedCandidate,
asynchronous rob's avatar
asynchronous rob committed
62
63
64
65
66
67
	Context as TableContextTrait,
	Table,
	v1::{
		Statement as TableStatement,
		SignedStatement as TableSignedStatement, Summary as TableSummary,
	},
68
};
69
use thiserror::Error;
70

71
#[derive(Debug, Error)]
72
enum Error {
73
	#[error("Candidate is not found")]
74
	CandidateNotFound,
75
	#[error("Signature is invalid")]
76
	InvalidSignature,
77
78
79
80
81
82
83
84
85
86
87
88
	#[error("Failed to send candidates {0:?}")]
	Send(Vec<NewBackedCandidate>),
	#[error("Oneshot never resolved")]
	Oneshot(#[from] #[source] oneshot::Canceled),
	#[error("Obtaining erasure chunks failed")]
	ObtainErasureChunks(#[from] #[source] erasure_coding::Error),
	#[error(transparent)]
	ValidationFailed(#[from] ValidationFailed),
	#[error(transparent)]
	Mpsc(#[from] mpsc::SendError),
	#[error(transparent)]
	UtilError(#[from] util::Error),
89
90
91
92
93
94
95
96
97
98
}

/// Holds all data needed for candidate backing job operation.
struct CandidateBackingJob {
	/// The hash of the relay parent on top of which this job is doing it's work.
	parent: Hash,
	/// Inbound message channel receiving part.
	rx_to: mpsc::Receiver<ToJob>,
	/// Outbound message channel sending part.
	tx_from: mpsc::Sender<FromJob>,
99
	/// The `ParaId` assigned to this validator
100
	assignment: ParaId,
101
102
	/// The collator required to author the candidate, if any.
	required_collator: Option<CollatorId>,
103
104
105
106
	/// We issued `Valid` or `Invalid` statements on about these candidates.
	issued_statements: HashSet<Hash>,
	/// `Some(h)` if this job has already issues `Seconded` statemt for some candidate with `h` hash.
	seconded: Option<Hash>,
asynchronous rob's avatar
asynchronous rob committed
107
108
109
	/// The candidates that are includable, by hash. Each entry here indicates
	/// that we've sent the provisioner the backed candidate.
	backed: HashSet<Hash>,
110
111
	/// We have already reported misbehaviors for these validators.
	reported_misbehavior_for: HashSet<ValidatorIndex>,
112
	keystore: SyncCryptoStorePtr,
113
114
	table: Table<TableContext>,
	table_context: TableContext,
115
	metrics: Metrics,
116
117
118
119
120
121
122
123
124
}

const fn group_quorum(n_validators: usize) -> usize {
	(n_validators / 2) + 1
}

#[derive(Default)]
struct TableContext {
	signing_context: SigningContext,
125
	validator: Option<Validator>,
126
127
128
129
130
	groups: HashMap<ParaId, Vec<ValidatorIndex>>,
	validators: Vec<ValidatorId>,
}

impl TableContextTrait for TableContext {
asynchronous rob's avatar
asynchronous rob committed
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
	type AuthorityId = ValidatorIndex;
	type Digest = Hash;
	type GroupId = ParaId;
	type Signature = ValidatorSignature;
	type Candidate = CommittedCandidateReceipt;

	fn candidate_digest(candidate: &CommittedCandidateReceipt) -> Hash {
		candidate.hash()
	}

	fn candidate_group(candidate: &CommittedCandidateReceipt) -> ParaId {
		candidate.descriptor().para_id
	}

	fn is_member_of(&self, authority: &ValidatorIndex, group: &ParaId) -> bool {
		self.groups.get(group).map_or(false, |g| g.iter().position(|a| a == authority).is_some())
147
148
149
150
151
152
153
	}

	fn requisite_votes(&self, group: &ParaId) -> usize {
		self.groups.get(group).map_or(usize::max_value(), |g| group_quorum(g.len()))
	}
}

154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
/// A message type that is sent from `CandidateBackingSubsystem` to `CandidateBackingJob`.
pub enum ToJob {
	/// A `CandidateBackingMessage`.
	CandidateBacking(CandidateBackingMessage),
	/// Stop working.
	Stop,
}

impl TryFrom<AllMessages> for ToJob {
	type Error = ();

	fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
		match msg {
			AllMessages::CandidateBacking(msg) => Ok(ToJob::CandidateBacking(msg)),
			_ => Err(()),
		}
170
	}
171
}
172

173
174
175
impl From<CandidateBackingMessage> for ToJob {
	fn from(msg: CandidateBackingMessage) -> Self {
		Self::CandidateBacking(msg)
176
177
178
	}
}

179
180
impl util::ToJobTrait for ToJob {
	const STOP: Self = ToJob::Stop;
181

182
183
184
185
186
187
	fn relay_parent(&self) -> Option<Hash> {
		match self {
			Self::CandidateBacking(cb) => cb.relay_parent(),
			Self::Stop => None,
		}
	}
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
}

/// A message type that is sent from `CandidateBackingJob` to `CandidateBackingSubsystem`.
enum FromJob {
	AvailabilityStore(AvailabilityStoreMessage),
	RuntimeApiMessage(RuntimeApiMessage),
	CandidateValidation(CandidateValidationMessage),
	CandidateSelection(CandidateSelectionMessage),
	Provisioner(ProvisionerMessage),
	PoVDistribution(PoVDistributionMessage),
	StatementDistribution(StatementDistributionMessage),
}

impl From<FromJob> for AllMessages {
	fn from(f: FromJob) -> Self {
		match f {
			FromJob::AvailabilityStore(msg) => AllMessages::AvailabilityStore(msg),
			FromJob::RuntimeApiMessage(msg) => AllMessages::RuntimeApi(msg),
			FromJob::CandidateValidation(msg) => AllMessages::CandidateValidation(msg),
			FromJob::CandidateSelection(msg) => AllMessages::CandidateSelection(msg),
			FromJob::StatementDistribution(msg) => AllMessages::StatementDistribution(msg),
			FromJob::PoVDistribution(msg) => AllMessages::PoVDistribution(msg),
			FromJob::Provisioner(msg) => AllMessages::Provisioner(msg),
		}
	}
}

215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
impl TryFrom<AllMessages> for FromJob {
	type Error = &'static str;

	fn try_from(f: AllMessages) -> Result<Self, Self::Error> {
		match f {
			AllMessages::AvailabilityStore(msg) => Ok(FromJob::AvailabilityStore(msg)),
			AllMessages::RuntimeApi(msg) => Ok(FromJob::RuntimeApiMessage(msg)),
			AllMessages::CandidateValidation(msg) => Ok(FromJob::CandidateValidation(msg)),
			AllMessages::CandidateSelection(msg) => Ok(FromJob::CandidateSelection(msg)),
			AllMessages::StatementDistribution(msg) => Ok(FromJob::StatementDistribution(msg)),
			AllMessages::PoVDistribution(msg) => Ok(FromJob::PoVDistribution(msg)),
			AllMessages::Provisioner(msg) => Ok(FromJob::Provisioner(msg)),
			_ => Err("can't convert this AllMessages variant to FromJob"),
		}
	}
}

232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
// It looks like it's not possible to do an `impl From` given the current state of
// the code. So this does the necessary conversion.
fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
	let statement = match s.payload() {
		Statement::Seconded(c) => TableStatement::Candidate(c.clone()),
		Statement::Valid(h) => TableStatement::Valid(h.clone()),
		Statement::Invalid(h) => TableStatement::Invalid(h.clone()),
	};

	TableSignedStatement {
		statement,
		signature: s.signature().clone(),
		sender: s.validator_index(),
	}
}

asynchronous rob's avatar
asynchronous rob committed
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
fn table_attested_to_backed(
	attested: TableAttestedCandidate<
		ParaId,
		CommittedCandidateReceipt,
		ValidatorIndex,
		ValidatorSignature,
	>,
	table_context: &TableContext,
) -> Option<BackedCandidate> {
	let TableAttestedCandidate { candidate, validity_votes, group_id: para_id } = attested;

	let (ids, validity_votes): (Vec<_>, Vec<_>) = validity_votes
		.into_iter()
		.map(|(id, vote)| (id, vote.into()))
		.unzip();

	let group = table_context.groups.get(&para_id)?;

	let mut validator_indices = BitVec::with_capacity(group.len());

	validator_indices.resize(group.len(), false);

	for id in ids.iter() {
		if let Some(position) = group.iter().position(|x| x == id) {
			validator_indices.set(position, true);
		}
	}

	Some(BackedCandidate {
		candidate,
		validity_votes,
		validator_indices,
	})
}

283
284
impl CandidateBackingJob {
	/// Run asynchronously.
285
	async fn run_loop(mut self) -> Result<(), Error> {
286
287
288
289
290
		while let Some(msg) = self.rx_to.next().await {
			match msg {
				ToJob::CandidateBacking(msg) => {
					self.process_msg(msg).await?;
				}
291
				ToJob::Stop => break,
292
293
294
295
296
297
298
299
			}
		}

		Ok(())
	}

	async fn issue_candidate_invalid_message(
		&mut self,
asynchronous rob's avatar
asynchronous rob committed
300
		candidate: CandidateReceipt,
301
302
303
304
305
306
307
308
309
	) -> Result<(), Error> {
		self.tx_from.send(FromJob::CandidateSelection(
			CandidateSelectionMessage::Invalid(self.parent, candidate)
		)).await?;

		Ok(())
	}

	/// Validate the candidate that is requested to be `Second`ed and distribute validation result.
asynchronous rob's avatar
asynchronous rob committed
310
311
	///
	/// Returns `Ok(true)` if we issued a `Seconded` statement about this candidate.
312
313
	async fn validate_and_second(
		&mut self,
asynchronous rob's avatar
asynchronous rob committed
314
315
316
		candidate: &CandidateReceipt,
		pov: PoV,
	) -> Result<bool, Error> {
317
318
319
320
321
322
323
324
		// Check that candidate is collated by the right collator.
		if self.required_collator.as_ref()
			.map_or(false, |c| c != &candidate.descriptor().collator)
		{
			self.issue_candidate_invalid_message(candidate.clone()).await?;
			return Ok(false);
		}

asynchronous rob's avatar
asynchronous rob committed
325
326
327
328
329
330
331
332
		let valid = self.request_candidate_validation(
			candidate.descriptor().clone(),
			Arc::new(pov.clone()),
		).await?;

		let candidate_hash = candidate.hash();

		let statement = match valid {
333
			ValidationResult::Valid(outputs, validation_data) => {
334
335
336
				// make PoV available for later distribution. Send data to the availability
				// store to keep. Sign and dispatch `valid` statement to network if we
				// have not seconded the given candidate.
asynchronous rob's avatar
asynchronous rob committed
337
338
339
340
341
				//
				// If the commitments hash produced by validation is not the same as given by
				// the collator, do not make available and report the collator.
				let commitments_check = self.make_pov_available(
					pov,
342
					validation_data,
asynchronous rob's avatar
asynchronous rob committed
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
					outputs,
					|commitments| if commitments.hash() == candidate.commitments_hash {
						Ok(CommittedCandidateReceipt {
							descriptor: candidate.descriptor().clone(),
							commitments,
						})
					} else {
						Err(())
					},
				).await?;

				match commitments_check {
					Ok(candidate) => {
						self.issued_statements.insert(candidate_hash);
						Some(Statement::Seconded(candidate))
					}
					Err(()) => {
						self.issue_candidate_invalid_message(candidate.clone()).await?;
						None
					}
				}
364
			}
365
			ValidationResult::Invalid(_reason) => {
asynchronous rob's avatar
asynchronous rob committed
366
367
368
369
370
371
				// no need to issue a statement about this if we aren't seconding it.
				//
				// there's an infinite amount of garbage out there. no need to acknowledge
				// all of it.
				self.issue_candidate_invalid_message(candidate.clone()).await?;
				None
372
373
374
			}
		};

asynchronous rob's avatar
asynchronous rob committed
375
		let issued_statement = statement.is_some();
376
377

		if let Some(statement) = statement {
asynchronous rob's avatar
asynchronous rob committed
378
			self.sign_import_and_distribute_statement(statement).await?
379
380
		}

asynchronous rob's avatar
asynchronous rob committed
381
		Ok(issued_statement)
382
383
	}

asynchronous rob's avatar
asynchronous rob committed
384
385
386
387
388
389
390
391
392
	async fn sign_import_and_distribute_statement(&mut self, statement: Statement) -> Result<(), Error> {
		if let Some(signed_statement) = self.sign_statement(statement).await {
			self.import_statement(&signed_statement).await?;
			self.distribute_signed_statement(signed_statement).await?;
		}

		Ok(())
	}

393
394
395
396
397
	fn get_backed(&self) -> Vec<NewBackedCandidate> {
		let proposed = self.table.proposed_candidates(&self.table_context);
		let mut res = Vec::with_capacity(proposed.len());

		for p in proposed.into_iter() {
asynchronous rob's avatar
asynchronous rob committed
398
			match table_attested_to_backed(p, &self.table_context) {
399
				None => continue,
asynchronous rob's avatar
asynchronous rob committed
400
				Some(backed) => res.push(NewBackedCandidate(backed)),
401
402
403
404
405
406
407
			}
		}

		res
	}

	/// Check if there have happened any new misbehaviors and issue necessary messages.
asynchronous rob's avatar
asynchronous rob committed
408
	///
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
	/// TODO: Report multiple misbehaviors (https://github.com/paritytech/polkadot/issues/1387)
	async fn issue_new_misbehaviors(&mut self) -> Result<(), Error> {
		let mut reports = Vec::new();

		for (k, v) in self.table.get_misbehavior().iter() {
			if !self.reported_misbehavior_for.contains(k) {
				self.reported_misbehavior_for.insert(*k);

				let f = FromTableMisbehavior {
					id: *k,
					report: v.clone(),
					signing_context: self.table_context.signing_context.clone(),
					key: self.table_context.validators[*k as usize].clone(),
				};

				if let Ok(report) = MisbehaviorReport::try_from(f) {
					let message = ProvisionerMessage::ProvisionableData(
426
						ProvisionableData::MisbehaviorReport(self.parent, report),
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
					);

					reports.push(message);
				}
			}
		}

		for report in reports.drain(..) {
			self.send_to_provisioner(report).await?
		}

		Ok(())
	}

	/// Import a statement into the statement table and return the summary of the import.
	async fn import_statement(
		&mut self,
		statement: &SignedFullStatement,
	) -> Result<Option<TableSummary>, Error> {
		let stmt = primitive_statement_to_table(statement);

		let summary = self.table.import_statement(&self.table_context, stmt);

asynchronous rob's avatar
asynchronous rob committed
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
		if let Some(ref summary) = summary {
			if let Some(attested) = self.table.attested_candidate(
				&summary.candidate,
				&self.table_context,
			) {
				// `HashSet::insert` returns true if the thing wasn't in there already.
				// one of the few places the Rust-std folks did a bad job with API
				if self.backed.insert(summary.candidate) {
					if let Some(backed) =
						table_attested_to_backed(attested, &self.table_context)
					{
						let message = ProvisionerMessage::ProvisionableData(
							ProvisionableData::BackedCandidate(backed),
						);
						self.send_to_provisioner(message).await?;
					}
				}
			}
		}

470
471
		self.issue_new_misbehaviors().await?;

asynchronous rob's avatar
asynchronous rob committed
472
		Ok(summary)
473
474
475
476
477
478
	}

	async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> {
		match msg {
			CandidateBackingMessage::Second(_, candidate, pov) => {
				// Sanity check that candidate is from our assignment.
asynchronous rob's avatar
asynchronous rob committed
479
				if candidate.descriptor().para_id != self.assignment {
480
481
482
483
484
485
					return Ok(());
				}

				// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
				// Seconded statement only if we have not seconded any other candidate and
				// have not signed a Valid statement for the requested candidate.
asynchronous rob's avatar
asynchronous rob committed
486
				if self.seconded.is_none() {
487
					// This job has not seconded a candidate yet.
asynchronous rob's avatar
asynchronous rob committed
488
489
490
491
492
493
494
495
496
					let candidate_hash = candidate.hash();

					if !self.issued_statements.contains(&candidate_hash) {
						if let Ok(true) = self.validate_and_second(
							&candidate,
							pov,
						).await {
							self.metrics.on_candidate_seconded();
							self.seconded = Some(candidate_hash);
497
498
499
500
501
502
						}
					}
				}
			}
			CandidateBackingMessage::Statement(_, statement) => {
				self.check_statement_signature(&statement)?;
503
504
505
506
507
				match self.maybe_validate_and_import(statement).await {
					Err(Error::ValidationFailed(_)) => return Ok(()),
					Err(e) => return Err(e),
					Ok(()) => (),
				}
508
509
510
511
			}
			CandidateBackingMessage::GetBackedCandidates(_, tx) => {
				let backed = self.get_backed();

512
				tx.send(backed).map_err(|data| Error::Send(data))?;
513
514
515
516
517
518
519
520
521
522
			}
		}

		Ok(())
	}

	/// Kick off validation work and distribute the result as a signed statement.
	async fn kick_off_validation_work(
		&mut self,
		summary: TableSummary,
asynchronous rob's avatar
asynchronous rob committed
523
524
525
526
527
528
529
530
531
532
533
534
535
536
	) -> Result<(), Error> {
		let candidate_hash = summary.candidate.clone();

		if self.issued_statements.contains(&candidate_hash) {
			return Ok(())
		}

		// We clone the commitments here because there are borrowck
		// errors relating to this being a struct and methods borrowing the entirety of self
		// and not just those things that the function uses.
		let candidate = self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?;
		let expected_commitments = candidate.commitments.clone();

		let descriptor = candidate.descriptor().clone();
537
538
539
540
541
542
543
544
545
546
547
548
549

		// Check that candidate is collated by the right collator.
		if self.required_collator.as_ref()
			.map_or(false, |c| c != &descriptor.collator)
		{
			// If not, we've got the statement in the table but we will
			// not issue validation work for it.
			//
			// Act as though we've issued a statement.
			self.issued_statements.insert(candidate_hash);
			return Ok(());
		}

asynchronous rob's avatar
asynchronous rob committed
550
551
552
553
		let pov = self.request_pov_from_distribution(descriptor.clone()).await?;
		let v = self.request_candidate_validation(descriptor, pov.clone()).await?;

		let statement = match v {
554
			ValidationResult::Valid(outputs, validation_data) => {
asynchronous rob's avatar
asynchronous rob committed
555
556
557
				// If validation produces a new set of commitments, we vote the candidate as invalid.
				let commitments_check = self.make_pov_available(
					(&*pov).clone(),
558
					validation_data,
asynchronous rob's avatar
asynchronous rob committed
559
560
561
562
563
564
565
					outputs,
					|commitments| if commitments == expected_commitments {
						Ok(())
					} else {
						Err(())
					}
				).await?;
566

asynchronous rob's avatar
asynchronous rob committed
567
568
569
570
				match commitments_check {
					Ok(()) => Statement::Valid(candidate_hash),
					Err(()) => Statement::Invalid(candidate_hash),
				}
571
			}
572
			ValidationResult::Invalid(_reason) => {
573
574
575
576
577
578
				Statement::Invalid(candidate_hash)
			}
		};

		self.issued_statements.insert(candidate_hash);

asynchronous rob's avatar
asynchronous rob committed
579
		self.sign_import_and_distribute_statement(statement).await
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
	}

	/// Import the statement and kick off validation work if it is a part of our assignment.
	async fn maybe_validate_and_import(
		&mut self,
		statement: SignedFullStatement,
	) -> Result<(), Error> {
		if let Some(summary) = self.import_statement(&statement).await? {
			if let Statement::Seconded(_) = statement.payload() {
				if summary.group_id == self.assignment {
					self.kick_off_validation_work(summary).await?;
				}
			}
		}

		Ok(())
	}

598
599
600
601
602
603
604
	async fn sign_statement(&self, statement: Statement) -> Option<SignedFullStatement> {
		let signed = self.table_context
			.validator
			.as_ref()?
			.sign(self.keystore.clone(), statement)
			.await
			.ok()?;
605
606
		self.metrics.on_statement_signed();
		Some(signed)
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
	}

	fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
		let idx = statement.validator_index() as usize;

		if self.table_context.validators.len() > idx {
			statement.check_signature(
				&self.table_context.signing_context,
				&self.table_context.validators[idx],
			).map_err(|_| Error::InvalidSignature)?;
		} else {
			return Err(Error::InvalidSignature);
		}

		Ok(())
	}

	async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> {
		self.tx_from.send(FromJob::Provisioner(msg)).await?;

		Ok(())
	}

	async fn request_pov_from_distribution(
		&mut self,
		descriptor: CandidateDescriptor,
asynchronous rob's avatar
asynchronous rob committed
633
	) -> Result<Arc<PoV>, Error> {
634
635
636
637
638
639
		let (tx, rx) = oneshot::channel();

		self.tx_from.send(FromJob::PoVDistribution(
			PoVDistributionMessage::FetchPoV(self.parent, descriptor, tx)
		)).await?;

asynchronous rob's avatar
asynchronous rob committed
640
		Ok(rx.await?)
641
642
643
644
	}

	async fn request_candidate_validation(
		&mut self,
asynchronous rob's avatar
asynchronous rob committed
645
646
647
		candidate: CandidateDescriptor,
		pov: Arc<PoV>,
	) -> Result<ValidationResult, Error> {
648
649
650
		let (tx, rx) = oneshot::channel();

		self.tx_from.send(FromJob::CandidateValidation(
asynchronous rob's avatar
asynchronous rob committed
651
				CandidateValidationMessage::ValidateFromChainState(
652
653
654
655
656
657
658
659
660
661
					candidate,
					pov,
					tx,
				)
			)
		).await?;

		Ok(rx.await??)
	}

662
	async fn store_available_data(
663
		&mut self,
664
665
666
		id: Option<ValidatorIndex>,
		n_validators: u32,
		available_data: AvailableData,
667
	) -> Result<(), Error> {
668
		let (tx, rx) = oneshot::channel();
669
		self.tx_from.send(FromJob::AvailabilityStore(
670
671
672
673
674
675
676
				AvailabilityStoreMessage::StoreAvailableData(
					self.parent,
					id,
					n_validators,
					available_data,
					tx,
				)
677
678
679
			)
		).await?;

680
		let _ = rx.await?;
681

682
683
684
		Ok(())
	}

685
	// Make a `PoV` available.
asynchronous rob's avatar
asynchronous rob committed
686
687
688
689
690
	//
	// This calls an inspection function before making the PoV available for any last checks
	// that need to be done. If the inspection function returns an error, this function returns
	// early without making the PoV available.
	async fn make_pov_available<T, E>(
691
		&mut self,
asynchronous rob's avatar
asynchronous rob committed
692
		pov: PoV,
693
		validation_data: polkadot_primitives::v1::PersistedValidationData,
asynchronous rob's avatar
asynchronous rob committed
694
695
696
		outputs: ValidationOutputs,
		with_commitments: impl FnOnce(CandidateCommitments) -> Result<T, E>,
	) -> Result<Result<T, E>, Error> {
697
		let available_data = AvailableData {
asynchronous rob's avatar
asynchronous rob committed
698
			pov,
699
			validation_data,
700
701
		};

asynchronous rob's avatar
asynchronous rob committed
702
		let chunks = erasure_coding::obtain_chunks_v1(
703
704
705
706
707
			self.table_context.validators.len(),
			&available_data,
		)?;

		let branches = erasure_coding::branches(chunks.as_ref());
asynchronous rob's avatar
asynchronous rob committed
708
709
710
711
712
713
714
		let erasure_root = branches.root();

		let commitments = CandidateCommitments {
			upward_messages: outputs.upward_messages,
			erasure_root,
			new_validation_code: outputs.new_validation_code,
			head_data: outputs.head_data,
715
			processed_downward_messages: outputs.processed_downward_messages,
asynchronous rob's avatar
asynchronous rob committed
716
		};
717

asynchronous rob's avatar
asynchronous rob committed
718
719
720
721
722
		let res = match with_commitments(commitments) {
			Ok(x) => x,
			Err(e) => return Ok(Err(e)),
		};

723
724
725
726
727
		self.store_available_data(
			self.table_context.validator.as_ref().map(|v| v.index()),
			self.table_context.validators.len() as u32,
			available_data,
		).await?;
728

asynchronous rob's avatar
asynchronous rob committed
729
		Ok(Ok(res))
730
731
732
733
734
735
736
737
738
739
740
	}

	async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> {
		let smsg = StatementDistributionMessage::Share(self.parent, s);

		self.tx_from.send(FromJob::StatementDistribution(smsg)).await?;

		Ok(())
	}
}

741
742
743
744
impl util::JobTrait for CandidateBackingJob {
	type ToJob = ToJob;
	type FromJob = FromJob;
	type Error = Error;
745
	type RunArgs = SyncCryptoStorePtr;
746
	type Metrics = Metrics;
747

748
	const NAME: &'static str = "CandidateBackingJob";
749

750
751
	fn run(
		parent: Hash,
752
		keystore: SyncCryptoStorePtr,
753
		metrics: Metrics,
754
755
756
757
		rx_to: mpsc::Receiver<Self::ToJob>,
		mut tx_from: mpsc::Sender<Self::FromJob>,
	) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
		async move {
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
			macro_rules! try_runtime_api {
				($x: expr) => {
					match $x {
						Ok(x) => x,
						Err(e) => {
							log::warn!(
								target: "candidate_backing",
								"Failed to fetch runtime API data for job: {:?}",
								e,
							);

							// We can't do candidate validation work if we don't have the
							// requisite runtime API data. But these errors should not take
							// down the node.
							return Ok(());
						}
					}
				}
			}

			let (validators, groups, session_index, cores) = futures::try_join!(
779
780
				request_validators(parent, &mut tx_from).await?,
				request_validator_groups(parent, &mut tx_from).await?,
781
782
783
784
785
786
				request_session_index_for_child(parent, &mut tx_from).await?,
				request_from_runtime(
					parent,
					&mut tx_from,
					|tx| RuntimeApiRequest::AvailabilityCores(tx),
				).await?,
787
788
			)?;

789
790
791
792
793
794
			let validators = try_runtime_api!(validators);
			let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
			let session_index = try_runtime_api!(session_index);
			let cores = try_runtime_api!(cores);

			let signing_context = SigningContext { parent_hash: parent, session_index };
795
796
797
798
			let validator = match Validator::construct(
				&validators,
				signing_context,
				keystore.clone(),
799
			).await {
800
801
802
803
804
805
806
807
808
809
810
811
				Ok(v) => v,
				Err(util::Error::NotAValidator) => { return Ok(()) },
				Err(e) => {
					log::warn!(
						target: "candidate_backing",
						"Cannot participate in candidate backing: {:?}",
						e
					);

					return Ok(())
				}
			};
812
813
814

			let mut groups = HashMap::new();

815
			let n_cores = cores.len();
816
817

			let mut assignment = None;
818
819
820
821
822
823
			for (idx, core) in cores.into_iter().enumerate() {
				// Ignore prospective assignments on occupied cores for the time being.
				if let CoreState::Scheduled(scheduled) = core {
					let core_index = CoreIndex(idx as _);
					let group_index = group_rotation_info.group_for_core(core_index, n_cores);
					if let Some(g) = validator_groups.get(group_index.0 as usize) {
824
825
826
						if g.contains(&validator.index()) {
							assignment = Some((scheduled.para_id, scheduled.collator));
						}
827
828
						groups.insert(scheduled.para_id, g.clone());
					}
829
				}
830
831
			}

832
833
834
835
836
837
			let table_context = TableContext {
				groups,
				validators,
				signing_context: validator.signing_context().clone(),
				validator: Some(validator),
			};
838

839
840
			let (assignment, required_collator) = match assignment {
				None => return Ok(()), // no need to work.
841
				Some(r) => r,
842
843
			};

844
845
846
847
848
			let job = CandidateBackingJob {
				parent,
				rx_to,
				tx_from,
				assignment,
849
				required_collator,
850
851
				issued_statements: HashSet::new(),
				seconded: None,
asynchronous rob's avatar
asynchronous rob committed
852
				backed: HashSet::new(),
853
				reported_misbehavior_for: HashSet::new(),
854
				keystore,
855
856
				table: Table::default(),
				table_context,
857
				metrics,
858
			};
859

860
			job.run_loop().await
861
		}
862
		.boxed()
863
864
865
	}
}

866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
#[derive(Clone)]
struct MetricsInner {
	signed_statements_total: prometheus::Counter<prometheus::U64>,
	candidates_seconded_total: prometheus::Counter<prometheus::U64>
}

/// Candidate backing metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_statement_signed(&self) {
		if let Some(metrics) = &self.0 {
			metrics.signed_statements_total.inc();
		}
	}

	fn on_candidate_seconded(&self) {
		if let Some(metrics) = &self.0 {
			metrics.candidates_seconded_total.inc();
		}
	}
}

impl metrics::Metrics for Metrics {
	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			signed_statements_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_signed_statements_total",
					"Number of statements signed.",
				)?,
				registry,
			)?,
			candidates_seconded_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_candidates_seconded_total",
					"Number of candidates seconded.",
				)?,
				registry,
			)?,
		};
		Ok(Metrics(Some(metrics)))
	}
}

912
delegated_subsystem!(CandidateBackingJob(SyncCryptoStorePtr, Metrics) <- ToJob as CandidateBackingSubsystem);
913

914
915
916
#[cfg(test)]
mod tests {
	use super::*;
917
	use assert_matches::assert_matches;
918
	use futures::{future, Future};
asynchronous rob's avatar
asynchronous rob committed
919
	use polkadot_primitives::v1::{
920
		ScheduledCore, BlockData, CandidateCommitments,
921
		PersistedValidationData, ValidationData, TransientValidationData, HeadData,
922
		ValidityAttestation, GroupRotationInfo,
923
	};
924
	use polkadot_subsystem::{
925
		messages::RuntimeApiRequest,
926
		ActiveLeavesUpdate, FromOverseer, OverseerSignal,
927
	};
928
	use polkadot_node_primitives::InvalidCandidate;
929
	use sp_keyring::Sr25519Keyring;
930
931
	use sp_application_crypto::AppKey;
	use sp_keystore::{CryptoStore, SyncCryptoStore};
932
	use std::collections::HashMap;
933
934
935
936
937
938
939

	fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
		val_ids.iter().map(|v| v.public().into()).collect()
	}

	struct TestState {
		chain_ids: Vec<ParaId>,
940
		keystore: SyncCryptoStorePtr,
941
942
		validators: Vec<Sr25519Keyring>,
		validator_public: Vec<ValidatorId>,
943
		validation_data: ValidationData,
944
945
		validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
		availability_cores: Vec<CoreState>,
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
		head_data: HashMap<ParaId, HeadData>,
		signing_context: SigningContext,
		relay_parent: Hash,
	}

	impl Default for TestState {
		fn default() -> Self {
			let chain_a = ParaId::from(1);
			let chain_b = ParaId::from(2);
			let thread_a = ParaId::from(3);

			let chain_ids = vec![chain_a, chain_b, thread_a];

			let validators = vec![
				Sr25519Keyring::Alice,
				Sr25519Keyring::Bob,
				Sr25519Keyring::Charlie,
				Sr25519Keyring::Dave,
				Sr25519Keyring::Ferdie,
asynchronous rob's avatar
asynchronous rob committed
965
				Sr25519Keyring::One,
966
967
			];

968
			let keystore = Arc::new(sc_keystore::LocalKeystore::in_memory());
969
			// Make sure `Alice` key is in the keystore, so this mocked node will be a parachain validator.
970
			SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, Some(&validators[0].to_seed()))
971
972
973
974
				.expect("Insert key into keystore");

			let validator_public = validator_pubkeys(&validators);

asynchronous rob's avatar
asynchronous rob committed
975
			let validator_groups = vec![vec![2, 0, 3, 5], vec![1], vec![4]];
976
977
978
979
			let group_rotation_info = GroupRotationInfo {
				session_start_block: 0,
				group_rotation_frequency: 100,
				now: 1,
980
981
982
			};

			let thread_collator: CollatorId = Sr25519Keyring::Two.public().into();
983
984
985
986
987
988
989
990
991
992
993
994
995
996
			let availability_cores = vec![
				CoreState::Scheduled(ScheduledCore {
					para_id: chain_a,
					collator: None,
				}),
				CoreState::Scheduled(ScheduledCore {
					para_id: chain_b,
					collator: None,
				}),
				CoreState::Scheduled(ScheduledCore {
					para_id: thread_a,
					collator: Some(thread_collator.clone()),
				}),
			];
997

998
999
			let mut head_data = HashMap::new();
			head_data.insert(chain_a, HeadData(vec![4, 5, 6]));
1000

For faster browsing, not all history is shown. View entire blame