lib.rs 83.5 KB
Newer Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements a `CandidateBackingSubsystem`.

#![deny(unused_crate_dependencies)]

use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;

use bitvec::vec::BitVec;
use futures::{channel::{mpsc, oneshot}, Future, FutureExt, SinkExt, StreamExt};
use sp_keystore::SyncCryptoStorePtr;
use polkadot_primitives::v1::{
	AvailableData, BackedCandidate, CandidateCommitments, CandidateDescriptor, CandidateHash,
	CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId,
	PoV, SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
};
use polkadot_node_primitives::{
	Statement, SignedFullStatement, ValidationResult,
};
use polkadot_subsystem::{
	PerLeafSpan, Stage, jaeger,
	messages::{
		AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage,
		CandidateBackingMessage, CandidateSelectionMessage, CandidateValidationMessage,
		ProvisionableData, ProvisionerMessage, RuntimeApiRequest,
		StatementDistributionMessage, ValidationFailed
	}
};
use polkadot_node_subsystem_util::{
	self as util,
	request_session_index_for_child,
	request_validator_groups,
	request_validators,
	request_from_runtime,
	Validator,
	delegated_subsystem,
	FromJobCommand,
	metrics::{self, prometheus},
};
use statement_table::{
	generic::AttestedCandidate as TableAttestedCandidate,
	Context as TableContextTrait,
	Table,
	v1::{
		Statement as TableStatement,
use thiserror::Error;
const LOG_TARGET: &str = "parachain::candidate-backing";
#[derive(Debug, Error)]
enum Error {
	#[error("Candidate is not found")]
	CandidateNotFound,
	#[error("Signature is invalid")]
	InvalidSignature,
	#[error("Failed to send candidates {0:?}")]
	#[error("FetchPoV failed")]
	FetchPoV,
	#[error("ValidateFromChainState channel closed before receipt")]
	ValidateFromChainState(#[source] oneshot::Canceled),
	#[error("StoreAvailableData channel closed before receipt")]
	StoreAvailableData(#[source] oneshot::Canceled),
	#[error("a channel was closed before receipt in try_join!")]
	JoinMultiple(#[source] oneshot::Canceled),
	#[error("Obtaining erasure chunks failed")]
	ObtainErasureChunks(#[from] erasure_coding::Error),
	#[error(transparent)]
	ValidationFailed(#[from] ValidationFailed),
	#[error(transparent)]
	Mpsc(#[from] mpsc::SendError),
	#[error(transparent)]
	UtilError(#[from] util::Error),
/// PoV data to validate.
enum PoVData {
	/// Allready available (from candidate selection).
	Ready(Arc<PoV>),
	/// Needs to be fetched from validator (we are checking a signed statement).
	FetchFromValidator {
		from_validator: ValidatorIndex,
		candidate_hash: CandidateHash,
		pov_hash: Hash,
	},
}

enum ValidatedCandidateCommand {
	// We were instructed to second the candidate that has been already validated.
	Second(BackgroundValidationResult),
	// We were instructed to validate the candidate.
	Attest(BackgroundValidationResult),
	// We were not able to `Attest` because backing validator did not send us the PoV.
	AttestNoPoV(CandidateHash),
}

impl std::fmt::Debug for ValidatedCandidateCommand {
	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
		let candidate_hash = self.candidate_hash();
		match *self {
			ValidatedCandidateCommand::Second(_) =>
				write!(f, "Second({})", candidate_hash),
			ValidatedCandidateCommand::Attest(_) =>
				write!(f, "Attest({})", candidate_hash),
			ValidatedCandidateCommand::AttestNoPoV(_) =>
				write!(f, "Attest({})", candidate_hash),
		}
	}
}

impl ValidatedCandidateCommand {
	fn candidate_hash(&self) -> CandidateHash {
		match *self {
			ValidatedCandidateCommand::Second(Ok((ref candidate, _, _))) => candidate.hash(),
			ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
			ValidatedCandidateCommand::Attest(Ok((ref candidate, _, _))) => candidate.hash(),
			ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
			ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
/// Holds all data needed for candidate backing job operation.
struct CandidateBackingJob {
	/// The hash of the relay parent on top of which this job is doing it's work.
	parent: Hash,
	/// Outbound message channel sending part.
	tx_from: mpsc::Sender<FromJobCommand>,
	/// The `ParaId` assigned to this validator
	/// The collator required to author the candidate, if any.
	required_collator: Option<CollatorId>,
	/// Spans for all candidates that are not yet backable.
	unbacked_candidates: HashMap<CandidateHash, jaeger::Span>,
	/// We issued `Seconded`, `Valid` or `Invalid` statements on about these candidates.
	issued_statements: HashSet<CandidateHash>,
	/// These candidates are undergoing validation in the background.
	awaiting_validation: HashSet<CandidateHash>,
	/// Data needed for retrying in case of `ValidatedCandidateCommand::AttestNoPoV`.
	fallbacks: HashMap<CandidateHash, (AttestingData, Option<jaeger::Span>)>,
	/// `Some(h)` if this job has already issued `Seconded` statement for some candidate with `h` hash.
	seconded: Option<CandidateHash>,
asynchronous rob's avatar
asynchronous rob committed
	/// The candidates that are includable, by hash. Each entry here indicates
	/// that we've sent the provisioner the backed candidate.
	backed: HashSet<CandidateHash>,
	table: Table<TableContext>,
	table_context: TableContext,
	background_validation: mpsc::Receiver<ValidatedCandidateCommand>,
	background_validation_tx: mpsc::Sender<ValidatedCandidateCommand>,
	metrics: Metrics,
/// In case a backing validator does not provide a PoV, we need to retry with other backing
/// validators.
///
/// This is the data needed to accomplish this. Basically all the data needed for spawning a
/// validation job and a list of backing validators, we can try.
#[derive(Clone)]
struct AttestingData {
	/// The candidate to attest.
	candidate: CandidateReceipt,
	/// Hash of the PoV we need to fetch.
	pov_hash: Hash,
	/// Validator we are currently trying to get the PoV from.
	from_validator: ValidatorIndex,
	/// Other backing validators we can try in case `from_validator` failed.
	backing: Vec<ValidatorIndex>,
}

const fn group_quorum(n_validators: usize) -> usize {
	(n_validators / 2) + 1
}

#[derive(Default)]
struct TableContext {
	signing_context: SigningContext,
	validator: Option<Validator>,
	groups: HashMap<ParaId, Vec<ValidatorIndex>>,
	validators: Vec<ValidatorId>,
}

impl TableContextTrait for TableContext {
	type AuthorityId = ValidatorIndex;
	type Digest = CandidateHash;
	type GroupId = ParaId;
	type Signature = ValidatorSignature;
	type Candidate = CommittedCandidateReceipt;

	fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
		candidate.hash()
	}

	fn candidate_group(candidate: &CommittedCandidateReceipt) -> ParaId {
		candidate.descriptor().para_id
	}

	fn is_member_of(&self, authority: &ValidatorIndex, group: &ParaId) -> bool {
		self.groups.get(group).map_or(false, |g| g.iter().position(|a| a == authority).is_some())
	}

	fn requisite_votes(&self, group: &ParaId) -> usize {
		self.groups.get(group).map_or(usize::max_value(), |g| group_quorum(g.len()))
	}
}

// It looks like it's not possible to do an `impl From` given the current state of
// the code. So this does the necessary conversion.
fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
	let statement = match s.payload() {
		Statement::Seconded(c) => TableStatement::Seconded(c.clone()),
		Statement::Valid(h) => TableStatement::Valid(h.clone()),
	};

	TableSignedStatement {
		statement,
		signature: s.signature().clone(),
		sender: s.validator_index(),
	}
}

#[tracing::instrument(level = "trace", skip(attested, table_context), fields(subsystem = LOG_TARGET))]
asynchronous rob's avatar
asynchronous rob committed
fn table_attested_to_backed(
	attested: TableAttestedCandidate<
		ParaId,
		CommittedCandidateReceipt,
		ValidatorIndex,
		ValidatorSignature,
	>,
	table_context: &TableContext,
) -> Option<BackedCandidate> {
	let TableAttestedCandidate { candidate, validity_votes, group_id: para_id } = attested;

	let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) = validity_votes
asynchronous rob's avatar
asynchronous rob committed
		.into_iter()
		.map(|(id, vote)| (id, vote.into()))
		.unzip();

	let group = table_context.groups.get(&para_id)?;

	let mut validator_indices = BitVec::with_capacity(group.len());

	validator_indices.resize(group.len(), false);

	// The order of the validity votes in the backed candidate must match
	// the order of bits set in the bitfield, which is not necessarily
	// the order of the `validity_votes` we got from the table.
	let mut vote_positions = Vec::with_capacity(validity_votes.len());
	for (orig_idx, id) in ids.iter().enumerate() {
asynchronous rob's avatar
asynchronous rob committed
		if let Some(position) = group.iter().position(|x| x == id) {
			validator_indices.set(position, true);
			vote_positions.push((orig_idx, position));
		} else {
			tracing::warn!(
				target: LOG_TARGET,
				"Logic error: Validity vote from table does not correspond to group",
			);

			return None;
	vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
asynchronous rob's avatar
asynchronous rob committed

	Some(BackedCandidate {
		candidate,
		validity_votes: vote_positions.into_iter()
			.map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
			.collect(),
asynchronous rob's avatar
asynchronous rob committed
		validator_indices,
	})
}

async fn store_available_data(
	tx_from: &mut mpsc::Sender<FromJobCommand>,
	id: Option<ValidatorIndex>,
	n_validators: u32,
	candidate_hash: CandidateHash,
	available_data: AvailableData,
) -> Result<(), Error> {
	let (tx, rx) = oneshot::channel();
	tx_from.send(AllMessages::AvailabilityStore(
			AvailabilityStoreMessage::StoreAvailableData(
				candidate_hash,
				id,
				n_validators,
				available_data,
				tx,
			)
		).into()
	).await?;

	let _ = rx.await.map_err(Error::StoreAvailableData)?;

	Ok(())
}

// Make a `PoV` available.
//
// This will compute the erasure root internally and compare it to the expected erasure root.
// This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`.
#[tracing::instrument(level = "trace", skip(tx_from, pov, span), fields(subsystem = LOG_TARGET))]
async fn make_pov_available(
	tx_from: &mut mpsc::Sender<FromJobCommand>,
	validator_index: Option<ValidatorIndex>,
	n_validators: usize,
	pov: Arc<PoV>,
	candidate_hash: CandidateHash,
	validation_data: polkadot_primitives::v1::PersistedValidationData,
	expected_erasure_root: Hash,
	span: Option<&jaeger::Span>,
) -> Result<Result<(), InvalidErasureRoot>, Error> {
	let available_data = AvailableData {
		pov,
		validation_data,
	};

			s.child("erasure-coding").with_candidate(candidate_hash)

		let chunks = erasure_coding::obtain_chunks_v1(
			n_validators,
			&available_data,
		)?;
		let branches = erasure_coding::branches(chunks.as_ref());
		let erasure_root = branches.root();
		if erasure_root != expected_erasure_root {
			return Ok(Err(InvalidErasureRoot));
		}
			s.child("store-data").with_candidate(candidate_hash)
		store_available_data(
			tx_from,
			validator_index,
			n_validators as u32,
			candidate_hash,
			available_data,
		).await?;
	}
async fn request_pov(
	tx_from: &mut mpsc::Sender<FromJobCommand>,
	relay_parent: Hash,
	from_validator: ValidatorIndex,
	candidate_hash: CandidateHash,
	pov_hash: Hash,
) -> Result<Arc<PoV>, Error> {

	let (tx, rx) = oneshot::channel();
	tx_from.send(FromJobCommand::SendMessage(AllMessages::AvailabilityDistribution(
		AvailabilityDistributionMessage::FetchPoV {
			relay_parent,
			from_validator,
			candidate_hash,
			pov_hash,
			tx,
		}
	))).await?;
	let pov = rx.await.map_err(|_| Error::FetchPoV)?;
	Ok(Arc::new(pov))
}

async fn request_candidate_validation(
	tx_from: &mut mpsc::Sender<FromJobCommand>,
	candidate: CandidateDescriptor,
	pov: Arc<PoV>,
) -> Result<ValidationResult, Error> {
	let (tx, rx) = oneshot::channel();

	tx_from.send(AllMessages::CandidateValidation(
			CandidateValidationMessage::ValidateFromChainState(
				candidate,
				pov,
				tx,
			)
		).into()
	).await?;

	match rx.await {
		Ok(Ok(validation_result)) => Ok(validation_result),
		Ok(Err(err)) => Err(Error::ValidationFailed(err)),
		Err(err) => Err(Error::ValidateFromChainState(err)),
	}
}

type BackgroundValidationResult = Result<(CandidateReceipt, CandidateCommitments, Arc<PoV>), CandidateReceipt>;

struct BackgroundValidationParams<F> {
	tx_from: mpsc::Sender<FromJobCommand>,
	tx_command: mpsc::Sender<ValidatedCandidateCommand>,
	candidate: CandidateReceipt,
	relay_parent: Hash,
	pov: PoVData,
	validator_index: Option<ValidatorIndex>,
	n_validators: usize,
	span: Option<jaeger::Span>,
	make_command: F,
}

async fn validate_and_make_available(
	params: BackgroundValidationParams<impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync>,
) -> Result<(), Error> {
	let BackgroundValidationParams {
		mut tx_from,
		mut tx_command,
		candidate,
		relay_parent,
		pov,
		validator_index,
		n_validators,
		make_command,
	} = params;

	let pov = match pov {
		PoVData::Ready(pov) => pov,
		PoVData::FetchFromValidator {
			from_validator,
			candidate_hash,
			pov_hash,
		} => {
			let _span = span.as_ref().map(|s| s.child("request-pov"));
			match request_pov(
					&mut tx_from,
					relay_parent,
					from_validator,
					candidate_hash,
					pov_hash,
				).await {
				Err(Error::FetchPoV) => {
					tx_command.send(ValidatedCandidateCommand::AttestNoPoV(candidate.hash())).await.map_err(Error::Mpsc)?;
					return Ok(())
				}
				Err(err) => return Err(err),
				Ok(pov) => pov,
			}
		let _span = span.as_ref().map(|s| {
			s.child("request-validation")
				.with_pov(&pov)
				.with_para_id(candidate.descriptor().para_id)
		request_candidate_validation(&mut tx_from, candidate.descriptor.clone(), pov.clone()).await?
	};

	let expected_commitments_hash = candidate.commitments_hash;

	let res = match v {
		ValidationResult::Valid(commitments, validation_data) => {
			tracing::debug!(
				target: LOG_TARGET,
				candidate_hash = ?candidate.hash(),
				"Validation successful",
			);

			// If validation produces a new set of commitments, we vote the candidate as invalid.
			if commitments.hash() != expected_commitments_hash {
					candidate_hash = ?candidate.hash(),
					actual_commitments = ?commitments,
					"Commitments obtained with validation don't match the announced by the candidate receipt",
				);
				Err(candidate)
			} else {
				let erasure_valid = make_pov_available(
					&mut tx_from,
					validator_index,
					n_validators,
					pov.clone(),
					candidate.hash(),
					validation_data,
					candidate.descriptor.erasure_root,
				).await?;

				match erasure_valid {
					Ok(()) => Ok((candidate, commitments, pov.clone())),
							candidate_hash = ?candidate.hash(),
							actual_commitments = ?commitments,
							"Erasure root doesn't match the announced by the candidate receipt",
						);
						Err(candidate)
					},
		ValidationResult::Invalid(reason) => {
				candidate_hash = ?candidate.hash(),
				reason = ?reason,
				"Validation yielded an invalid candidate",
			);
	tx_command.send(make_command(res)).await.map_err(Into::into)
impl CandidateBackingJob {
	/// Run asynchronously.
	async fn run_loop(
		mut self,
		mut rx_to: mpsc::Receiver<CandidateBackingMessage>,
		span: PerLeafSpan,
	) -> Result<(), Error> {
			futures::select! {
				validated_command = self.background_validation.next() => {
					let _span = span.child("process-validation-result");
					if let Some(c) = validated_command {
						self.handle_validated_candidate_command(&span, c).await?;
					} else {
						panic!("`self` hasn't dropped and `self` holds a reference to this sender; qed");
					}
				}
				to_job = rx_to.next() => match to_job {
					None => break,
					Some(msg) => {
						// we intentionally want spans created in `process_msg` to descend from the
						// `span ` which is longer-lived than this ephemeral timing span.
						let _timing_span = span.child("process-message");
						self.process_msg(&span, msg).await?;
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	async fn handle_validated_candidate_command(
		&mut self,
		parent_span: &jaeger::Span,
		command: ValidatedCandidateCommand,
	) -> Result<(), Error> {
		let candidate_hash = command.candidate_hash();
		self.awaiting_validation.remove(&candidate_hash);

		match command {
			ValidatedCandidateCommand::Second(res) => {
				match res {
					Ok((candidate, commitments, _)) => {
						// sanity check.
						if self.seconded.is_none() && !self.issued_statements.contains(&candidate_hash) {
							self.seconded = Some(candidate_hash);
							self.issued_statements.insert(candidate_hash);
							self.metrics.on_candidate_seconded();

							let statement = Statement::Seconded(CommittedCandidateReceipt {
								descriptor: candidate.descriptor.clone(),
								commitments,
							});
							if let Some(stmt) = self.sign_import_and_distribute_statement(
								statement,
								parent_span,
							).await? {
								self.issue_candidate_seconded_message(stmt).await?;
							}
						}
					}
					Err(candidate) => {
						self.issue_candidate_invalid_message(candidate).await?;
					}
				}
			}
			ValidatedCandidateCommand::Attest(res) => {
				// We are done - avoid new validation spawns:
				self.fallbacks.remove(&candidate_hash);
				// sanity check.
				if !self.issued_statements.contains(&candidate_hash) {
					if res.is_ok() {
						let statement = Statement::Valid(candidate_hash);
						self.sign_import_and_distribute_statement(statement, &parent_span).await?;
					}
					self.issued_statements.insert(candidate_hash);
				}
			}
			ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
				if let Some((attesting, span)) = self.fallbacks.get_mut(&candidate_hash) {
					if let Some(index) = attesting.backing.pop() {
						attesting.from_validator = index;
						// Ok, another try:
						let c_span = span.as_ref().map(|s| s.child("try"));
						let attesting = attesting.clone();
						self.kick_off_validation_work(attesting, c_span).await?
					}

				} else {
					tracing::warn!(
						target: LOG_TARGET,
						"AttestNoPoV was triggered without fallback being available."
					);
					debug_assert!(false);
				}
			}
		}

		Ok(())
	}

	#[tracing::instrument(level = "trace", skip(self, params), fields(subsystem = LOG_TARGET))]
	async fn background_validate_and_make_available(
		&mut self,
		params: BackgroundValidationParams<
			impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync
		>,
	) -> Result<(), Error> {
		let candidate_hash = params.candidate.hash();
		if self.awaiting_validation.insert(candidate_hash) {
			// spawn background task.
			let bg = async move {
				if let Err(e) = validate_and_make_available(params).await {
					tracing::error!(target: LOG_TARGET, "Failed to validate and make available: {:?}", e);
				}
			};
			self.tx_from.send(FromJobCommand::Spawn("Backing Validation", bg.boxed())).await?;
		}

		Ok(())
	}

	async fn issue_candidate_invalid_message(
		&mut self,
		candidate: CandidateReceipt,
	) -> Result<(), Error> {
		self.tx_from.send(AllMessages::from(CandidateSelectionMessage::Invalid(self.parent, candidate)).into()).await?;
	async fn issue_candidate_seconded_message(
		&mut self,
		statement: SignedFullStatement,
	) -> Result<(), Error> {
		self.tx_from.send(AllMessages::from(CandidateSelectionMessage::Seconded(self.parent, statement)).into()).await?;

		Ok(())
	}

	/// Kick off background validation with intent to second.
	#[tracing::instrument(level = "trace", skip(self, parent_span, pov), fields(subsystem = LOG_TARGET))]
	async fn validate_and_second(
		&mut self,
		parent_span: &jaeger::Span,
		candidate: &CandidateReceipt,
	) -> Result<(), Error> {
		// Check that candidate is collated by the right collator.
		if self.required_collator.as_ref()
			.map_or(false, |c| c != &candidate.descriptor().collator)
		{
			self.issue_candidate_invalid_message(candidate.clone()).await?;
		let candidate_hash = candidate.hash();
		let mut span = self.get_unbacked_validation_child(
			root_span,
			candidate_hash,
			candidate.descriptor().para_id,
		);

		span.as_mut().map(|span| span.add_follows_from(parent_span));
		tracing::debug!(
			target: LOG_TARGET,
			candidate_hash = ?candidate_hash,
			candidate_receipt = ?candidate,
			"Validate and second candidate",
		);

		self.background_validate_and_make_available(BackgroundValidationParams {
			tx_from: self.tx_from.clone(),
			tx_command: self.background_validation_tx.clone(),
			candidate: candidate.clone(),
			relay_parent: self.parent,
			pov: PoVData::Ready(pov),
			validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
			n_validators: self.table_context.validators.len(),
			make_command: ValidatedCandidateCommand::Second,
		}).await?;
	async fn sign_import_and_distribute_statement(
		&mut self,
		statement: Statement,
		parent_span: &jaeger::Span,
	) -> Result<Option<SignedFullStatement>, Error> {
asynchronous rob's avatar
asynchronous rob committed
		if let Some(signed_statement) = self.sign_statement(statement).await {
			self.import_statement(&signed_statement, parent_span).await?;
			self.distribute_signed_statement(signed_statement.clone()).await?;
			Ok(Some(signed_statement))
		} else {
			Ok(None)
	/// Check if there have happened any new misbehaviors and issue necessary messages.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	async fn issue_new_misbehaviors(&mut self) -> Result<(), Error> {
		// collect the misbehaviors to avoid double mutable self borrow issues
		let misbehaviors: Vec<_> = self.table.drain_misbehaviors().collect();
		for (validator_id, report) in misbehaviors {
			self.send_to_provisioner(
				ProvisionerMessage::ProvisionableData(
					self.parent,
					ProvisionableData::MisbehaviorReport(self.parent, validator_id, report)
				)
			).await?
		}

		Ok(())
	}

	/// Import a statement into the statement table and return the summary of the import.
	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	async fn import_statement(
		&mut self,
		statement: &SignedFullStatement,
		parent_span: &jaeger::Span,
	) -> Result<Option<TableSummary>, Error> {
		tracing::debug!(
			target: LOG_TARGET,
			statement = ?statement.payload().to_compact(),
			"Importing statement",
		);

		let import_statement_span = {
			// create a span only for candidates we're already aware of.
			let candidate_hash = statement.payload().candidate_hash();
			self.get_unbacked_statement_child(parent_span, candidate_hash, statement.validator_index())
		let stmt = primitive_statement_to_table(statement);

		let summary = self.table.import_statement(&self.table_context, stmt);

		let unbacked_span = if let Some(attested) = summary.as_ref()
			.and_then(|s| self.table.attested_candidate(&s.candidate, &self.table_context))
		{
			let candidate_hash = attested.candidate.hash();
			// `HashSet::insert` returns true if the thing wasn't in there already.
			if self.backed.insert(candidate_hash) {
				let span = self.remove_unbacked_span(&candidate_hash);

				if let Some(backed) =
					table_attested_to_backed(attested, &self.table_context)
				{
					tracing::debug!(
						target: LOG_TARGET,
						candidate_hash = ?candidate_hash,
						relay_parent = ?self.parent,
						para_id = %backed.candidate.descriptor.para_id,
					let message = ProvisionerMessage::ProvisionableData(
						self.parent,
						ProvisionableData::BackedCandidate(backed.receipt()),
					);
					self.send_to_provisioner(message).await?;

					span.as_ref().map(|s| s.child("backed"));
					span
				} else {
					None
			} else {
				None
		} else {
			None
		};
		self.issue_new_misbehaviors().await?;

		// It is important that the child span is dropped before its parent span (`unbacked_span`)
		drop(import_statement_span);
		drop(unbacked_span);

asynchronous rob's avatar
asynchronous rob committed
		Ok(summary)
	#[tracing::instrument(level = "trace", skip(self, root_span), fields(subsystem = LOG_TARGET))]
	async fn process_msg(&mut self, root_span: &jaeger::Span, msg: CandidateBackingMessage) -> Result<(), Error> {
		match msg {
			CandidateBackingMessage::Second(relay_parent, candidate, pov) => {
				let _timer = self.metrics.time_process_second();

				let span = root_span.child("second")
					.with_stage(jaeger::Stage::CandidateBacking)
					.with_pov(&pov)
					.with_candidate(candidate.hash())
					.with_relay_parent(relay_parent);
				// Sanity check that candidate is from our assignment.
				if Some(candidate.descriptor().para_id) != self.assignment {
					return Ok(());
				}

				// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
				// Seconded statement only if we have not seconded any other candidate and
				// have not signed a Valid statement for the requested candidate.
asynchronous rob's avatar
asynchronous rob committed
				if self.seconded.is_none() {
					// This job has not seconded a candidate yet.
asynchronous rob's avatar
asynchronous rob committed
					let candidate_hash = candidate.hash();

					if !self.issued_statements.contains(&candidate_hash) {
						let pov = Arc::new(pov);
						self.validate_and_second(&span, &root_span, &candidate, pov).await?;
			CandidateBackingMessage::Statement(_relay_parent, statement) => {
				let _timer = self.metrics.time_process_statement();
				let span = root_span.child("statement")
					.with_stage(jaeger::Stage::CandidateBacking)
					.with_candidate(statement.payload().candidate_hash())
					.with_relay_parent(_relay_parent);
				self.check_statement_signature(&statement)?;
				match self.maybe_validate_and_import(&span, &root_span, statement).await {
					Err(Error::ValidationFailed(_)) => return Ok(()),
					Err(e) => return Err(e),
					Ok(()) => (),
				}
			CandidateBackingMessage::GetBackedCandidates(_, requested_candidates, tx) => {
				let _timer = self.metrics.time_get_backed_candidates();

				let backed = requested_candidates
					.into_iter()
					.filter_map(|hash| {
						self.table.attested_candidate(&hash, &self.table_context)
							.and_then(|attested| table_attested_to_backed(attested, &self.table_context))
					})
					.collect();
				tx.send(backed).map_err(|data| Error::Send(data))?;
			}
		}

		Ok(())
	}

	/// Kick off validation work and distribute the result as a signed statement.
	#[tracing::instrument(level = "trace", skip(self, attesting, span), fields(subsystem = LOG_TARGET))]
	async fn kick_off_validation_work(
		&mut self,
		attesting: AttestingData,
		span: Option<jaeger::Span>,
	) -> Result<(), Error> {
		let candidate_hash = attesting.candidate.hash();
		if self.issued_statements.contains(&candidate_hash) {
			return Ok(())
		}

		let descriptor = attesting.candidate.descriptor().clone();
		tracing::debug!(
			target: LOG_TARGET,
			candidate_hash = ?candidate_hash,
			candidate_receipt = ?attesting.candidate,
		// Check that candidate is collated by the right collator.
		if self.required_collator.as_ref()
			.map_or(false, |c| c != &descriptor.collator)
		{
			// If not, we've got the statement in the table but we will
			// not issue validation work for it.
			//
			// Act as though we've issued a statement.
			self.issued_statements.insert(candidate_hash);
			return Ok(());
		}

		let pov = PoVData::FetchFromValidator {
			from_validator: attesting.from_validator,
			candidate_hash,
			pov_hash: attesting.pov_hash,
		};

		self.background_validate_and_make_available(BackgroundValidationParams {
			tx_from: self.tx_from.clone(),
			tx_command: self.background_validation_tx.clone(),
			candidate: attesting.candidate,
			relay_parent: self.parent,
			validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
			n_validators: self.table_context.validators.len(),
			make_command: ValidatedCandidateCommand::Attest,
		}).await
	}

	/// Import the statement and kick off validation work if it is a part of our assignment.
	#[tracing::instrument(level = "trace", skip(self, parent_span), fields(subsystem = LOG_TARGET))]
	async fn maybe_validate_and_import(
		&mut self,
		parent_span: &jaeger::Span,
		statement: SignedFullStatement,
	) -> Result<(), Error> {
		if let Some(summary) = self.import_statement(&statement, parent_span).await? {
			if Some(summary.group_id) != self.assignment {
				return Ok(())
			}
			let (attesting, span) = match statement.payload() {
				Statement::Seconded(receipt) => {
					let candidate_hash = summary.candidate;

					let span = self.get_unbacked_validation_child(
						root_span,
						summary.candidate,
						summary.group_id,
					);
					let attesting = AttestingData {
						candidate: self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?.to_plain(),
						pov_hash: receipt.descriptor.pov_hash,
						from_validator: statement.validator_index(),
						backing: Vec::new(),
					};
					let child = span.as_ref().map(|s| s.child("try"));
					self.fallbacks.insert(summary.candidate, (attesting.clone(), span));
					(attesting, child)
				Statement::Valid(candidate_hash) => {
					if let Some((attesting, span)) = self.fallbacks.get_mut(candidate_hash) {

						let our_index = self.table_context.validator.as_ref().map(|v| v.index());
						if our_index == Some(statement.validator_index()) {
							return Ok(())
						}

						if self.awaiting_validation.contains(candidate_hash) {
							// Job already running:
							attesting.backing.push(statement.validator_index());
							return Ok(())
						} else {
							// No job, so start another try with current validator:
							attesting.from_validator = statement.validator_index();
							(attesting.clone(), span.as_ref().map(|s| s.child("try")))
						}
					} else {
						return Ok(())
					}
				}
			};