mod.rs 37.1 KB
Newer Older
			})
			.await;

			if !receive_availability.await.map_err(Error::Oneshot)? {
				// If the data is not available, we disregard the dispute votes.
				// This is an indication that the dispute does not correspond to any included
				// candidate and that it should be ignored.
				//
				// We expect that if the candidate is truly disputed that the higher-level network
				// code will retry.

				tracing::debug!(
					target: LOG_TARGET,
					"Recovering availability failed - invalid import."
				);
				return Ok(ImportStatementsResult::InvalidImport)
			metrics.on_open();

			if concluded_valid {
				metrics.on_concluded_valid();
			}
			if concluded_invalid {
				metrics.on_concluded_invalid();
			}
		}

		// Only write when updated and vote is available.
		overlay_db.write_recent_disputes(recent_disputes);
	}

	overlay_db.write_candidate_votes(session, candidate_hash, votes.into());

	Ok(ImportStatementsResult::ValidImport)
/// Collect the indices of all `validators` for which `keystore` holds a key pair.
///
/// Indices refer to positions within the `validators` slice. A keystore lookup
/// that returns an error is treated exactly like a missing key: the validator is
/// simply not part of the result.
fn find_controlled_validator_indices(
	keystore: &LocalKeystore,
	validators: &[ValidatorId],
) -> HashSet<ValidatorIndex> {
	validators
		.iter()
		.enumerate()
		.filter_map(|(position, validator)| {
			// `.ok().flatten()` folds lookup errors and absent keys into `None`.
			keystore
				.key_pair::<ValidatorPair>(validator)
				.ok()
				.flatten()
				.map(|_| ValidatorIndex(position as _))
		})
		.collect()
}

/// Sign and distribute our own vote on a candidate, then import it locally.
///
/// A statement is signed for every validator index we control (as reported by
/// `find_controlled_validator_indices`) that has not already voted on
/// `candidate_hash` in `session`; `valid` is passed on to
/// `SignedDisputeStatement::sign_explicit`. Each signed statement is wrapped into a
/// dispute message and sent out as `DisputeDistributionMessage::SendDispute`. If at
/// least one statement was produced, `handle_import_statements` is invoked and its
/// outcome is logged.
///
/// Returns `Ok(())` early, after logging a warning, when no session info is
/// available for `session`. Keystore errors while signing and failures to create
/// a dispute message are logged and skipped rather than returned. A failure to
/// load the existing votes from `overlay_db` is propagated via `?`.
async fn issue_local_statement(
	ctx: &mut impl SubsystemContext,
	overlay_db: &mut OverlayedBackend<'_, impl Backend>,
	state: &mut State,
	candidate_hash: CandidateHash,
	candidate_receipt: CandidateReceipt,
	session: SessionIndex,
	valid: bool,
	now: Timestamp,
	metrics: &Metrics,
) -> Result<(), Error> {
	// Load session info.
	let info = match state.rolling_session_window.session_info(session) {
		None => {
			tracing::warn!(
				target: LOG_TARGET,
				session,
				"Missing info for session which has an active dispute",
			);

			return Ok(())
		},
		Some(info) => info,
	};

	// Own a copy of the validator set; it is used below for the keystore lookup
	// and to pick the public key each statement is signed with.
	let validators = info.validators.clone();

	// Start from the votes already stored for this candidate, or from an empty
	// record built around the provided receipt if nothing is stored yet.
	let votes = overlay_db
		.load_candidate_votes(session, &candidate_hash)?
		.map(CandidateVotes::from)
		.unwrap_or_else(|| CandidateVotes {
			candidate_receipt: candidate_receipt.clone(),
			valid: Vec::new(),
			invalid: Vec::new(),
		});

	// Sign a statement for each validator index we control which has
	// not already voted. This should generally be maximum 1 statement.
	let voted_indices = votes.voted_indices();
	let mut statements = Vec::new();

	// Collected into a set for the membership checks in the loop below.
	let voted_indices: HashSet<_> = voted_indices.into_iter().collect();
	let controlled_indices = find_controlled_validator_indices(&state.keystore, &validators[..]);
	for index in controlled_indices {
		if voted_indices.contains(&index) {
			continue
		}

		let keystore = state.keystore.clone() as Arc<_>;
		let res = SignedDisputeStatement::sign_explicit(
			&keystore,
			valid,
			candidate_hash,
			session,
			validators[index.0 as usize].clone(),
		)
		.await;

		match res {
			Ok(Some(signed_dispute_statement)) => {
				statements.push((signed_dispute_statement, index));
			},
			// Signing yielded no statement; nothing to record for this index.
			Ok(None) => {},
			// Log the failure and carry on with the remaining indices.
			Err(e) => {
				tracing::error!(
					target: LOG_TARGET,
					err = ?e,
					"Encountered keystore error while signing dispute statement",
				);
			},
		}
	}

	// Get our message out:
	for (statement, index) in &statements {
		let dispute_message = match make_dispute_message(info, &votes, statement.clone(), *index) {
			// Without a message there is nothing to send for this statement;
			// move on to the next one.
			Err(err) => {
				tracing::debug!(target: LOG_TARGET, ?err, "Creating dispute message failed.");
				continue
			},
			Ok(dispute_message) => dispute_message,
		};

		ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_message)).await;
	}

	// Do import
	if !statements.is_empty() {
		match handle_import_statements(
			ctx,
			overlay_db,
			state,
			candidate_hash,
			MaybeCandidateReceipt::Provides(candidate_receipt),
			Err(_) => {
				tracing::error!(
					target: LOG_TARGET,
					?candidate_hash,
					?session,
					"pending confirmation receiver got dropped by `handle_import_statements` for our own votes!"
				);
			},
			Ok(ImportStatementsResult::InvalidImport) => {
				tracing::error!(
					target: LOG_TARGET,
					?candidate_hash,
					?session,
					"`handle_import_statements` considers our own votes invalid!"
				);
			},
			Ok(ImportStatementsResult::ValidImport) => {
				tracing::trace!(
					target: LOG_TARGET,
					?candidate_hash,
					?session,
					"`handle_import_statements` successfully imported our vote!"
	}

	Ok(())
}

/// Reasons why `make_dispute_message` can fail to build a `DisputeMessage`.
#[derive(Debug, thiserror::Error)]
enum DisputeMessageCreationError {
	/// The stored votes contain no vote on the side opposite to ours.
	#[error("There was no opposite vote available")]
	NoOppositeVote,
	/// The validator index of the opposing vote has no entry in the session's
	/// validator list.
	#[error("Found vote had an invalid validator index that could not be found")]
	InvalidValidatorIndex,
	/// Rebuilding the stored opposing vote via `SignedDisputeStatement::new_checked`
	/// failed.
	#[error("Statement found in votes had invalid signature.")]
	InvalidStoredStatement,
	/// `DisputeMessage::from_signed_statements` rejected the pair of statements;
	/// the underlying check error is forwarded transparently.
	#[error(transparent)]
	InvalidStatementCombination(DisputeMessageCheckError),
}

/// Build the `DisputeMessage` that announces `our_vote` to the network.
///
/// A dispute message carries one valid and one invalid statement. Our own vote
/// fills one of the two slots; the other is filled with the first recorded vote of
/// the opposite kind in `votes`, rebuilt as a checked statement using the
/// validator key found in `info`.
///
/// # Errors
///
/// * `NoOppositeVote` - `votes` holds no vote opposing ours.
/// * `InvalidValidatorIndex` - the opposing voter's index is not present in
///   `info.validators`.
/// * `InvalidStoredStatement` - `SignedDisputeStatement::new_checked` rejected the
///   stored opposing vote.
/// * `InvalidStatementCombination` - `DisputeMessage::from_signed_statements`
///   rejected the resulting pair.
fn make_dispute_message(
	info: &SessionInfo,
	votes: &CandidateVotes,
	our_vote: SignedDisputeStatement,
	our_index: ValidatorIndex,
) -> Result<DisputeMessage, DisputeMessageCreationError> {
	let we_voted_valid = matches!(our_vote.statement(), DisputeStatement::Valid(_));

	// Pick the first stored vote from the side opposing ours.
	let (their_statement, their_index, their_signature) = if we_voted_valid {
		let (kind, index, signature) = votes
			.invalid
			.first()
			.ok_or(DisputeMessageCreationError::NoOppositeVote)?
			.clone();
		(DisputeStatement::Invalid(kind), index, signature)
	} else {
		let (kind, index, signature) = votes
			.valid
			.first()
			.ok_or(DisputeMessageCreationError::NoOppositeVote)?
			.clone();
		(DisputeStatement::Valid(kind), index, signature)
	};

	// Rebuild the opposing vote as a checked statement for the same candidate and
	// session as our own vote.
	let other_vote = SignedDisputeStatement::new_checked(
		their_statement,
		our_vote.candidate_hash().clone(),
		our_vote.session_index(),
		info.validators
			.get(their_index.0 as usize)
			.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
			.clone(),
		their_signature,
	)
	.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;

	// The message constructor expects the valid statement first, the invalid second.
	let (valid_statement, valid_index, invalid_statement, invalid_index) = if we_voted_valid {
		(our_vote, our_index, other_vote, their_index)
	} else {
		(other_vote, their_index, our_vote, our_index)
	};

	DisputeMessage::from_signed_statements(
		valid_statement,
		valid_index,
		invalid_statement,
		invalid_index,
		votes.candidate_receipt.clone(),
		info,
	)
	.map_err(DisputeMessageCreationError::InvalidStatementCombination)
}

/// Determine the best block and its block number.
/// Assumes `block_descriptions` are sorted from the one
/// with the lowest `BlockNumber` to the highest.
fn determine_undisputed_chain(
	overlay_db: &mut OverlayedBackend<'_, impl Backend>,
	base_number: BlockNumber,
	base_hash: Hash,
	block_descriptions: Vec<BlockDescription>,
) -> Result<(BlockNumber, Hash), Error> {
	let last = block_descriptions
		.last()
		.map(|e| (base_number + block_descriptions.len() as BlockNumber, e.block_hash))
		.unwrap_or((base_number, base_hash));

	// Fast path for no disputes.
	let recent_disputes = match overlay_db.load_recent_disputes()? {
		None => return Ok(last),
		Some(a) if a.is_empty() => return Ok(last),
		Some(a) => a,
	};

	let is_possibly_invalid = |session, candidate_hash| {
		recent_disputes
			.get(&(session, candidate_hash))
			.map_or(false, |status| status.is_possibly_invalid())
	};

	for (i, BlockDescription { session, candidates, .. }) in block_descriptions.iter().enumerate() {
		if candidates.iter().any(|c| is_possibly_invalid(*session, *c)) {
			if i == 0 {
				return Ok((base_number, base_hash))
				return Ok((base_number + i as BlockNumber, block_descriptions[i - 1].block_hash))