lib.rs 41.8 KB
Newer Older
1
// Copyright 2020-2021 Parity Technologies (UK) Ltd.
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements a `CandidateBackingSubsystem`.

19
20
#![deny(unused_crate_dependencies)]

Shawn Tabrizi's avatar
Shawn Tabrizi committed
21
22
23
24
25
use std::{
	collections::{HashMap, HashSet},
	pin::Pin,
	sync::Arc,
};
26
27

use bitvec::vec::BitVec;
Shawn Tabrizi's avatar
Shawn Tabrizi committed
28
29
30
31
use futures::{
	channel::{mpsc, oneshot},
	Future, FutureExt, SinkExt, StreamExt,
};
32

Shawn Tabrizi's avatar
Shawn Tabrizi committed
33
34
use polkadot_node_primitives::{
	AvailableData, PoV, SignedDisputeStatement, SignedFullStatement, Statement, ValidationResult,
35
	BACKING_EXECUTION_TIMEOUT,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
36
37
38
39
40
41
42
};
use polkadot_node_subsystem_util::{
	self as util,
	metrics::{self, prometheus},
	request_from_runtime, request_session_index_for_child, request_validator_groups,
	request_validators, FromJobCommand, JobSender, Validator,
};
asynchronous rob's avatar
asynchronous rob committed
43
use polkadot_primitives::v1::{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
44
45
	BackedCandidate, CandidateCommitments, CandidateDescriptor, CandidateHash, CandidateReceipt,
	CollatorId, CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId, SessionIndex,
46
	SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
47
48
};
use polkadot_subsystem::{
49
	jaeger,
50
	messages::{
51
		AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage,
52
		CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
53
54
55
56
		DisputeCoordinatorMessage, ImportStatementsResult, ProvisionableData, ProvisionerMessage,
		RuntimeApiRequest, StatementDistributionMessage, ValidationFailed,
	},
	overseer, PerLeafSpan, Stage, SubsystemSender,
57
};
Shawn Tabrizi's avatar
Shawn Tabrizi committed
58
use sp_keystore::SyncCryptoStorePtr;
59
60
use statement_table::{
	generic::AttestedCandidate as TableAttestedCandidate,
asynchronous rob's avatar
asynchronous rob committed
61
	v1::{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
62
		SignedStatement as TableSignedStatement, Statement as TableStatement,
63
		Summary as TableSummary,
asynchronous rob's avatar
asynchronous rob committed
64
	},
Shawn Tabrizi's avatar
Shawn Tabrizi committed
65
	Context as TableContextTrait, Table,
66
};
67
use thiserror::Error;
68

69
70
71
#[cfg(test)]
mod tests;

72
const LOG_TARGET: &str = "parachain::candidate-backing";
73

74
/// Errors that can occur in candidate backing.
75
#[derive(Debug, Error)]
76
pub enum Error {
77
	#[error("Candidate is not found")]
78
	CandidateNotFound,
79
	#[error("Signature is invalid")]
80
	InvalidSignature,
81
	#[error("Failed to send candidates {0:?}")]
82
	Send(Vec<BackedCandidate>),
83
84
	#[error("FetchPoV failed")]
	FetchPoV,
85
86
87
88
89
90
	#[error("ValidateFromChainState channel closed before receipt")]
	ValidateFromChainState(#[source] oneshot::Canceled),
	#[error("StoreAvailableData channel closed before receipt")]
	StoreAvailableData(#[source] oneshot::Canceled),
	#[error("a channel was closed before receipt in try_join!")]
	JoinMultiple(#[source] oneshot::Canceled),
91
	#[error("Obtaining erasure chunks failed")]
92
	ObtainErasureChunks(#[from] erasure_coding::Error),
93
94
95
	#[error(transparent)]
	ValidationFailed(#[from] ValidationFailed),
	#[error(transparent)]
96
	BackgroundValidationMpsc(#[from] mpsc::SendError),
97
98
	#[error(transparent)]
	UtilError(#[from] util::Error),
99
100
}

101
102
/// PoV data to validate.
enum PoVData {
Denis_P's avatar
Denis_P committed
103
	/// Already available (from candidate selection).
104
105
106
107
108
109
110
111
112
	Ready(Arc<PoV>),
	/// Needs to be fetched from validator (we are checking a signed statement).
	FetchFromValidator {
		from_validator: ValidatorIndex,
		candidate_hash: CandidateHash,
		pov_hash: Hash,
	},
}

113
enum ValidatedCandidateCommand {
114
	// We were instructed to second the candidate that has been already validated.
115
116
117
	Second(BackgroundValidationResult),
	// We were instructed to validate the candidate.
	Attest(BackgroundValidationResult),
118
119
	// We were not able to `Attest` because backing validator did not send us the PoV.
	AttestNoPoV(CandidateHash),
120
121
122
123
124
125
}

impl std::fmt::Debug for ValidatedCandidateCommand {
	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
		let candidate_hash = self.candidate_hash();
		match *self {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
126
127
128
			ValidatedCandidateCommand::Second(_) => write!(f, "Second({})", candidate_hash),
			ValidatedCandidateCommand::Attest(_) => write!(f, "Attest({})", candidate_hash),
			ValidatedCandidateCommand::AttestNoPoV(_) => write!(f, "Attest({})", candidate_hash),
129
130
131
132
133
134
135
136
137
138
139
		}
	}
}

impl ValidatedCandidateCommand {
	fn candidate_hash(&self) -> CandidateHash {
		match *self {
			ValidatedCandidateCommand::Second(Ok((ref candidate, _, _))) => candidate.hash(),
			ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
			ValidatedCandidateCommand::Attest(Ok((ref candidate, _, _))) => candidate.hash(),
			ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
140
			ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
141
142
143
144
		}
	}
}

145
/// Holds all data needed for candidate backing job operation.
146
pub struct CandidateBackingJob {
147
148
	/// The hash of the relay parent on top of which this job is doing it's work.
	parent: Hash,
149
150
	/// The session index this corresponds to.
	session_index: SessionIndex,
151
	/// The `ParaId` assigned to this validator
152
	assignment: Option<ParaId>,
153
154
	/// The collator required to author the candidate, if any.
	required_collator: Option<CollatorId>,
155
	/// Spans for all candidates that are not yet backable.
156
	unbacked_candidates: HashMap<CandidateHash, jaeger::Span>,
157
	/// We issued `Seconded`, `Valid` or `Invalid` statements on about these candidates.
158
	issued_statements: HashSet<CandidateHash>,
159
160
	/// These candidates are undergoing validation in the background.
	awaiting_validation: HashSet<CandidateHash>,
161
162
	/// Data needed for retrying in case of `ValidatedCandidateCommand::AttestNoPoV`.
	fallbacks: HashMap<CandidateHash, (AttestingData, Option<jaeger::Span>)>,
163
	/// `Some(h)` if this job has already issued `Seconded` statement for some candidate with `h` hash.
164
	seconded: Option<CandidateHash>,
asynchronous rob's avatar
asynchronous rob committed
165
166
	/// The candidates that are includable, by hash. Each entry here indicates
	/// that we've sent the provisioner the backed candidate.
167
	backed: HashSet<CandidateHash>,
168
	keystore: SyncCryptoStorePtr,
169
170
	table: Table<TableContext>,
	table_context: TableContext,
171
172
	background_validation: mpsc::Receiver<ValidatedCandidateCommand>,
	background_validation_tx: mpsc::Sender<ValidatedCandidateCommand>,
173
	metrics: Metrics,
174
175
}

176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
/// In case a backing validator does not provide a PoV, we need to retry with other backing
/// validators.
///
/// This is the data needed to accomplish this. Basically all the data needed for spawning a
/// validation job and a list of backing validators, we can try.
#[derive(Clone)]
struct AttestingData {
	/// The candidate to attest.
	candidate: CandidateReceipt,
	/// Hash of the PoV we need to fetch.
	pov_hash: Hash,
	/// Validator we are currently trying to get the PoV from.
	from_validator: ValidatorIndex,
	/// Other backing validators we can try in case `from_validator` failed.
	backing: Vec<ValidatorIndex>,
}

193
194
195
196
197
198
const fn group_quorum(n_validators: usize) -> usize {
	(n_validators / 2) + 1
}

#[derive(Default)]
struct TableContext {
199
	validator: Option<Validator>,
200
201
202
203
204
	groups: HashMap<ParaId, Vec<ValidatorIndex>>,
	validators: Vec<ValidatorId>,
}

impl TableContextTrait for TableContext {
asynchronous rob's avatar
asynchronous rob committed
205
	type AuthorityId = ValidatorIndex;
206
	type Digest = CandidateHash;
asynchronous rob's avatar
asynchronous rob committed
207
208
209
210
	type GroupId = ParaId;
	type Signature = ValidatorSignature;
	type Candidate = CommittedCandidateReceipt;

211
	fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
asynchronous rob's avatar
asynchronous rob committed
212
213
214
215
216
217
218
219
		candidate.hash()
	}

	fn candidate_group(candidate: &CommittedCandidateReceipt) -> ParaId {
		candidate.descriptor().para_id
	}

	fn is_member_of(&self, authority: &ValidatorIndex, group: &ParaId) -> bool {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
220
221
222
		self.groups
			.get(group)
			.map_or(false, |g| g.iter().position(|a| a == authority).is_some())
223
224
225
	}

	fn requisite_votes(&self, group: &ParaId) -> usize {
226
		self.groups.get(group).map_or(usize::MAX, |g| group_quorum(g.len()))
227
228
229
	}
}

230
231
struct InvalidErasureRoot;

232
233
234
235
// It looks like it's not possible to do an `impl From` given the current state of
// the code. So this does the necessary conversion.
fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
	let statement = match s.payload() {
236
		Statement::Seconded(c) => TableStatement::Seconded(c.clone()),
237
238
239
240
241
242
243
244
245
246
		Statement::Valid(h) => TableStatement::Valid(h.clone()),
	};

	TableSignedStatement {
		statement,
		signature: s.signature().clone(),
		sender: s.validator_index(),
	}
}

asynchronous rob's avatar
asynchronous rob committed
247
248
249
250
251
252
253
254
255
256
257
fn table_attested_to_backed(
	attested: TableAttestedCandidate<
		ParaId,
		CommittedCandidateReceipt,
		ValidatorIndex,
		ValidatorSignature,
	>,
	table_context: &TableContext,
) -> Option<BackedCandidate> {
	let TableAttestedCandidate { candidate, validity_votes, group_id: para_id } = attested;

Shawn Tabrizi's avatar
Shawn Tabrizi committed
258
259
	let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
		validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
asynchronous rob's avatar
asynchronous rob committed
260
261
262
263
264
265
266

	let group = table_context.groups.get(&para_id)?;

	let mut validator_indices = BitVec::with_capacity(group.len());

	validator_indices.resize(group.len(), false);

267
268
269
270
271
	// The order of the validity votes in the backed candidate must match
	// the order of bits set in the bitfield, which is not necessarily
	// the order of the `validity_votes` we got from the table.
	let mut vote_positions = Vec::with_capacity(validity_votes.len());
	for (orig_idx, id) in ids.iter().enumerate() {
asynchronous rob's avatar
asynchronous rob committed
272
273
		if let Some(position) = group.iter().position(|x| x == id) {
			validator_indices.set(position, true);
274
275
276
277
278
279
280
			vote_positions.push((orig_idx, position));
		} else {
			tracing::warn!(
				target: LOG_TARGET,
				"Logic error: Validity vote from table does not correspond to group",
			);

Shawn Tabrizi's avatar
Shawn Tabrizi committed
281
			return None
asynchronous rob's avatar
asynchronous rob committed
282
283
		}
	}
284
	vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
asynchronous rob's avatar
asynchronous rob committed
285
286
287

	Some(BackedCandidate {
		candidate,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
288
289
		validity_votes: vote_positions
			.into_iter()
290
291
			.map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
			.collect(),
asynchronous rob's avatar
asynchronous rob committed
292
293
294
295
		validator_indices,
	})
}

296
async fn store_available_data(
297
	sender: &mut JobSender<impl SubsystemSender>,
298
299
300
301
302
	n_validators: u32,
	candidate_hash: CandidateHash,
	available_data: AvailableData,
) -> Result<(), Error> {
	let (tx, rx) = oneshot::channel();
Shawn Tabrizi's avatar
Shawn Tabrizi committed
303
	sender
304
		.send_message(AvailabilityStoreMessage::StoreAvailableData {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
305
306
307
308
			candidate_hash,
			n_validators,
			available_data,
			tx,
309
		})
Shawn Tabrizi's avatar
Shawn Tabrizi committed
310
		.await;
311

312
	let _ = rx.await.map_err(Error::StoreAvailableData)?;
313
314
315
316
317
318
319
320
321

	Ok(())
}

// Make a `PoV` available.
//
// This will compute the erasure root internally and compare it to the expected erasure root.
// This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`.
async fn make_pov_available(
322
	sender: &mut JobSender<impl SubsystemSender>,
323
324
325
326
327
	n_validators: usize,
	pov: Arc<PoV>,
	candidate_hash: CandidateHash,
	validation_data: polkadot_primitives::v1::PersistedValidationData,
	expected_erasure_root: Hash,
328
	span: Option<&jaeger::Span>,
329
) -> Result<Result<(), InvalidErasureRoot>, Error> {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
330
	let available_data = AvailableData { pov, validation_data };
331

332
	{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
333
		let _span = span.as_ref().map(|s| s.child("erasure-coding").with_candidate(candidate_hash));
334

Shawn Tabrizi's avatar
Shawn Tabrizi committed
335
		let chunks = erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
336

337
338
		let branches = erasure_coding::branches(chunks.as_ref());
		let erasure_root = branches.root();
339

340
		if erasure_root != expected_erasure_root {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
341
			return Ok(Err(InvalidErasureRoot))
342
		}
343
344
	}

345
	{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
346
		let _span = span.as_ref().map(|s| s.child("store-data").with_candidate(candidate_hash));
347

348
		store_available_data(sender, n_validators as u32, candidate_hash, available_data).await?;
349
	}
350
351
352
353

	Ok(Ok(()))
}

354
async fn request_pov(
355
	sender: &mut JobSender<impl SubsystemSender>,
356
357
358
359
	relay_parent: Hash,
	from_validator: ValidatorIndex,
	candidate_hash: CandidateHash,
	pov_hash: Hash,
360
) -> Result<Arc<PoV>, Error> {
361
	let (tx, rx) = oneshot::channel();
Shawn Tabrizi's avatar
Shawn Tabrizi committed
362
363
364
365
366
367
368
369
370
	sender
		.send_message(AvailabilityDistributionMessage::FetchPoV {
			relay_parent,
			from_validator,
			candidate_hash,
			pov_hash,
			tx,
		})
		.await;
371

372
373
	let pov = rx.await.map_err(|_| Error::FetchPoV)?;
	Ok(Arc::new(pov))
374
375
376
}

async fn request_candidate_validation(
377
	sender: &mut JobSender<impl SubsystemSender>,
378
379
380
381
382
	candidate: CandidateDescriptor,
	pov: Arc<PoV>,
) -> Result<ValidationResult, Error> {
	let (tx, rx) = oneshot::channel();

Shawn Tabrizi's avatar
Shawn Tabrizi committed
383
	sender
384
385
386
387
388
389
		.send_message(CandidateValidationMessage::ValidateFromChainState(
			candidate,
			pov,
			BACKING_EXECUTION_TIMEOUT,
			tx,
		))
Shawn Tabrizi's avatar
Shawn Tabrizi committed
390
		.await;
391

392
393
394
395
396
	match rx.await {
		Ok(Ok(validation_result)) => Ok(validation_result),
		Ok(Err(err)) => Err(Error::ValidationFailed(err)),
		Err(err) => Err(Error::ValidateFromChainState(err)),
	}
397
398
}

Shawn Tabrizi's avatar
Shawn Tabrizi committed
399
400
type BackgroundValidationResult =
	Result<(CandidateReceipt, CandidateCommitments, Arc<PoV>), CandidateReceipt>;
401

402
struct BackgroundValidationParams<S: overseer::SubsystemSender<AllMessages>, F> {
403
	sender: JobSender<S>,
404
405
406
	tx_command: mpsc::Sender<ValidatedCandidateCommand>,
	candidate: CandidateReceipt,
	relay_parent: Hash,
407
	pov: PoVData,
408
	n_validators: usize,
409
	span: Option<jaeger::Span>,
410
411
412
413
	make_command: F,
}

async fn validate_and_make_available(
414
415
416
	params: BackgroundValidationParams<
		impl SubsystemSender,
		impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
417
	>,
418
419
) -> Result<(), Error> {
	let BackgroundValidationParams {
420
		mut sender,
421
422
423
424
425
		mut tx_command,
		candidate,
		relay_parent,
		pov,
		n_validators,
426
		span,
427
428
429
430
		make_command,
	} = params;

	let pov = match pov {
431
		PoVData::Ready(pov) => pov,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
432
		PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } => {
433
			let _span = span.as_ref().map(|s| s.child("request-pov"));
Shawn Tabrizi's avatar
Shawn Tabrizi committed
434
435
436
			match request_pov(&mut sender, relay_parent, from_validator, candidate_hash, pov_hash)
				.await
			{
437
				Err(Error::FetchPoV) => {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
438
439
440
					tx_command
						.send(ValidatedCandidateCommand::AttestNoPoV(candidate.hash()))
						.await
441
						.map_err(Error::BackgroundValidationMpsc)?;
442
					return Ok(())
Shawn Tabrizi's avatar
Shawn Tabrizi committed
443
				},
444
445
446
				Err(err) => return Err(err),
				Ok(pov) => pov,
			}
Shawn Tabrizi's avatar
Shawn Tabrizi committed
447
		},
448
449
	};

450
	let v = {
451
		let _span = span.as_ref().map(|s| {
452
			s.child("request-validation")
453
454
				.with_pov(&pov)
				.with_para_id(candidate.descriptor().para_id)
455
		});
456
		request_candidate_validation(&mut sender, candidate.descriptor.clone(), pov.clone()).await?
457
	};
458
459
460
461
462

	let expected_commitments_hash = candidate.commitments_hash;

	let res = match v {
		ValidationResult::Valid(commitments, validation_data) => {
463
464
465
466
467
468
			tracing::debug!(
				target: LOG_TARGET,
				candidate_hash = ?candidate.hash(),
				"Validation successful",
			);

469
470
			// If validation produces a new set of commitments, we vote the candidate as invalid.
			if commitments.hash() != expected_commitments_hash {
471
				tracing::debug!(
472
					target: LOG_TARGET,
473
					candidate_hash = ?candidate.hash(),
474
475
476
					actual_commitments = ?commitments,
					"Commitments obtained with validation don't match the announced by the candidate receipt",
				);
477
478
479
				Err(candidate)
			} else {
				let erasure_valid = make_pov_available(
480
					&mut sender,
481
482
483
484
485
					n_validators,
					pov.clone(),
					candidate.hash(),
					validation_data,
					candidate.descriptor.erasure_root,
486
					span.as_ref(),
Shawn Tabrizi's avatar
Shawn Tabrizi committed
487
488
				)
				.await?;
489
490
491

				match erasure_valid {
					Ok(()) => Ok((candidate, commitments, pov.clone())),
492
					Err(InvalidErasureRoot) => {
493
						tracing::debug!(
494
							target: LOG_TARGET,
495
							candidate_hash = ?candidate.hash(),
496
497
498
499
500
							actual_commitments = ?commitments,
							"Erasure root doesn't match the announced by the candidate receipt",
						);
						Err(candidate)
					},
501
502
				}
			}
Shawn Tabrizi's avatar
Shawn Tabrizi committed
503
		},
504
		ValidationResult::Invalid(reason) => {
505
			tracing::debug!(
506
				target: LOG_TARGET,
507
				candidate_hash = ?candidate.hash(),
508
509
510
				reason = ?reason,
				"Validation yielded an invalid candidate",
			);
511
			Err(candidate)
Shawn Tabrizi's avatar
Shawn Tabrizi committed
512
		},
513
514
	};

515
	tx_command.send(make_command(res)).await.map_err(Into::into)
516
517
}

518
519
struct ValidatorIndexOutOfBounds;

520
521
impl CandidateBackingJob {
	/// Run asynchronously.
522
523
	async fn run_loop(
		mut self,
524
		mut sender: JobSender<impl SubsystemSender>,
525
		mut rx_to: mpsc::Receiver<CandidateBackingMessage>,
526
		span: PerLeafSpan,
527
	) -> Result<(), Error> {
528
		loop {
529
530
			futures::select! {
				validated_command = self.background_validation.next() => {
531
					let _span = span.child("process-validation-result");
532
					if let Some(c) = validated_command {
533
						self.handle_validated_candidate_command(&span, &mut sender, c).await?;
534
535
536
537
					} else {
						panic!("`self` hasn't dropped and `self` holds a reference to this sender; qed");
					}
				}
538
				to_job = rx_to.next() => match to_job {
539
540
					None => break,
					Some(msg) => {
541
542
543
						// we intentionally want spans created in `process_msg` to descend from the
						// `span ` which is longer-lived than this ephemeral timing span.
						let _timing_span = span.child("process-message");
544
						self.process_msg(&span, &mut sender, msg).await?;
545
546
					}
				}
547
548
549
550
551
552
			}
		}

		Ok(())
	}

553
554
	async fn handle_validated_candidate_command(
		&mut self,
555
		root_span: &jaeger::Span,
556
		sender: &mut JobSender<impl SubsystemSender>,
557
558
559
560
561
562
563
564
		command: ValidatedCandidateCommand,
	) -> Result<(), Error> {
		let candidate_hash = command.candidate_hash();
		self.awaiting_validation.remove(&candidate_hash);

		match command {
			ValidatedCandidateCommand::Second(res) => {
				match res {
565
					Ok((candidate, commitments, _)) => {
566
						// sanity check.
Shawn Tabrizi's avatar
Shawn Tabrizi committed
567
568
569
						if self.seconded.is_none() &&
							!self.issued_statements.contains(&candidate_hash)
						{
570
571
572
573
574
575
576
577
							self.seconded = Some(candidate_hash);
							self.issued_statements.insert(candidate_hash);
							self.metrics.on_candidate_seconded();

							let statement = Statement::Seconded(CommittedCandidateReceipt {
								descriptor: candidate.descriptor.clone(),
								commitments,
							});
Shawn Tabrizi's avatar
Shawn Tabrizi committed
578
579
580
581
582
583
584
585
586
587
							if let Some(stmt) = self
								.sign_import_and_distribute_statement(sender, statement, root_span)
								.await?
							{
								sender
									.send_message(CollatorProtocolMessage::Seconded(
										self.parent,
										stmt,
									))
									.await;
588
							}
589
						}
Shawn Tabrizi's avatar
Shawn Tabrizi committed
590
					},
591
					Err(candidate) => {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
592
593
594
595
						sender
							.send_message(CollatorProtocolMessage::Invalid(self.parent, candidate))
							.await;
					},
596
				}
Shawn Tabrizi's avatar
Shawn Tabrizi committed
597
			},
598
			ValidatedCandidateCommand::Attest(res) => {
599
600
				// We are done - avoid new validation spawns:
				self.fallbacks.remove(&candidate_hash);
601
602
				// sanity check.
				if !self.issued_statements.contains(&candidate_hash) {
603
604
					if res.is_ok() {
						let statement = Statement::Valid(candidate_hash);
Shawn Tabrizi's avatar
Shawn Tabrizi committed
605
606
						self.sign_import_and_distribute_statement(sender, statement, &root_span)
							.await?;
607
					}
608
609
					self.issued_statements.insert(candidate_hash);
				}
Shawn Tabrizi's avatar
Shawn Tabrizi committed
610
			},
611
612
613
614
615
616
617
			ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
				if let Some((attesting, span)) = self.fallbacks.get_mut(&candidate_hash) {
					if let Some(index) = attesting.backing.pop() {
						attesting.from_validator = index;
						// Ok, another try:
						let c_span = span.as_ref().map(|s| s.child("try"));
						let attesting = attesting.clone();
618
						self.kick_off_validation_work(sender, attesting, c_span).await?
619
620
621
622
623
624
625
626
					}
				} else {
					tracing::warn!(
						target: LOG_TARGET,
						"AttestNoPoV was triggered without fallback being available."
					);
					debug_assert!(false);
				}
Shawn Tabrizi's avatar
Shawn Tabrizi committed
627
			},
628
629
630
631
632
633
634
		}

		Ok(())
	}

	async fn background_validate_and_make_available(
		&mut self,
635
		sender: &mut JobSender<impl SubsystemSender>,
636
		params: BackgroundValidationParams<
637
			impl SubsystemSender,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
638
			impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
639
640
641
642
643
644
645
		>,
	) -> Result<(), Error> {
		let candidate_hash = params.candidate.hash();
		if self.awaiting_validation.insert(candidate_hash) {
			// spawn background task.
			let bg = async move {
				if let Err(e) = validate_and_make_available(params).await {
646
647
648
649
650
651
652
653
654
655
656
657
658
					if let Error::BackgroundValidationMpsc(error) = e {
						tracing::debug!(
							target: LOG_TARGET,
							?error,
							"Mpsc background validation mpsc died during validation- leaf no longer active?"
						);
					} else {
						tracing::error!(
							target: LOG_TARGET,
							"Failed to validate and make available: {:?}",
							e
						);
					}
659
660
				}
			};
Shawn Tabrizi's avatar
Shawn Tabrizi committed
661
			sender
662
				.send_command(FromJobCommand::Spawn("backing-validation", bg.boxed()))
Shawn Tabrizi's avatar
Shawn Tabrizi committed
663
				.await?;
664
665
666
667
668
669
		}

		Ok(())
	}

	/// Kick off background validation with intent to second.
670
671
	async fn validate_and_second(
		&mut self,
672
		parent_span: &jaeger::Span,
673
		root_span: &jaeger::Span,
674
		sender: &mut JobSender<impl SubsystemSender>,
asynchronous rob's avatar
asynchronous rob committed
675
		candidate: &CandidateReceipt,
676
		pov: Arc<PoV>,
677
	) -> Result<(), Error> {
678
		// Check that candidate is collated by the right collator.
Shawn Tabrizi's avatar
Shawn Tabrizi committed
679
680
681
		if self
			.required_collator
			.as_ref()
682
683
			.map_or(false, |c| c != &candidate.descriptor().collator)
		{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
684
685
686
687
			sender
				.send_message(CollatorProtocolMessage::Invalid(self.parent, candidate.clone()))
				.await;
			return Ok(())
688
689
		}

690
		let candidate_hash = candidate.hash();
691
692
693
694
695
696
		let mut span = self.get_unbacked_validation_child(
			root_span,
			candidate_hash,
			candidate.descriptor().para_id,
		);

697
		span.as_mut().map(|span| span.add_follows_from(parent_span));
698

699
700
701
702
703
704
705
		tracing::debug!(
			target: LOG_TARGET,
			candidate_hash = ?candidate_hash,
			candidate_receipt = ?candidate,
			"Validate and second candidate",
		);

706
707
708
709
710
711
712
713
714
715
716
717
		let bg_sender = sender.clone();
		self.background_validate_and_make_available(
			sender,
			BackgroundValidationParams {
				sender: bg_sender,
				tx_command: self.background_validation_tx.clone(),
				candidate: candidate.clone(),
				relay_parent: self.parent,
				pov: PoVData::Ready(pov),
				n_validators: self.table_context.validators.len(),
				span,
				make_command: ValidatedCandidateCommand::Second,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
718
719
720
			},
		)
		.await?;
721

722
		Ok(())
723
724
	}

725
726
	async fn sign_import_and_distribute_statement(
		&mut self,
727
		sender: &mut JobSender<impl SubsystemSender>,
728
		statement: Statement,
729
		root_span: &jaeger::Span,
730
	) -> Result<Option<SignedFullStatement>, Error> {
asynchronous rob's avatar
asynchronous rob committed
731
		if let Some(signed_statement) = self.sign_statement(statement).await {
732
			self.import_statement(sender, &signed_statement, root_span).await?;
733
			let smsg = StatementDistributionMessage::Share(self.parent, signed_statement.clone());
734
			sender.send_unbounded_message(smsg);
735

736
737
738
			Ok(Some(signed_statement))
		} else {
			Ok(None)
asynchronous rob's avatar
asynchronous rob committed
739
740
741
		}
	}

742
	/// Check if there have happened any new misbehaviors and issue necessary messages.
743
	async fn issue_new_misbehaviors(&mut self, sender: &mut JobSender<impl SubsystemSender>) {
744
745
746
		// collect the misbehaviors to avoid double mutable self borrow issues
		let misbehaviors: Vec<_> = self.table.drain_misbehaviors().collect();
		for (validator_id, report) in misbehaviors {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
747
748
			sender
				.send_message(ProvisionerMessage::ProvisionableData(
749
					self.parent,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
750
751
752
					ProvisionableData::MisbehaviorReport(self.parent, validator_id, report),
				))
				.await;
753
754
755
756
757
758
		}
	}

	/// Import a statement into the statement table and return the summary of the import.
	async fn import_statement(
		&mut self,
759
		sender: &mut JobSender<impl SubsystemSender>,
760
		statement: &SignedFullStatement,
761
		root_span: &jaeger::Span,
762
	) -> Result<Option<TableSummary>, Error> {
763
764
765
		tracing::debug!(
			target: LOG_TARGET,
			statement = ?statement.payload().to_compact(),
766
			validator_index = statement.validator_index().0,
767
768
769
			"Importing statement",
		);

770
		let candidate_hash = statement.payload().candidate_hash();
771
		let import_statement_span = {
772
			// create a span only for candidates we're already aware of.
Shawn Tabrizi's avatar
Shawn Tabrizi committed
773
774
775
776
777
			self.get_unbacked_statement_child(
				root_span,
				candidate_hash,
				statement.validator_index(),
			)
778
779
		};

Shawn Tabrizi's avatar
Shawn Tabrizi committed
780
781
782
783
		if let Err(ValidatorIndexOutOfBounds) = self
			.dispatch_new_statement_to_dispute_coordinator(sender, candidate_hash, &statement)
			.await
		{
784
785
786
787
788
789
790
791
			tracing::warn!(
				target: LOG_TARGET,
				session_index = ?self.session_index,
				relay_parent = ?self.parent,
				validator_index = statement.validator_index().0,
				"Supposedly 'Signed' statement has validator index out of bounds."
			);

Shawn Tabrizi's avatar
Shawn Tabrizi committed
792
			return Ok(None)
793
794
		}

795
796
797
798
		let stmt = primitive_statement_to_table(statement);

		let summary = self.table.import_statement(&self.table_context, stmt);

Shawn Tabrizi's avatar
Shawn Tabrizi committed
799
800
		let unbacked_span = if let Some(attested) = summary
			.as_ref()
801
802
803
804
805
806
807
			.and_then(|s| self.table.attested_candidate(&s.candidate, &self.table_context))
		{
			let candidate_hash = attested.candidate.hash();
			// `HashSet::insert` returns true if the thing wasn't in there already.
			if self.backed.insert(candidate_hash) {
				let span = self.remove_unbacked_span(&candidate_hash);

Shawn Tabrizi's avatar
Shawn Tabrizi committed
808
				if let Some(backed) = table_attested_to_backed(attested, &self.table_context) {
809
810
811
					tracing::debug!(
						target: LOG_TARGET,
						candidate_hash = ?candidate_hash,
Bastian Köcher's avatar
Bastian Köcher committed
812
813
						relay_parent = ?self.parent,
						para_id = %backed.candidate.descriptor.para_id,
814
815
816
						"Candidate backed",
					);

817
818
819
820
					let message = ProvisionerMessage::ProvisionableData(
						self.parent,
						ProvisionableData::BackedCandidate(backed.receipt()),
					);
821
					sender.send_message(message).await;
822
823
824
825
826

					span.as_ref().map(|s| s.child("backed"));
					span
				} else {
					None
asynchronous rob's avatar
asynchronous rob committed
827
				}
828
829
			} else {
				None
asynchronous rob's avatar
asynchronous rob committed
830
			}
831
832
833
		} else {
			None
		};
asynchronous rob's avatar
asynchronous rob committed
834

835
		self.issue_new_misbehaviors(sender).await;
836

837
838
839
840
		// It is important that the child span is dropped before its parent span (`unbacked_span`)
		drop(import_statement_span);
		drop(unbacked_span);

asynchronous rob's avatar
asynchronous rob committed
841
		Ok(summary)
842
843
	}

844
845
846
847
848
849
850
851
852
853
	/// The dispute coordinator keeps track of all statements by validators about every recent
	/// candidate.
	///
	/// When importing a statement, this should be called access the candidate receipt either
	/// from the statement itself or from the underlying statement table in order to craft
	/// and dispatch the notification to the dispute coordinator.
	///
	/// This also does bounds-checking on the validator index and will return an error if the
	/// validator index is out of bounds for the current validator set. It's expected that
	/// this should never happen due to the interface of the candidate backing subsystem -
Denis_P's avatar
Denis_P committed
854
	/// the networking component responsible for feeding statements to the backing subsystem
855
856
857
858
859
860
861
862
863
	/// is meant to check the signature and provenance of all statements before submission.
	async fn dispatch_new_statement_to_dispute_coordinator(
		&self,
		sender: &mut JobSender<impl SubsystemSender>,
		candidate_hash: CandidateHash,
		statement: &SignedFullStatement,
	) -> Result<(), ValidatorIndexOutOfBounds> {
		// Dispatch the statement to the dispute coordinator.
		let validator_index = statement.validator_index();
Shawn Tabrizi's avatar
Shawn Tabrizi committed
864
865
		let signing_context =
			SigningContext { parent_hash: self.parent, session_index: self.session_index };
866

Shawn Tabrizi's avatar
Shawn Tabrizi committed
867
868
		let validator_public = match self.table_context.validators.get(validator_index.0 as usize) {
			None => return Err(ValidatorIndexOutOfBounds),
869
870
871
872
873
874
875
876
877
			Some(v) => v,
		};

		let maybe_candidate_receipt = match statement.payload() {
			Statement::Seconded(receipt) => Some(receipt.to_plain()),
			Statement::Valid(candidate_hash) => {
				// Valid statements are only supposed to be imported
				// once we've seen at least one `Seconded` statement.
				self.table.get_candidate(&candidate_hash).map(|c| c.to_plain())
Shawn Tabrizi's avatar
Shawn Tabrizi committed
878
			},
879
880
881
882
883
884
		};

		let maybe_signed_dispute_statement = SignedDisputeStatement::from_backing_statement(
			statement.as_unchecked(),
			signing_context,
			validator_public.clone(),
Shawn Tabrizi's avatar
Shawn Tabrizi committed
885
886
		)
		.ok();
887

Shawn Tabrizi's avatar
Shawn Tabrizi committed
888
889
		if let (Some(candidate_receipt), Some(dispute_statement)) =
			(maybe_candidate_receipt, maybe_signed_dispute_statement)
890
891
		{
			let (pending_confirmation, confirmation_rx) = oneshot::channel();
Shawn Tabrizi's avatar
Shawn Tabrizi committed
892
893
			sender
				.send_message(DisputeCoordinatorMessage::ImportStatements {
894
895
896
897
898
					candidate_hash,
					candidate_receipt,
					session: self.session_index,
					statements: vec![(dispute_statement, validator_index)],
					pending_confirmation,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
899
900
				})
				.await;
901
902

			match confirmation_rx.await {
903
904
905
				Err(oneshot::Canceled) => {
					tracing::debug!(target: LOG_TARGET, "Dispute coordinator confirmation lost",)
				},
Shawn Tabrizi's avatar
Shawn Tabrizi committed
906
				Ok(ImportStatementsResult::ValidImport) => {},
907
908
909
				Ok(ImportStatementsResult::InvalidImport) => {
					tracing::warn!(target: LOG_TARGET, "Failed to import statements of validity",)
				},
910
911
912
913
914
915
			}
		}

		Ok(())
	}

916
917
918
919
920
921
	async fn process_msg(
		&mut self,
		root_span: &jaeger::Span,
		sender: &mut JobSender<impl SubsystemSender>,
		msg: CandidateBackingMessage,
	) -> Result<(), Error> {
922
		match msg {
923
			CandidateBackingMessage::Second(relay_parent, candidate, pov) => {
924
925
				let _timer = self.metrics.time_process_second();

Shawn Tabrizi's avatar
Shawn Tabrizi committed
926
927
				let span = root_span
					.child("second")
928
929
					.with_stage(jaeger::Stage::CandidateBacking)
					.with_pov(&pov)
930
931
					.with_candidate(candidate.hash())
					.with_relay_parent(relay_parent);
932

933
				// Sanity check that candidate is from our assignment.
934
				if Some(candidate.descriptor().para_id) != self.assignment {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
935
					return Ok(())
936
937
938
939
940
				}

				// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
				// Seconded statement only if we have not seconded any other candidate and
				// have not signed a Valid statement for the requested candidate.
asynchronous rob's avatar
asynchronous rob committed
941
				if self.seconded.is_none() {
942
					// This job has not seconded a candidate yet.
asynchronous rob's avatar
asynchronous rob committed
943
944
945
					let candidate_hash = candidate.hash();

					if !self.issued_statements.contains(&candidate_hash) {
946
						let pov = Arc::new(pov);
Shawn Tabrizi's avatar
Shawn Tabrizi committed
947
948
						self.validate_and_second(&span, &root_span, sender, &candidate, pov)
							.await?;
949
950
					}
				}
Shawn Tabrizi's avatar
Shawn Tabrizi committed
951
			},
952
			CandidateBackingMessage::Statement(_relay_parent, statement) => {
953
				let _timer = self.metrics.time_process_statement();
Shawn Tabrizi's avatar
Shawn Tabrizi committed
954
955
				let _span = root_span
					.child("statement")
956
					.with_stage(jaeger::Stage::CandidateBacking)
957
958
					.with_candidate(statement.payload().candidate_hash())
					.with_relay_parent(_relay_parent);
959

960
				match self.maybe_validate_and_import(&root_span, sender, statement).await {
961
962
963
964
					Err(Error::ValidationFailed(_)) => return Ok(()),
					Err(e) => return Err(e),
					Ok(()) => (),
				}
Shawn Tabrizi's avatar
Shawn Tabrizi committed
965
			},
966
			CandidateBackingMessage::GetBackedCandidates(_, requested_candidates, tx) => {
967
968
				let _timer = self.metrics.time_get_backed_candidates();

969
970
971
				let backed = requested_candidates
					.into_iter()
					.filter_map(|hash| {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
972
973
974
						self.table.attested_candidate(&hash, &self.table_context).and_then(
							|attested| table_attested_to_backed(attested, &self.table_context),
						)
975
976
					})
					.collect();
977

978
				tx.send(backed).map_err(|data| Error::Send(data))?;
Shawn Tabrizi's avatar
Shawn Tabrizi committed
979
			},
980
981
982
983
984
985
986
987
		}

		Ok(())
	}

	/// Kick off validation work and distribute the result as a signed statement.
	async fn kick_off_validation_work(
		&mut self,
988
		sender: &mut JobSender<impl SubsystemSender>,
989
		attesting: AttestingData,
990
		span: Option<jaeger::Span>,
asynchronous rob's avatar
asynchronous rob committed
991
	) -> Result<(), Error> {
992
		let candidate_hash = attesting.candid