lib.rs 56.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements a `CandidateBackingSubsystem`.

19
20
#![deny(unused_crate_dependencies)]

21
22
23
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::pin::Pin;
asynchronous rob's avatar
asynchronous rob committed
24
use std::sync::Arc;
25
26
27

use bitvec::vec::BitVec;
use futures::{
28
29
	channel::{mpsc, oneshot},
	Future, FutureExt, SinkExt, StreamExt,
30
31
};

32
use sp_keystore::SyncCryptoStorePtr;
asynchronous rob's avatar
asynchronous rob committed
33
use polkadot_primitives::v1::{
34
	CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId,
35
	ValidatorIndex, SigningContext, PoV,
36
	CandidateDescriptor, AvailableData, ValidatorSignature, Hash, CandidateReceipt,
37
	CandidateCommitments, CoreState, CoreIndex, CollatorId, ValidationOutputs,
38
39
};
use polkadot_node_primitives::{
40
	FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
41
42
};
use polkadot_subsystem::{
43
44
45
46
	messages::{
		AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
		CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData,
		ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed,
47
		RuntimeApiRequest,
48
	},
49
50
51
52
53
54
55
56
};
use polkadot_node_subsystem_util::{
	self as util,
	request_session_index_for_child,
	request_validator_groups,
	request_validators,
	request_from_runtime,
	Validator,
57
	delegated_subsystem,
58
	metrics::{self, prometheus},
59
60
61
};
use statement_table::{
	generic::AttestedCandidate as TableAttestedCandidate,
asynchronous rob's avatar
asynchronous rob committed
62
63
64
65
66
67
	Context as TableContextTrait,
	Table,
	v1::{
		Statement as TableStatement,
		SignedStatement as TableSignedStatement, Summary as TableSummary,
	},
68
};
69
use thiserror::Error;
70

71
#[derive(Debug, Error)]
72
enum Error {
73
	#[error("Candidate is not found")]
74
	CandidateNotFound,
75
	#[error("Signature is invalid")]
76
	InvalidSignature,
77
78
79
80
81
82
83
84
85
86
87
88
	#[error("Failed to send candidates {0:?}")]
	Send(Vec<NewBackedCandidate>),
	#[error("Oneshot never resolved")]
	Oneshot(#[from] #[source] oneshot::Canceled),
	#[error("Obtaining erasure chunks failed")]
	ObtainErasureChunks(#[from] #[source] erasure_coding::Error),
	#[error(transparent)]
	ValidationFailed(#[from] ValidationFailed),
	#[error(transparent)]
	Mpsc(#[from] mpsc::SendError),
	#[error(transparent)]
	UtilError(#[from] util::Error),
89
90
91
92
93
94
95
96
97
98
}

/// Holds all data needed for candidate backing job operation.
struct CandidateBackingJob {
	/// The hash of the relay parent on top of which this job is doing it's work.
	parent: Hash,
	/// Inbound message channel receiving part.
	rx_to: mpsc::Receiver<ToJob>,
	/// Outbound message channel sending part.
	tx_from: mpsc::Sender<FromJob>,
99
	/// The `ParaId` assigned to this validator
100
	assignment: ParaId,
101
102
	/// The collator required to author the candidate, if any.
	required_collator: Option<CollatorId>,
103
104
105
106
107
108
	/// We issued `Valid` or `Invalid` statements on about these candidates.
	issued_statements: HashSet<Hash>,
	/// `Some(h)` if this job has already issues `Seconded` statemt for some candidate with `h` hash.
	seconded: Option<Hash>,
	/// We have already reported misbehaviors for these validators.
	reported_misbehavior_for: HashSet<ValidatorIndex>,
109
	keystore: SyncCryptoStorePtr,
110
111
	table: Table<TableContext>,
	table_context: TableContext,
112
	metrics: Metrics,
113
114
115
116
117
118
119
120
121
}

/// Number of votes that constitutes a quorum in a group of `n_validators` members:
/// strictly more than half of them.
const fn group_quorum(n_validators: usize) -> usize {
	let half = n_validators / 2;
	half + 1
}

#[derive(Default)]
struct TableContext {
	signing_context: SigningContext,
122
	validator: Option<Validator>,
123
124
125
126
127
	groups: HashMap<ParaId, Vec<ValidatorIndex>>,
	validators: Vec<ValidatorId>,
}

impl TableContextTrait for TableContext {
asynchronous rob's avatar
asynchronous rob committed
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
	type AuthorityId = ValidatorIndex;
	type Digest = Hash;
	type GroupId = ParaId;
	type Signature = ValidatorSignature;
	type Candidate = CommittedCandidateReceipt;

	fn candidate_digest(candidate: &CommittedCandidateReceipt) -> Hash {
		candidate.hash()
	}

	fn candidate_group(candidate: &CommittedCandidateReceipt) -> ParaId {
		candidate.descriptor().para_id
	}

	fn is_member_of(&self, authority: &ValidatorIndex, group: &ParaId) -> bool {
		self.groups.get(group).map_or(false, |g| g.iter().position(|a| a == authority).is_some())
144
145
146
147
148
149
150
	}

	fn requisite_votes(&self, group: &ParaId) -> usize {
		self.groups.get(group).map_or(usize::max_value(), |g| group_quorum(g.len()))
	}
}

151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/// A message type that is sent from `CandidateBackingSubsystem` to `CandidateBackingJob`.
pub enum ToJob {
	/// A `CandidateBackingMessage` for the job to process.
	CandidateBacking(CandidateBackingMessage),
	/// Stop working; the job leaves its message loop when it receives this.
	Stop,
}

impl TryFrom<AllMessages> for ToJob {
	type Error = ();

	/// Only `AllMessages::CandidateBacking` can be turned into a `ToJob`; every other
	/// variant is rejected with `Err(())`.
	fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
		if let AllMessages::CandidateBacking(inner) = msg {
			Ok(ToJob::CandidateBacking(inner))
		} else {
			Err(())
		}
	}
}
169

170
171
172
impl From<CandidateBackingMessage> for ToJob {
	fn from(msg: CandidateBackingMessage) -> Self {
		Self::CandidateBacking(msg)
173
174
175
	}
}

176
177
impl util::ToJobTrait for ToJob {
	const STOP: Self = ToJob::Stop;
178

179
180
181
182
183
184
	fn relay_parent(&self) -> Option<Hash> {
		match self {
			Self::CandidateBacking(cb) => cb.relay_parent(),
			Self::Stop => None,
		}
	}
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
}

/// A message type that is sent from `CandidateBackingJob` to `CandidateBackingSubsystem`.
///
/// Each variant wraps the message type of the subsystem it is addressed to and converts
/// to the corresponding `AllMessages` variant.
enum FromJob {
	/// Message for the availability store.
	AvailabilityStore(AvailabilityStoreMessage),
	/// Message for the runtime API.
	RuntimeApiMessage(RuntimeApiMessage),
	/// Message for candidate validation.
	CandidateValidation(CandidateValidationMessage),
	/// Message for candidate selection.
	CandidateSelection(CandidateSelectionMessage),
	/// Message for the provisioner.
	Provisioner(ProvisionerMessage),
	/// Message for PoV distribution.
	PoVDistribution(PoVDistributionMessage),
	/// Message for statement distribution.
	StatementDistribution(StatementDistributionMessage),
}

impl From<FromJob> for AllMessages {
	/// Unwraps the job message and re-wraps it in the matching overseer-level variant.
	fn from(f: FromJob) -> Self {
		match f {
			FromJob::Provisioner(inner) => Self::Provisioner(inner),
			FromJob::PoVDistribution(inner) => Self::PoVDistribution(inner),
			FromJob::StatementDistribution(inner) => Self::StatementDistribution(inner),
			FromJob::CandidateSelection(inner) => Self::CandidateSelection(inner),
			FromJob::CandidateValidation(inner) => Self::CandidateValidation(inner),
			FromJob::RuntimeApiMessage(inner) => Self::RuntimeApi(inner),
			FromJob::AvailabilityStore(inner) => Self::AvailabilityStore(inner),
		}
	}
}

212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
impl TryFrom<AllMessages> for FromJob {
	type Error = &'static str;

	/// Converts the `AllMessages` variants this job is able to emit back into `FromJob`;
	/// any other variant yields a static error string.
	fn try_from(f: AllMessages) -> Result<Self, Self::Error> {
		let converted = match f {
			AllMessages::Provisioner(inner) => FromJob::Provisioner(inner),
			AllMessages::PoVDistribution(inner) => FromJob::PoVDistribution(inner),
			AllMessages::StatementDistribution(inner) => FromJob::StatementDistribution(inner),
			AllMessages::CandidateSelection(inner) => FromJob::CandidateSelection(inner),
			AllMessages::CandidateValidation(inner) => FromJob::CandidateValidation(inner),
			AllMessages::RuntimeApi(inner) => FromJob::RuntimeApiMessage(inner),
			AllMessages::AvailabilityStore(inner) => FromJob::AvailabilityStore(inner),
			_ => return Err("can't convert this AllMessages variant to FromJob"),
		};

		Ok(converted)
	}
}

229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
/// Converts a signed primitive statement into the representation used by the statement table.
///
/// This is a free function rather than an `impl From`, which does not appear to be possible
/// given the current state of the code.
fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
	let sender = s.validator_index();
	let signature = s.signature().clone();

	let statement = match s.payload() {
		Statement::Invalid(hash) => TableStatement::Invalid(hash.clone()),
		Statement::Valid(hash) => TableStatement::Valid(hash.clone()),
		Statement::Seconded(receipt) => TableStatement::Candidate(receipt.clone()),
	};

	TableSignedStatement { statement, signature, sender }
}

impl CandidateBackingJob {
	/// Run asynchronously.
247
	async fn run_loop(mut self) -> Result<(), Error> {
248
249
250
251
252
253
254
255
256
257
258
259
260
261
		while let Some(msg) = self.rx_to.next().await {
			match msg {
				ToJob::CandidateBacking(msg) => {
					self.process_msg(msg).await?;
				}
				_ => break,
			}
		}

		Ok(())
	}

	async fn issue_candidate_invalid_message(
		&mut self,
asynchronous rob's avatar
asynchronous rob committed
262
		candidate: CandidateReceipt,
263
264
265
266
267
268
269
270
271
	) -> Result<(), Error> {
		self.tx_from.send(FromJob::CandidateSelection(
			CandidateSelectionMessage::Invalid(self.parent, candidate)
		)).await?;

		Ok(())
	}

	/// Validate the candidate that is requested to be `Second`ed and distribute validation result.
asynchronous rob's avatar
asynchronous rob committed
272
273
	///
	/// Returns `Ok(true)` if we issued a `Seconded` statement about this candidate.
274
275
	async fn validate_and_second(
		&mut self,
asynchronous rob's avatar
asynchronous rob committed
276
277
278
		candidate: &CandidateReceipt,
		pov: PoV,
	) -> Result<bool, Error> {
279
280
281
282
283
284
285
286
		// Check that candidate is collated by the right collator.
		if self.required_collator.as_ref()
			.map_or(false, |c| c != &candidate.descriptor().collator)
		{
			self.issue_candidate_invalid_message(candidate.clone()).await?;
			return Ok(false);
		}

asynchronous rob's avatar
asynchronous rob committed
287
288
289
290
291
292
293
294
		let valid = self.request_candidate_validation(
			candidate.descriptor().clone(),
			Arc::new(pov.clone()),
		).await?;

		let candidate_hash = candidate.hash();

		let statement = match valid {
295
			ValidationResult::Valid(outputs, validation_data) => {
296
297
298
				// make PoV available for later distribution. Send data to the availability
				// store to keep. Sign and dispatch `valid` statement to network if we
				// have not seconded the given candidate.
asynchronous rob's avatar
asynchronous rob committed
299
300
301
302
303
				//
				// If the commitments hash produced by validation is not the same as given by
				// the collator, do not make available and report the collator.
				let commitments_check = self.make_pov_available(
					pov,
304
					validation_data,
asynchronous rob's avatar
asynchronous rob committed
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
					outputs,
					|commitments| if commitments.hash() == candidate.commitments_hash {
						Ok(CommittedCandidateReceipt {
							descriptor: candidate.descriptor().clone(),
							commitments,
						})
					} else {
						Err(())
					},
				).await?;

				match commitments_check {
					Ok(candidate) => {
						self.issued_statements.insert(candidate_hash);
						Some(Statement::Seconded(candidate))
					}
					Err(()) => {
						self.issue_candidate_invalid_message(candidate.clone()).await?;
						None
					}
				}
326
			}
327
			ValidationResult::Invalid(_reason) => {
asynchronous rob's avatar
asynchronous rob committed
328
329
330
331
332
333
				// no need to issue a statement about this if we aren't seconding it.
				//
				// there's an infinite amount of garbage out there. no need to acknowledge
				// all of it.
				self.issue_candidate_invalid_message(candidate.clone()).await?;
				None
334
335
336
			}
		};

asynchronous rob's avatar
asynchronous rob committed
337
		let issued_statement = statement.is_some();
338
339
340
341
342
343

		if let Some(statement) = statement {
			if let Some(signed_statement) = self.sign_statement(statement).await {
				self.import_statement(&signed_statement).await?;
				self.distribute_signed_statement(signed_statement).await?;
			}
344
345
		}

asynchronous rob's avatar
asynchronous rob committed
346
		Ok(issued_statement)
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
	}

	/// Collects the candidates the statement table currently proposes and converts each
	/// into a `NewBackedCandidate`.
	///
	/// For every candidate the validity votes are paired with a bitfield over this job's
	/// assigned validator group: bit `i` is set when the `i`-th member of the group has
	/// voted. Voters that are not part of the group set no bit. If the group of our
	/// assignment is unknown, nothing is returned.
	fn get_backed(&self) -> Vec<NewBackedCandidate> {
		let proposed = self.table.proposed_candidates(&self.table_context);

		// The group depends only on our assignment, so it is looked up once up front
		// instead of once per proposed candidate.
		let group = match self.table_context.groups.get(&self.assignment) {
			Some(group) => group,
			None => return Vec::new(),
		};

		let mut res = Vec::with_capacity(proposed.len());

		for TableAttestedCandidate { candidate, validity_votes, .. } in proposed {
			let (ids, validity_votes): (Vec<_>, Vec<_>) = validity_votes
				.into_iter()
				.map(|(id, vote)| (id, vote.into()))
				.unzip();

			let mut validator_indices = BitVec::with_capacity(group.len());
			validator_indices.resize(group.len(), false);

			for id in ids.iter() {
				if let Some(position) = group.iter().position(|x| x == id) {
					validator_indices.set(position, true);
				}
			}

			// Move the candidate into the result; no clone is needed.
			res.push(NewBackedCandidate(BackedCandidate {
				candidate,
				validity_votes,
				validator_indices,
			}));
		}

		res
	}

	/// Check if there have happened any new misbehaviors and issue necessary messages.
asynchronous rob's avatar
asynchronous rob committed
389
	///
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
	/// TODO: Report multiple misbehaviors (https://github.com/paritytech/polkadot/issues/1387)
	async fn issue_new_misbehaviors(&mut self) -> Result<(), Error> {
		let mut reports = Vec::new();

		for (k, v) in self.table.get_misbehavior().iter() {
			if !self.reported_misbehavior_for.contains(k) {
				self.reported_misbehavior_for.insert(*k);

				let f = FromTableMisbehavior {
					id: *k,
					report: v.clone(),
					signing_context: self.table_context.signing_context.clone(),
					key: self.table_context.validators[*k as usize].clone(),
				};

				if let Ok(report) = MisbehaviorReport::try_from(f) {
					let message = ProvisionerMessage::ProvisionableData(
407
						ProvisionableData::MisbehaviorReport(self.parent, report),
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
					);

					reports.push(message);
				}
			}
		}

		for report in reports.drain(..) {
			self.send_to_provisioner(report).await?
		}

		Ok(())
	}

	/// Imports `statement` into the statement table, then reports any misbehavior the
	/// table has detected, and finally hands back the summary produced by the import.
	async fn import_statement(
		&mut self,
		statement: &SignedFullStatement,
	) -> Result<Option<TableSummary>, Error> {
		let table_statement = primitive_statement_to_table(statement);
		let import_summary = self.table.import_statement(&self.table_context, table_statement);

		self.issue_new_misbehaviors().await?;

		Ok(import_summary)
	}

	async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> {
		match msg {
			CandidateBackingMessage::Second(_, candidate, pov) => {
				// Sanity check that candidate is from our assignment.
asynchronous rob's avatar
asynchronous rob committed
440
				if candidate.descriptor().para_id != self.assignment {
441
442
443
444
445
446
447
448
449
450
451
452
					return Ok(());
				}

				// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
				// Seconded statement only if we have not seconded any other candidate and
				// have not signed a Valid statement for the requested candidate.
				match self.seconded {
					// This job has not seconded a candidate yet.
					None => {
						let candidate_hash = candidate.hash();

						if !self.issued_statements.contains(&candidate_hash) {
asynchronous rob's avatar
asynchronous rob committed
453
454
							if let Ok(true) = self.validate_and_second(
								&candidate,
455
456
								pov,
							).await {
457
								self.metrics.on_candidate_seconded();
458
459
460
461
462
463
464
465
466
467
								self.seconded = Some(candidate_hash);
							}
						}
					}
					// This job has already seconded a candidate.
					Some(_) => {}
				}
			}
			CandidateBackingMessage::Statement(_, statement) => {
				self.check_statement_signature(&statement)?;
468
469
470
471
472
				match self.maybe_validate_and_import(statement).await {
					Err(Error::ValidationFailed(_)) => return Ok(()),
					Err(e) => return Err(e),
					Ok(()) => (),
				}
473
474
475
476
			}
			CandidateBackingMessage::GetBackedCandidates(_, tx) => {
				let backed = self.get_backed();

477
				tx.send(backed).map_err(|data| Error::Send(data))?;
478
479
480
481
482
483
484
485
486
487
			}
		}

		Ok(())
	}

	/// Kick off validation work and distribute the result as a signed statement.
	async fn kick_off_validation_work(
		&mut self,
		summary: TableSummary,
asynchronous rob's avatar
asynchronous rob committed
488
489
490
491
492
493
494
495
496
497
498
499
500
501
	) -> Result<(), Error> {
		let candidate_hash = summary.candidate.clone();

		if self.issued_statements.contains(&candidate_hash) {
			return Ok(())
		}

		// We clone the commitments here because there are borrowck
		// errors relating to this being a struct and methods borrowing the entirety of self
		// and not just those things that the function uses.
		let candidate = self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?;
		let expected_commitments = candidate.commitments.clone();

		let descriptor = candidate.descriptor().clone();
502
503
504
505
506
507
508
509
510
511
512
513
514

		// Check that candidate is collated by the right collator.
		if self.required_collator.as_ref()
			.map_or(false, |c| c != &descriptor.collator)
		{
			// If not, we've got the statement in the table but we will
			// not issue validation work for it.
			//
			// Act as though we've issued a statement.
			self.issued_statements.insert(candidate_hash);
			return Ok(());
		}

asynchronous rob's avatar
asynchronous rob committed
515
516
517
518
		let pov = self.request_pov_from_distribution(descriptor.clone()).await?;
		let v = self.request_candidate_validation(descriptor, pov.clone()).await?;

		let statement = match v {
519
			ValidationResult::Valid(outputs, validation_data) => {
asynchronous rob's avatar
asynchronous rob committed
520
521
522
				// If validation produces a new set of commitments, we vote the candidate as invalid.
				let commitments_check = self.make_pov_available(
					(&*pov).clone(),
523
					validation_data,
asynchronous rob's avatar
asynchronous rob committed
524
525
526
527
528
529
530
					outputs,
					|commitments| if commitments == expected_commitments {
						Ok(())
					} else {
						Err(())
					}
				).await?;
531

asynchronous rob's avatar
asynchronous rob committed
532
533
534
535
				match commitments_check {
					Ok(()) => Statement::Valid(candidate_hash),
					Err(()) => Statement::Invalid(candidate_hash),
				}
536
			}
537
			ValidationResult::Invalid(_reason) => {
538
539
540
541
542
543
				Statement::Invalid(candidate_hash)
			}
		};

		self.issued_statements.insert(candidate_hash);

544
		if let Some(signed_statement) = self.sign_statement(statement).await {
545
546
547
			self.distribute_signed_statement(signed_statement).await?;
		}

asynchronous rob's avatar
asynchronous rob committed
548
		Ok(())
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
	}

	/// Imports the statement and, when it is a `Seconded` statement for a candidate of
	/// our own assignment, starts validation work for that candidate.
	async fn maybe_validate_and_import(
		&mut self,
		statement: SignedFullStatement,
	) -> Result<(), Error> {
		let summary = match self.import_statement(&statement).await? {
			Some(summary) => summary,
			// The import produced no summary, so there is nothing to follow up on.
			None => return Ok(()),
		};

		let is_seconded = match statement.payload() {
			Statement::Seconded(_) => true,
			_ => false,
		};

		if is_seconded && summary.group_id == self.assignment {
			self.kick_off_validation_work(summary).await?;
		}

		Ok(())
	}

567
568
569
570
571
572
573
	/// Signs `statement` with the local validator and bumps the signed-statements metric.
	///
	/// Returns `None` when the table context holds no validator or when signing fails.
	async fn sign_statement(&self, statement: Statement) -> Option<SignedFullStatement> {
		let validator = self.table_context.validator.as_ref()?;
		let signed = validator.sign(self.keystore.clone(), statement).await.ok()?;

		self.metrics.on_statement_signed();

		Some(signed)
	}

	fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
		let idx = statement.validator_index() as usize;

		if self.table_context.validators.len() > idx {
			statement.check_signature(
				&self.table_context.signing_context,
				&self.table_context.validators[idx],
			).map_err(|_| Error::InvalidSignature)?;
		} else {
			return Err(Error::InvalidSignature);
		}

		Ok(())
	}

	async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> {
		self.tx_from.send(FromJob::Provisioner(msg)).await?;

		Ok(())
	}

	async fn request_pov_from_distribution(
		&mut self,
		descriptor: CandidateDescriptor,
asynchronous rob's avatar
asynchronous rob committed
602
	) -> Result<Arc<PoV>, Error> {
603
604
605
606
607
608
		let (tx, rx) = oneshot::channel();

		self.tx_from.send(FromJob::PoVDistribution(
			PoVDistributionMessage::FetchPoV(self.parent, descriptor, tx)
		)).await?;

asynchronous rob's avatar
asynchronous rob committed
609
		Ok(rx.await?)
610
611
612
613
	}

	async fn request_candidate_validation(
		&mut self,
asynchronous rob's avatar
asynchronous rob committed
614
615
616
		candidate: CandidateDescriptor,
		pov: Arc<PoV>,
	) -> Result<ValidationResult, Error> {
617
618
619
		let (tx, rx) = oneshot::channel();

		self.tx_from.send(FromJob::CandidateValidation(
asynchronous rob's avatar
asynchronous rob committed
620
				CandidateValidationMessage::ValidateFromChainState(
621
622
623
624
625
626
627
628
629
630
					candidate,
					pov,
					tx,
				)
			)
		).await?;

		Ok(rx.await??)
	}

631
	async fn store_available_data(
632
		&mut self,
633
634
635
		id: Option<ValidatorIndex>,
		n_validators: u32,
		available_data: AvailableData,
636
	) -> Result<(), Error> {
637
		let (tx, rx) = oneshot::channel();
638
		self.tx_from.send(FromJob::AvailabilityStore(
639
640
641
642
643
644
645
				AvailabilityStoreMessage::StoreAvailableData(
					self.parent,
					id,
					n_validators,
					available_data,
					tx,
				)
646
647
648
			)
		).await?;

649
		let _ = rx.await?;
650

651
652
653
		Ok(())
	}

654
	// Make a `PoV` available.
asynchronous rob's avatar
asynchronous rob committed
655
656
657
658
659
	//
	// This calls an inspection function before making the PoV available for any last checks
	// that need to be done. If the inspection function returns an error, this function returns
	// early without making the PoV available.
	async fn make_pov_available<T, E>(
660
		&mut self,
asynchronous rob's avatar
asynchronous rob committed
661
		pov: PoV,
662
		validation_data: polkadot_primitives::v1::PersistedValidationData,
asynchronous rob's avatar
asynchronous rob committed
663
664
665
		outputs: ValidationOutputs,
		with_commitments: impl FnOnce(CandidateCommitments) -> Result<T, E>,
	) -> Result<Result<T, E>, Error> {
666
		let available_data = AvailableData {
asynchronous rob's avatar
asynchronous rob committed
667
			pov,
668
			validation_data,
669
670
		};

asynchronous rob's avatar
asynchronous rob committed
671
		let chunks = erasure_coding::obtain_chunks_v1(
672
673
674
675
676
			self.table_context.validators.len(),
			&available_data,
		)?;

		let branches = erasure_coding::branches(chunks.as_ref());
asynchronous rob's avatar
asynchronous rob committed
677
678
679
680
681
682
683
684
685
		let erasure_root = branches.root();

		let commitments = CandidateCommitments {
			fees: outputs.fees,
			upward_messages: outputs.upward_messages,
			erasure_root,
			new_validation_code: outputs.new_validation_code,
			head_data: outputs.head_data,
		};
686

asynchronous rob's avatar
asynchronous rob committed
687
688
689
690
691
		let res = match with_commitments(commitments) {
			Ok(x) => x,
			Err(e) => return Ok(Err(e)),
		};

692
693
694
695
696
		self.store_available_data(
			self.table_context.validator.as_ref().map(|v| v.index()),
			self.table_context.validators.len() as u32,
			available_data,
		).await?;
697

asynchronous rob's avatar
asynchronous rob committed
698
		Ok(Ok(res))
699
700
701
702
703
704
705
706
707
708
709
	}

	async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> {
		let smsg = StatementDistributionMessage::Share(self.parent, s);

		self.tx_from.send(FromJob::StatementDistribution(smsg)).await?;

		Ok(())
	}
}

710
711
712
713
impl util::JobTrait for CandidateBackingJob {
	type ToJob = ToJob;
	type FromJob = FromJob;
	type Error = Error;
714
	type RunArgs = SyncCryptoStorePtr;
715
	type Metrics = Metrics;
716

717
	const NAME: &'static str = "CandidateBackingJob";
718

719
720
	fn run(
		parent: Hash,
721
		keystore: SyncCryptoStorePtr,
722
		metrics: Metrics,
723
724
725
726
		rx_to: mpsc::Receiver<Self::ToJob>,
		mut tx_from: mpsc::Sender<Self::FromJob>,
	) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
		async move {
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
			macro_rules! try_runtime_api {
				($x: expr) => {
					match $x {
						Ok(x) => x,
						Err(e) => {
							log::warn!(
								target: "candidate_backing",
								"Failed to fetch runtime API data for job: {:?}",
								e,
							);

							// We can't do candidate validation work if we don't have the
							// requisite runtime API data. But these errors should not take
							// down the node.
							return Ok(());
						}
					}
				}
			}

			let (validators, groups, session_index, cores) = futures::try_join!(
748
749
				request_validators(parent, &mut tx_from).await?,
				request_validator_groups(parent, &mut tx_from).await?,
750
751
752
753
754
755
				request_session_index_for_child(parent, &mut tx_from).await?,
				request_from_runtime(
					parent,
					&mut tx_from,
					|tx| RuntimeApiRequest::AvailabilityCores(tx),
				).await?,
756
757
			)?;

758
759
760
761
762
763
			let validators = try_runtime_api!(validators);
			let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
			let session_index = try_runtime_api!(session_index);
			let cores = try_runtime_api!(cores);

			let signing_context = SigningContext { parent_hash: parent, session_index };
764
765
766
767
			let validator = match Validator::construct(
				&validators,
				signing_context,
				keystore.clone(),
768
			).await {
769
770
771
772
773
774
775
776
777
778
779
780
				Ok(v) => v,
				Err(util::Error::NotAValidator) => { return Ok(()) },
				Err(e) => {
					log::warn!(
						target: "candidate_backing",
						"Cannot participate in candidate backing: {:?}",
						e
					);

					return Ok(())
				}
			};
781
782
783

			let mut groups = HashMap::new();

784
			let n_cores = cores.len();
785
786

			let mut assignment = None;
787
788
789
790
791
792
			for (idx, core) in cores.into_iter().enumerate() {
				// Ignore prospective assignments on occupied cores for the time being.
				if let CoreState::Scheduled(scheduled) = core {
					let core_index = CoreIndex(idx as _);
					let group_index = group_rotation_info.group_for_core(core_index, n_cores);
					if let Some(g) = validator_groups.get(group_index.0 as usize) {
793
794
795
						if g.contains(&validator.index()) {
							assignment = Some((scheduled.para_id, scheduled.collator));
						}
796
797
						groups.insert(scheduled.para_id, g.clone());
					}
798
				}
799
800
			}

801
802
803
804
805
806
			let table_context = TableContext {
				groups,
				validators,
				signing_context: validator.signing_context().clone(),
				validator: Some(validator),
			};
807

808
809
810
811
812
			let (assignment, required_collator) = match assignment {
				None => return Ok(()), // no need to work.
				Some((a, r)) => (a, r),
			};

813
814
815
816
817
			let job = CandidateBackingJob {
				parent,
				rx_to,
				tx_from,
				assignment,
818
				required_collator,
819
820
821
				issued_statements: HashSet::new(),
				seconded: None,
				reported_misbehavior_for: HashSet::new(),
822
				keystore,
823
824
				table: Table::default(),
				table_context,
825
				metrics,
826
			};
827

828
			job.run_loop().await
829
		}
830
		.boxed()
831
832
833
	}
}

834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
/// The Prometheus counters behind `Metrics`.
#[derive(Clone)]
struct MetricsInner {
	/// Registered as `parachain_signed_statements_total`: number of statements signed.
	signed_statements_total: prometheus::Counter<prometheus::U64>,
	/// Registered as `parachain_candidates_seconded_total`: number of candidates seconded.
	candidates_seconded_total: prometheus::Counter<prometheus::U64>
}

/// Candidate backing metrics.
///
/// Wraps an optional set of counters; when it is `None` (the `Default`), the `on_*`
/// methods do nothing.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	/// Counts one more signed statement, if counters are present.
	fn on_statement_signed(&self) {
		if let Some(inner) = self.0.as_ref() {
			inner.signed_statements_total.inc();
		}
	}

	/// Counts one more seconded candidate, if counters are present.
	fn on_candidate_seconded(&self) {
		if let Some(inner) = self.0.as_ref() {
			inner.candidates_seconded_total.inc();
		}
	}
}

impl metrics::Metrics for Metrics {
	/// Creates both counters and registers them with `registry`, signed statements first.
	///
	/// Fails if a counter cannot be created or registered.
	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
		let signed_statements_total: prometheus::Counter<prometheus::U64> = prometheus::register(
			prometheus::Counter::new(
				"parachain_signed_statements_total",
				"Number of statements signed.",
			)?,
			registry,
		)?;

		let candidates_seconded_total: prometheus::Counter<prometheus::U64> = prometheus::register(
			prometheus::Counter::new(
				"parachain_candidates_seconded_total",
				"Number of candidates seconded.",
			)?,
			registry,
		)?;

		let inner = MetricsInner { signed_statements_total, candidates_seconded_total };

		Ok(Metrics(Some(inner)))
	}
}

880
// Declares `CandidateBackingSubsystem` in terms of `CandidateBackingJob`, with
// `SyncCryptoStorePtr` and `Metrics` as arguments and `ToJob` as the message type.
// NOTE(review): the exact expansion is defined by `delegated_subsystem!` in
// `polkadot_node_subsystem_util`, which is not visible here — confirm details there.
delegated_subsystem!(CandidateBackingJob(SyncCryptoStorePtr, Metrics) <- ToJob as CandidateBackingSubsystem);
881

882
883
884
#[cfg(test)]
mod tests {
	use super::*;
885
	use assert_matches::assert_matches;
886
	use futures::{future, Future};
asynchronous rob's avatar
asynchronous rob committed
887
	use polkadot_primitives::v1::{
888
		ScheduledCore, BlockData, CandidateCommitments,
889
		PersistedValidationData, ValidationData, TransientValidationData, HeadData,
890
		ValidityAttestation, GroupRotationInfo,
891
	};
892
	use polkadot_subsystem::{
893
		messages::RuntimeApiRequest,
894
		ActiveLeavesUpdate, FromOverseer, OverseerSignal,
895
	};
896
	use polkadot_node_primitives::InvalidCandidate;
897
	use sp_keyring::Sr25519Keyring;
898
899
	use sp_application_crypto::AppKey;
	use sp_keystore::{CryptoStore, SyncCryptoStore};
900
	use std::collections::HashMap;
901
902
903
904
905
906
907

	fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
		val_ids.iter().map(|v| v.public().into()).collect()
	}

	struct TestState {
		chain_ids: Vec<ParaId>,
908
		keystore: SyncCryptoStorePtr,
909
910
		validators: Vec<Sr25519Keyring>,
		validator_public: Vec<ValidatorId>,
911
		validation_data: ValidationData,
912
913
		validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
		availability_cores: Vec<CoreState>,
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
		head_data: HashMap<ParaId, HeadData>,
		signing_context: SigningContext,
		relay_parent: Hash,
	}

	impl Default for TestState {
		fn default() -> Self {
			let chain_a = ParaId::from(1);
			let chain_b = ParaId::from(2);
			let thread_a = ParaId::from(3);

			let chain_ids = vec![chain_a, chain_b, thread_a];

			let validators = vec![
				Sr25519Keyring::Alice,
				Sr25519Keyring::Bob,
				Sr25519Keyring::Charlie,
				Sr25519Keyring::Dave,
				Sr25519Keyring::Ferdie,
			];

935
			let keystore = Arc::new(sc_keystore::LocalKeystore::in_memory());
936
			// Make sure `Alice` key is in the keystore, so this mocked node will be a parachain validator.
937
			SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, Some(&validators[0].to_seed()))
938
939
940
941
				.expect("Insert key into keystore");

			let validator_public = validator_pubkeys(&validators);

942
943
944
945
946
			let validator_groups = vec![vec![2, 0, 3], vec![1], vec![4]];
			let group_rotation_info = GroupRotationInfo {
				session_start_block: 0,
				group_rotation_frequency: 100,
				now: 1,
947
948
949
			};

			let thread_collator: CollatorId = Sr25519Keyring::Two.public().into();
950
951
952
953
954
955
956
957
958
959
960
961
962
963
			let availability_cores = vec![
				CoreState::Scheduled(ScheduledCore {
					para_id: chain_a,
					collator: None,
				}),
				CoreState::Scheduled(ScheduledCore {
					para_id: chain_b,
					collator: None,
				}),
				CoreState::Scheduled(ScheduledCore {
					para_id: thread_a,
					collator: Some(thread_collator.clone()),
				}),
			];
964

965
966
			let mut head_data = HashMap::new();
			head_data.insert(chain_a, HeadData(vec![4, 5, 6]));
967

968
			let relay_parent = Hash::from([5; 32]);
969
970
971

			let signing_context = SigningContext {
				session_index: 1,
972
				parent_hash: relay_parent,
973
974
			};

975
976
977
978
979
980
981
982
983
984
985
986
			let validation_data = ValidationData {
				persisted: PersistedValidationData {
					parent_head: HeadData(vec![7, 8, 9]),
					block_number: Default::default(),
					hrmp_mqc_heads: Vec::new(),
				},
				transient: TransientValidationData {
					max_code_size: 1000,
					max_head_data_size: 1000,
					balance: Default::default(),
					code_upgrade_allowed: None,
				},
987
988
989
990
991
992
993
			};

			Self {
				chain_ids,
				keystore,
				validators,
				validator_public,
994
995
				validator_groups: (validator_groups, group_rotation_info),
				availability_cores,
996
				head_data,
997
				validation_data,
998
999
1000
				signing_context,
				relay_parent,
			}
For faster browsing, not all history is shown. View entire blame