lib.rs 76.1 KB
Newer Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements a `CandidateBackingSubsystem`.

#![deny(unused_crate_dependencies)]

use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;

use bitvec::vec::BitVec;
use futures::{channel::{mpsc, oneshot}, Future, FutureExt, SinkExt, StreamExt};
use sp_keystore::SyncCryptoStorePtr;
use polkadot_primitives::v1::{
	AvailableData, BackedCandidate, CandidateCommitments, CandidateDescriptor, CandidateHash,
	CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId,
	PoV, SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
};
use polkadot_node_primitives::{
	Statement, SignedFullStatement, ValidationResult,
};
use polkadot_subsystem::{
	PerLeafSpan, Stage,
	jaeger,
	messages::{
		AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
		CandidateValidationMessage, PoVDistributionMessage, ProvisionableData,
		ProvisionerMessage, StatementDistributionMessage, ValidationFailed, RuntimeApiRequest,
};
use polkadot_node_subsystem_util::{
	self as util,
	request_session_index_for_child,
	request_validator_groups,
	request_validators,
	request_from_runtime,
	Validator,
	delegated_subsystem,
	FromJobCommand,
	metrics::{self, prometheus},
};
use statement_table::{
	generic::AttestedCandidate as TableAttestedCandidate,
	Context as TableContextTrait,
	Table,
	v1::{
		Statement as TableStatement,
use thiserror::Error;
const LOG_TARGET: &str = "parachain::candidate-backing";
#[derive(Debug, Error)]
enum Error {
	#[error("Candidate is not found")]
	CandidateNotFound,
	#[error("Signature is invalid")]
	InvalidSignature,
	#[error("Failed to send candidates {0:?}")]
	Send(Vec<BackedCandidate>),
	#[error("FetchPoV channel closed before receipt")]
	FetchPoV(#[source] oneshot::Canceled),
	#[error("ValidateFromChainState channel closed before receipt")]
	ValidateFromChainState(#[source] oneshot::Canceled),
	#[error("StoreAvailableData channel closed before receipt")]
	StoreAvailableData(#[source] oneshot::Canceled),
	#[error("a channel was closed before receipt in try_join!")]
	JoinMultiple(#[source] oneshot::Canceled),
	#[error("Obtaining erasure chunks failed")]
	ObtainErasureChunks(#[from] erasure_coding::Error),
	#[error(transparent)]
	ValidationFailed(#[from] ValidationFailed),
	#[error(transparent)]
	Mpsc(#[from] mpsc::SendError),
	#[error(transparent)]
	UtilError(#[from] util::Error),
/// The result of a background validation, tagged with the reason the validation was started.
///
/// Both variants carry the `BackgroundValidationResult` produced by the validation.
enum ValidatedCandidateCommand {
	/// We were instructed to second the candidate.
	Second(BackgroundValidationResult),
	/// We were instructed to validate the candidate.
	Attest(BackgroundValidationResult),
}

impl std::fmt::Debug for ValidatedCandidateCommand {
	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
		let candidate_hash = self.candidate_hash();
		match *self {
			ValidatedCandidateCommand::Second(_) =>
				write!(f, "Second({})", candidate_hash),
			ValidatedCandidateCommand::Attest(_) =>
				write!(f, "Attest({})", candidate_hash),
		}
	}
}

impl ValidatedCandidateCommand {
	fn candidate_hash(&self) -> CandidateHash {
		match *self {
			ValidatedCandidateCommand::Second(Ok((ref candidate, _, _))) => candidate.hash(),
			ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
			ValidatedCandidateCommand::Attest(Ok((ref candidate, _, _))) => candidate.hash(),
			ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
		}
	}
}

/// Holds all data needed for candidate backing job operation.
struct CandidateBackingJob {
	/// The hash of the relay parent on top of which this job is doing it's work.
	parent: Hash,
	/// Outbound message channel sending part.
	tx_from: mpsc::Sender<FromJobCommand>,
	/// The `ParaId` assigned to this validator
	/// The collator required to author the candidate, if any.
	required_collator: Option<CollatorId>,
	/// Spans for all candidates that are not yet backable.
	unbacked_candidates: HashMap<CandidateHash, jaeger::Span>,
	/// We issued `Seconded`, `Valid` or `Invalid` statements on about these candidates.
	issued_statements: HashSet<CandidateHash>,
	/// These candidates are undergoing validation in the background.
	awaiting_validation: HashSet<CandidateHash>,
	/// `Some(h)` if this job has already issued `Seconded` statement for some candidate with `h` hash.
	seconded: Option<CandidateHash>,
asynchronous rob's avatar
asynchronous rob committed
	/// The candidates that are includable, by hash. Each entry here indicates
	/// that we've sent the provisioner the backed candidate.
	backed: HashSet<CandidateHash>,
	table: Table<TableContext>,
	table_context: TableContext,
	background_validation: mpsc::Receiver<ValidatedCandidateCommand>,
	background_validation_tx: mpsc::Sender<ValidatedCandidateCommand>,
	metrics: Metrics,
}

/// Number of votes that form a quorum in a group of `n_validators` members:
/// one more than half the group size, rounded down.
const fn group_quorum(n_validators: usize) -> usize {
	let half = n_validators / 2;
	half + 1
}

/// Context handed to the statement table, implementing `TableContextTrait`.
#[derive(Default)]
struct TableContext {
	/// Signing context that incoming statement signatures are checked against.
	signing_context: SigningContext,
	/// The local validator, used to sign statements. With `None`, no statements are signed.
	validator: Option<Validator>,
	/// Members of each validator group, keyed by the para the group is looked up by.
	groups: HashMap<ParaId, Vec<ValidatorIndex>>,
	/// Validator keys; statement signature checks index this by the statement's validator index.
	validators: Vec<ValidatorId>,
}

impl TableContextTrait for TableContext {
	type AuthorityId = ValidatorIndex;
	type Digest = CandidateHash;
	type GroupId = ParaId;
	type Signature = ValidatorSignature;
	type Candidate = CommittedCandidateReceipt;

	/// A candidate's digest is its hash.
	fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
		candidate.hash()
	}

	/// A candidate's group is identified by the para id in its descriptor.
	fn candidate_group(candidate: &CommittedCandidateReceipt) -> ParaId {
		candidate.descriptor().para_id
	}

	/// Whether `authority` is listed among the members of `group`.
	/// A group that is not known has no members.
	fn is_member_of(&self, authority: &ValidatorIndex, group: &ParaId) -> bool {
		match self.groups.get(group) {
			Some(members) => members.contains(authority),
			None => false,
		}
	}

	/// Number of votes required for `group`: the quorum of its member count.
	/// For a group that is not known this is `usize::max_value()`.
	fn requisite_votes(&self, group: &ParaId) -> usize {
		match self.groups.get(group) {
			Some(members) => group_quorum(members.len()),
			None => usize::max_value(),
		}
	}
}

// Converts a signed primitive statement into the statement table's representation.
//
// An `impl From` does not look possible given the current state of the code,
// so the conversion is done by hand here.
fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
	TableSignedStatement {
		statement: match s.payload() {
			Statement::Seconded(receipt) => TableStatement::Seconded(receipt.clone()),
			Statement::Valid(hash) => TableStatement::Valid(hash.clone()),
		},
		signature: s.signature().clone(),
		sender: s.validator_index(),
	}
}

#[tracing::instrument(level = "trace", skip(attested, table_context), fields(subsystem = LOG_TARGET))]
asynchronous rob's avatar
asynchronous rob committed
fn table_attested_to_backed(
	attested: TableAttestedCandidate<
		ParaId,
		CommittedCandidateReceipt,
		ValidatorIndex,
		ValidatorSignature,
	>,
	table_context: &TableContext,
) -> Option<BackedCandidate> {
	let TableAttestedCandidate { candidate, validity_votes, group_id: para_id } = attested;

	let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) = validity_votes
asynchronous rob's avatar
asynchronous rob committed
		.into_iter()
		.map(|(id, vote)| (id, vote.into()))
		.unzip();

	let group = table_context.groups.get(&para_id)?;

	let mut validator_indices = BitVec::with_capacity(group.len());

	validator_indices.resize(group.len(), false);

	// The order of the validity votes in the backed candidate must match
	// the order of bits set in the bitfield, which is not necessarily
	// the order of the `validity_votes` we got from the table.
	let mut vote_positions = Vec::with_capacity(validity_votes.len());
	for (orig_idx, id) in ids.iter().enumerate() {
asynchronous rob's avatar
asynchronous rob committed
		if let Some(position) = group.iter().position(|x| x == id) {
			validator_indices.set(position, true);
			vote_positions.push((orig_idx, position));
		} else {
			tracing::warn!(
				target: LOG_TARGET,
				"Logic error: Validity vote from table does not correspond to group",
			);

			return None;
	vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
asynchronous rob's avatar
asynchronous rob committed

	Some(BackedCandidate {
		candidate,
		validity_votes: vote_positions.into_iter()
			.map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
			.collect(),
asynchronous rob's avatar
asynchronous rob committed
		validator_indices,
	})
}

async fn store_available_data(
	tx_from: &mut mpsc::Sender<FromJobCommand>,
	id: Option<ValidatorIndex>,
	n_validators: u32,
	candidate_hash: CandidateHash,
	available_data: AvailableData,
) -> Result<(), Error> {
	let (tx, rx) = oneshot::channel();
	tx_from.send(AllMessages::AvailabilityStore(
			AvailabilityStoreMessage::StoreAvailableData(
				candidate_hash,
				id,
				n_validators,
				available_data,
				tx,
			)
		).into()
	).await?;

	let _ = rx.await.map_err(Error::StoreAvailableData)?;

	Ok(())
}

// Make a `PoV` available.
//
// This will compute the erasure root internally and compare it to the expected erasure root.
// This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`.
#[tracing::instrument(level = "trace", skip(tx_from, pov, span), fields(subsystem = LOG_TARGET))]
async fn make_pov_available(
	tx_from: &mut mpsc::Sender<FromJobCommand>,
	validator_index: Option<ValidatorIndex>,
	n_validators: usize,
	pov: Arc<PoV>,
	candidate_hash: CandidateHash,
	validation_data: polkadot_primitives::v1::PersistedValidationData,
	expected_erasure_root: Hash,
	span: Option<&jaeger::Span>,
) -> Result<Result<(), InvalidErasureRoot>, Error> {
	let available_data = AvailableData {
		pov,
		validation_data,
	};

		let _span = span.as_ref().map(|s| {
			s.child_with_candidate("erasure-coding", &candidate_hash)
		});

		let chunks = erasure_coding::obtain_chunks_v1(
			n_validators,
			&available_data,
		)?;
		let branches = erasure_coding::branches(chunks.as_ref());
		let erasure_root = branches.root();
		if erasure_root != expected_erasure_root {
			return Ok(Err(InvalidErasureRoot));
		}
		let _span = span.as_ref().map(|s|
			s.child_with_candidate("store-data", &candidate_hash)
		);

		store_available_data(
			tx_from,
			validator_index,
			n_validators as u32,
			candidate_hash,
			available_data,
		).await?;
	}

	Ok(Ok(()))
}

async fn request_pov_from_distribution(
	tx_from: &mut mpsc::Sender<FromJobCommand>,
	parent: Hash,
	descriptor: CandidateDescriptor,
) -> Result<Arc<PoV>, Error> {
	let (tx, rx) = oneshot::channel();

	tx_from.send(AllMessages::PoVDistribution(
		PoVDistributionMessage::FetchPoV(parent, descriptor, tx)
	).into()).await?;

	rx.await.map_err(Error::FetchPoV)
}

async fn request_candidate_validation(
	tx_from: &mut mpsc::Sender<FromJobCommand>,
	candidate: CandidateDescriptor,
	pov: Arc<PoV>,
) -> Result<ValidationResult, Error> {
	let (tx, rx) = oneshot::channel();

	tx_from.send(AllMessages::CandidateValidation(
			CandidateValidationMessage::ValidateFromChainState(
				candidate,
				pov,
				tx,
			)
		).into()
	).await?;

	match rx.await {
		Ok(Ok(validation_result)) => Ok(validation_result),
		Ok(Err(err)) => Err(Error::ValidationFailed(err)),
		Err(err) => Err(Error::ValidateFromChainState(err)),
	}
}

/// Outcome of a background validation.
///
/// `Ok` carries the candidate receipt together with its commitments and the `PoV`;
/// `Err` carries only the receipt of the candidate that was rejected.
type BackgroundValidationResult = Result<(CandidateReceipt, CandidateCommitments, Arc<PoV>), CandidateReceipt>;

/// Everything a background validation task needs, bundled so it can be moved into the task.
struct BackgroundValidationParams<F> {
	/// Channel for messages from the job to other subsystems.
	tx_from: mpsc::Sender<FromJobCommand>,
	/// Channel the resulting `ValidatedCandidateCommand` is reported on.
	tx_command: mpsc::Sender<ValidatedCandidateCommand>,
	/// The candidate to validate.
	candidate: CandidateReceipt,
	/// Relay parent used when the `PoV` has to be requested from PoV distribution.
	relay_parent: Hash,
	/// The `PoV`, if already at hand; otherwise it is requested from PoV distribution.
	pov: Option<Arc<PoV>>,
	/// Index of the local validator, if any; forwarded when making the `PoV` available.
	validator_index: Option<ValidatorIndex>,
	/// Total number of validators; forwarded when making the `PoV` available.
	n_validators: usize,
	/// Optional span under which the child spans of the validation steps are created.
	span: Option<jaeger::Span>,
	/// Wraps the validation result into the command that is sent on `tx_command`.
	make_command: F,
}

async fn validate_and_make_available(
	params: BackgroundValidationParams<impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync>,
) -> Result<(), Error> {
	let BackgroundValidationParams {
		mut tx_from,
		mut tx_command,
		candidate,
		relay_parent,
		pov,
		validator_index,
		n_validators,
		make_command,
	} = params;

	let pov = match pov {
		Some(pov) => pov,
			let _span = span.as_ref().map(|s| s.child("request-pov"));
			request_pov_from_distribution(
				&mut tx_from,
				relay_parent,
				candidate.descriptor.clone(),
			).await?
		}
		let _span = span.as_ref().map(|s| {
			s.child_builder("request-validation")
				.with_pov(&pov)
				.with_para_id(candidate.descriptor().para_id)
				.build()
		request_candidate_validation(&mut tx_from, candidate.descriptor.clone(), pov.clone()).await?
	};

	let expected_commitments_hash = candidate.commitments_hash;

	let res = match v {
		ValidationResult::Valid(commitments, validation_data) => {
			tracing::debug!(
				target: LOG_TARGET,
				candidate_hash = ?candidate.hash(),
				"Validation successful",
			);

			// If validation produces a new set of commitments, we vote the candidate as invalid.
			if commitments.hash() != expected_commitments_hash {
					candidate_hash = ?candidate.hash(),
					actual_commitments = ?commitments,
					"Commitments obtained with validation don't match the announced by the candidate receipt",
				);
				Err(candidate)
			} else {
				let erasure_valid = make_pov_available(
					&mut tx_from,
					validator_index,
					n_validators,
					pov.clone(),
					candidate.hash(),
					validation_data,
					candidate.descriptor.erasure_root,
				).await?;

				match erasure_valid {
					Ok(()) => Ok((candidate, commitments, pov.clone())),
							candidate_hash = ?candidate.hash(),
							actual_commitments = ?commitments,
							"Erasure root doesn't match the announced by the candidate receipt",
						);
						Err(candidate)
					},
		ValidationResult::Invalid(reason) => {
				candidate_hash = ?candidate.hash(),
				reason = ?reason,
				"Validation yielded an invalid candidate",
			);
	tx_command.send(make_command(res)).await.map_err(Into::into)
impl CandidateBackingJob {
	/// Run asynchronously.
	async fn run_loop(
		mut self,
		mut rx_to: mpsc::Receiver<CandidateBackingMessage>,
		span: PerLeafSpan,
	) -> Result<(), Error> {
			futures::select! {
				validated_command = self.background_validation.next() => {
					let _span = span.child("process-validation-result");
					if let Some(c) = validated_command {
						self.handle_validated_candidate_command(&span, c).await?;
					} else {
						panic!("`self` hasn't dropped and `self` holds a reference to this sender; qed");
					}
				}
				to_job = rx_to.next() => match to_job {
					None => break,
					Some(msg) => {
						// we intentionally want spans created in `process_msg` to descend from the
						// `span ` which is longer-lived than this ephemeral timing span.
						let _timing_span = span.child("process-message");
						self.process_msg(&span, msg).await?;
	/// Acts on the result of a finished background validation.
	///
	/// * `Second` with a valid candidate: unless something was already seconded or a
	///   statement was already issued for this candidate, issue a `Seconded` statement,
	///   report it to candidate selection and distribute the `PoV`.
	/// * `Second` with an invalid candidate: report the candidate as invalid.
	/// * `Attest`: unless a statement was already issued, issue `Valid` on success and
	///   record the candidate as handled either way.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	async fn handle_validated_candidate_command(
		&mut self,
		parent_span: &jaeger::Span,
		command: ValidatedCandidateCommand,
	) -> Result<(), Error> {
		let candidate_hash = command.candidate_hash();
		self.awaiting_validation.remove(&candidate_hash);

		let already_issued = self.issued_statements.contains(&candidate_hash);

		match command {
			ValidatedCandidateCommand::Second(Ok((candidate, commitments, pov))) => {
				// sanity check.
				if self.seconded.is_some() || already_issued {
					return Ok(());
				}

				self.seconded = Some(candidate_hash);
				self.issued_statements.insert(candidate_hash);
				self.metrics.on_candidate_seconded();

				let receipt = CommittedCandidateReceipt {
					descriptor: candidate.descriptor.clone(),
					commitments,
				};
				let signed = self
					.sign_import_and_distribute_statement(Statement::Seconded(receipt), parent_span)
					.await?;
				if let Some(stmt) = signed {
					self.issue_candidate_seconded_message(stmt).await?;
				}
				self.distribute_pov(candidate.descriptor, pov).await?;
			}
			ValidatedCandidateCommand::Second(Err(candidate)) => {
				self.issue_candidate_invalid_message(candidate).await?;
			}
			ValidatedCandidateCommand::Attest(outcome) => {
				// sanity check.
				if !already_issued {
					if outcome.is_ok() {
						self.sign_import_and_distribute_statement(
							Statement::Valid(candidate_hash),
							parent_span,
						).await?;
					}
					self.issued_statements.insert(candidate_hash);
				}
			}
		}

		Ok(())
	}

	#[tracing::instrument(level = "trace", skip(self, params), fields(subsystem = LOG_TARGET))]
	async fn background_validate_and_make_available(
		&mut self,
		params: BackgroundValidationParams<
			impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync
		>,
	) -> Result<(), Error> {
		let candidate_hash = params.candidate.hash();

		if self.awaiting_validation.insert(candidate_hash) {
			// spawn background task.
			let bg = async move {
				if let Err(e) = validate_and_make_available(params).await {
					tracing::error!("Failed to validate and make available: {:?}", e);
				}
			};
			self.tx_from.send(FromJobCommand::Spawn("Backing Validation", bg.boxed())).await?;
		}

		Ok(())
	}

	async fn issue_candidate_invalid_message(
		&mut self,
		candidate: CandidateReceipt,
	) -> Result<(), Error> {
		self.tx_from.send(AllMessages::from(CandidateSelectionMessage::Invalid(self.parent, candidate)).into()).await?;
	/// Tells candidate selection that the candidate in `statement` has been seconded
	/// under this job's relay parent.
	async fn issue_candidate_seconded_message(
		&mut self,
		statement: SignedFullStatement,
	) -> Result<(), Error> {
		let message = CandidateSelectionMessage::Seconded(self.parent, statement);

		self.tx_from
			.send(AllMessages::from(message).into())
			.await
			.map_err(Into::into)
	}

	/// Kick off background validation with intent to second.
	#[tracing::instrument(level = "trace", skip(self, parent_span, pov), fields(subsystem = LOG_TARGET))]
	async fn validate_and_second(
		&mut self,
		parent_span: &jaeger::Span,
		candidate: &CandidateReceipt,
	) -> Result<(), Error> {
		// Check that candidate is collated by the right collator.
		if self.required_collator.as_ref()
			.map_or(false, |c| c != &candidate.descriptor().collator)
		{
			self.issue_candidate_invalid_message(candidate.clone()).await?;
		let candidate_hash = candidate.hash();
		let mut span = self.get_unbacked_validation_child(
			root_span,
			candidate_hash,
			candidate.descriptor().para_id,
		);

		span.as_mut().map(|s| s.add_follows_from(parent_span));
		tracing::debug!(
			target: LOG_TARGET,
			candidate_hash = ?candidate_hash,
			candidate_receipt = ?candidate,
			"Validate and second candidate",
		);

		self.background_validate_and_make_available(BackgroundValidationParams {
			tx_from: self.tx_from.clone(),
			tx_command: self.background_validation_tx.clone(),
			candidate: candidate.clone(),
			relay_parent: self.parent,
			pov: Some(pov),
			validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
			n_validators: self.table_context.validators.len(),
			make_command: ValidatedCandidateCommand::Second,
		}).await?;
	async fn sign_import_and_distribute_statement(
		&mut self,
		statement: Statement,
		parent_span: &jaeger::Span,
	) -> Result<Option<SignedFullStatement>, Error> {
asynchronous rob's avatar
asynchronous rob committed
		if let Some(signed_statement) = self.sign_statement(statement).await {
			self.import_statement(&signed_statement, parent_span).await?;
			self.distribute_signed_statement(signed_statement.clone()).await?;
			Ok(Some(signed_statement))
		} else {
			Ok(None)
	/// Drains the misbehaviors recorded by the statement table and forwards each one
	/// to the provisioner as a `MisbehaviorReport`.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	async fn issue_new_misbehaviors(&mut self) -> Result<(), Error> {
		// Buffer the reports first: draining borrows `self.table` mutably, and so
		// does sending, which needs `&mut self`.
		let reports = self.table.drain_misbehaviors().collect::<Vec<_>>();
		let relay_parent = self.parent;

		for (validator_id, report) in reports {
			let data = ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report);
			let message = ProvisionerMessage::ProvisionableData(relay_parent, data);
			self.send_to_provisioner(message).await?;
		}

		Ok(())
	}

	/// Import a statement into the statement table and return the summary of the import.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	async fn import_statement(
		&mut self,
		statement: &SignedFullStatement,
		parent_span: &jaeger::Span,
	) -> Result<Option<TableSummary>, Error> {
		tracing::debug!(
			target: LOG_TARGET,
			statement = ?statement.payload().to_compact(),
			"Importing statement",
		);

		let import_statement_span = {
			// create a span only for candidates we're already aware of.
			let candidate_hash = statement.payload().candidate_hash();
			self.get_unbacked_statement_child(parent_span, candidate_hash, statement.validator_index())
		let stmt = primitive_statement_to_table(statement);

		let summary = self.table.import_statement(&self.table_context, stmt);

		let unbacked_span = if let Some(attested) = summary.as_ref()
			.and_then(|s| self.table.attested_candidate(&s.candidate, &self.table_context))
		{
			let candidate_hash = attested.candidate.hash();
			// `HashSet::insert` returns true if the thing wasn't in there already.
			if self.backed.insert(candidate_hash) {
				let span = self.remove_unbacked_span(&candidate_hash);

				if let Some(backed) =
					table_attested_to_backed(attested, &self.table_context)
				{
					tracing::debug!(
						target: LOG_TARGET,
						candidate_hash = ?candidate_hash,
						relay_parent = ?self.parent,
						para_id = %backed.candidate.descriptor.para_id,
					let message = ProvisionerMessage::ProvisionableData(
						self.parent,
						ProvisionableData::BackedCandidate(backed.receipt()),
					);
					self.send_to_provisioner(message).await?;

					span.as_ref().map(|s| s.child("backed"));
					span
				} else {
					None
			} else {
				None
		} else {
			None
		};
		self.issue_new_misbehaviors().await?;

		// It is important that the child span is dropped before its parent span (`unbacked_span`)
		drop(import_statement_span);
		drop(unbacked_span);

asynchronous rob's avatar
asynchronous rob committed
		Ok(summary)
	#[tracing::instrument(level = "trace", skip(self, root_span), fields(subsystem = LOG_TARGET))]
	async fn process_msg(&mut self, root_span: &jaeger::Span, msg: CandidateBackingMessage) -> Result<(), Error> {
		match msg {
			CandidateBackingMessage::Second(relay_parent, candidate, pov) => {
				let _timer = self.metrics.time_process_second();

				let span = root_span.child_builder("second")
					.with_stage(jaeger::Stage::CandidateBacking)
					.with_pov(&pov)
					.with_candidate(&candidate.hash())
					.with_relay_parent(&relay_parent)
				// Sanity check that candidate is from our assignment.
				if Some(candidate.descriptor().para_id) != self.assignment {
					return Ok(());
				}

				// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
				// Seconded statement only if we have not seconded any other candidate and
				// have not signed a Valid statement for the requested candidate.
asynchronous rob's avatar
asynchronous rob committed
				if self.seconded.is_none() {
					// This job has not seconded a candidate yet.
asynchronous rob's avatar
asynchronous rob committed
					let candidate_hash = candidate.hash();

					if !self.issued_statements.contains(&candidate_hash) {
						let pov = Arc::new(pov);
						self.validate_and_second(&span, &root_span, &candidate, pov).await?;
			CandidateBackingMessage::Statement(_relay_parent, statement) => {
				let _timer = self.metrics.time_process_statement();
				let span = root_span.child_builder("statement")
					.with_stage(jaeger::Stage::CandidateBacking)
					.with_candidate(&statement.payload().candidate_hash())
					.with_relay_parent(&_relay_parent)
					.build();
				self.check_statement_signature(&statement)?;
				match self.maybe_validate_and_import(&span, &root_span, statement).await {
					Err(Error::ValidationFailed(_)) => return Ok(()),
					Err(e) => return Err(e),
					Ok(()) => (),
				}
			CandidateBackingMessage::GetBackedCandidates(_, requested_candidates, tx) => {
				let _timer = self.metrics.time_get_backed_candidates();

				let backed = requested_candidates
					.into_iter()
					.filter_map(|hash| {
						self.table.attested_candidate(&hash, &self.table_context)
							.and_then(|attested| table_attested_to_backed(attested, &self.table_context))
					})
					.collect();
				tx.send(backed).map_err(|data| Error::Send(data))?;
			}
		}

		Ok(())
	}

	/// Kick off validation work and distribute the result as a signed statement.
	///
	/// Does nothing if a statement was already issued for the candidate in `summary`.
	/// If the candidate was not authored by the required collator, no validation is
	/// started and the candidate is recorded as if a statement had been issued.
	/// `span` is handed to the background task for its child spans.
	///
	/// Fails with `Error::CandidateNotFound` if the table does not know the candidate.
	#[tracing::instrument(level = "trace", skip(self, span), fields(subsystem = LOG_TARGET))]
	async fn kick_off_validation_work(
		&mut self,
		summary: TableSummary,
		span: Option<jaeger::Span>,
	) -> Result<(), Error> {
		let candidate_hash = summary.candidate;

		if self.issued_statements.contains(&candidate_hash) {
			return Ok(())
		}

		// We clone the commitments here because there are borrowck
		// errors relating to this being a struct and methods borrowing the entirety of self
		// and not just those things that the function uses.
		let candidate = self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?.to_plain();
		let descriptor = candidate.descriptor().clone();

		tracing::debug!(
			target: LOG_TARGET,
			candidate_hash = ?candidate_hash,
			candidate_receipt = ?candidate,
			"Kicking off validation",
		);

		// Check that candidate is collated by the right collator.
		if self.required_collator.as_ref()
			.map_or(false, |c| c != &descriptor.collator)
		{
			// If not, we've got the statement in the table but we will
			// not issue validation work for it.
			//
			// Act as though we've issued a statement.
			self.issued_statements.insert(candidate_hash);
			return Ok(());
		}

		self.background_validate_and_make_available(BackgroundValidationParams {
			tx_from: self.tx_from.clone(),
			tx_command: self.background_validation_tx.clone(),
			candidate,
			relay_parent: self.parent,
			// No PoV at hand: the background task requests it from PoV distribution.
			pov: None,
			validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
			n_validators: self.table_context.validators.len(),
			span,
			make_command: ValidatedCandidateCommand::Attest,
		}).await
	}

	/// Import the statement and kick off validation work if it is a part of our assignment.
	#[tracing::instrument(level = "trace", skip(self, parent_span), fields(subsystem = LOG_TARGET))]
	async fn maybe_validate_and_import(
		&mut self,
		parent_span: &jaeger::Span,
		statement: SignedFullStatement,
	) -> Result<(), Error> {
		if let Some(summary) = self.import_statement(&statement, parent_span).await? {
			if let Statement::Seconded(_) = statement.payload() {
				if Some(summary.group_id) == self.assignment {
					let span = self.get_unbacked_validation_child(
						root_span,
						summary.candidate,
						summary.group_id,
					);

					self.kick_off_validation_work(summary, span).await?;
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	async fn sign_statement(&self, statement: Statement) -> Option<SignedFullStatement> {
		let signed = self.table_context
			.validator
			.as_ref()?
			.sign(self.keystore.clone(), statement)
			.await
			.ok()
			.flatten()?;
		self.metrics.on_statement_signed();
		Some(signed)
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
		let idx = statement.validator_index().0 as usize;

		if self.table_context.validators.len() > idx {
			statement.check_signature(
				&self.table_context.signing_context,
				&self.table_context.validators[idx],
			).map_err(|_| Error::InvalidSignature)?;
		} else {
			return Err(Error::InvalidSignature);
		}

		Ok(())
	}

	/// Insert or get the unbacked-span for the given candidate hash.
	fn insert_or_get_unbacked_span(
		&mut self,
		parent_span: &jaeger::Span,
		hash: CandidateHash,
		para_id: Option<ParaId>
	) -> Option<&jaeger::Span> {
		if !self.backed.contains(&hash) {
			// only add if we don't consider this backed.
			let span = self.unbacked_candidates.entry(hash).or_insert_with(|| {
				let s = parent_span.child_builder("unbacked-candidate").with_candidate(&hash);
				let s = if let Some(para_id) = para_id {
					s.with_para_id(para_id)
				} else {
					s
				};

				s.build()
			Some(span)
		} else {
			None
	fn get_unbacked_validation_child(
		&mut self,
		parent_span: &jaeger::Span,
		hash: CandidateHash,
		para_id: ParaId,
	) -> Option<jaeger::Span> {
		self.insert_or_get_unbacked_span(parent_span, hash, Some(para_id))
			.map(|span| {
				span.child_builder("validation")
					.with_candidate(&hash)
					.with_stage(Stage::CandidateBacking)
					.build()
			})
	fn get_unbacked_statement_child(
		&mut self,
		parent_span: &jaeger::Span,
		hash: CandidateHash,
		validator: ValidatorIndex,
	) -> Option<jaeger::Span> {
		self.insert_or_get_unbacked_span(parent_span, hash, None).map(|span| {
			span.child_builder("import-statement")
				.with_candidate(&hash)
				.with_validator_index(validator)
				.build()
	fn remove_unbacked_span(&mut self, hash: &CandidateHash) -> Option<jaeger::Span> {
		self.unbacked_candidates.remove(hash)
	async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> {
		self.tx_from.send(AllMessages::from(msg).into()).await?;
	async fn distribute_pov(
		&mut self,
		descriptor: CandidateDescriptor,
		pov: Arc<PoV>,
	) -> Result<(), Error> {
		self.tx_from.send(AllMessages::from(
			PoVDistributionMessage::DistributePoV(self.parent, descriptor, pov),
		).into()).await.map_err(Into::into)