Unverified Commit 113ae827 authored by Peter Goodspeed-Niklaus's avatar Peter Goodspeed-Niklaus Committed by GitHub
Browse files

do not store backed candidates in the provisioner (#1909)

* guide: non-semantic changes

* guide: update per the issue description

* GetBackedCandidates operates on multiple hashes now

* GetBackedCandidates still needs a relay parent

* implement changes specified in guide

* distinguish between various occasions for canceled oneshots

* add tracing info to getbackedcandidates

* REVERT ME: add tracing messages for GetBackedCandidates

Note that these messages are only sometimes actually passed on to the
candidate backing subsystem, with the consequence that it is
unexpectedly frequent that the provisioner fails to create its
provisionable data.

* REVERT ME: more tracing logging

* REVERT ME: log when CandidateBackingJob receives any message at all

* REVERT ME: log when send_msg sends a message to a job

* fix candidate-backing tests

* streamline GetBackedCandidates

This uses table.attested_candidate instead of table.get_candidate, because
it's not obvious how to get a BackedCandidate from just a
CommittedCandidateReceipt.

* REVERT ME: more logging tracing job lifespans

* promote warning about job premature demise

* don't terminate CandiateBackingJob::run_loop in event of failure to process message

* Revert "REVERT ME: more logging tracing job lifespans"

This reverts commit 7365f2fb.

* Revert "REVERT ME: log when send_msg sends a message to a job"

This reverts commit 58e46aad.

* Revert "REVERT ME: log when CandidateBackingJob receives any message at all"

This reverts commit 0d6f3841.

* Revert "REVERT ME: more tracing logging"

This reverts commit 675fd262.

* Revert "REVERT ME: add tracing messages for GetBackedCandidates"

This reverts commit e09e1564.

* formatting

* add logging message to CandidateBackingJob::run_loop start

* REVERT ME: add tracing to candidate-backing job creation

* run candidatebacking loop even if no assignment

* use unique error variants for each canceled oneshot

* Revert "REVERT ME: add tracing to candidate-backing job creation"

This reverts commit 8ce5f4f0.

* try_runtime_api more to reduce silent exits

* add sanity check that returned backed candidates preserve ordering

* remove redundant err attribute
parent f4aae884
Pipeline #116111 passed with stages
in 18 minutes and 34 seconds
......@@ -39,7 +39,7 @@ use polkadot_node_primitives::{
use polkadot_subsystem::{
messages::{
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData,
CandidateValidationMessage, PoVDistributionMessage, ProvisionableData,
ProvisionerMessage, StatementDistributionMessage, ValidationFailed, RuntimeApiRequest,
},
};
......@@ -74,11 +74,17 @@ enum Error {
#[error("Signature is invalid")]
InvalidSignature,
#[error("Failed to send candidates {0:?}")]
Send(Vec<NewBackedCandidate>),
#[error("Oneshot never resolved")]
Oneshot(#[from] #[source] oneshot::Canceled),
Send(Vec<BackedCandidate>),
#[error("FetchPoV channel closed before receipt")]
FetchPoV(#[source] oneshot::Canceled),
#[error("ValidateFromChainState channel closed before receipt")]
ValidateFromChainState(#[source] oneshot::Canceled),
#[error("StoreAvailableData channel closed before receipt")]
StoreAvailableData(#[source] oneshot::Canceled),
#[error("a channel was closed before receipt in try_join!")]
JoinMultiple(#[source] oneshot::Canceled),
#[error("Obtaining erasure chunks failed")]
ObtainErasureChunks(#[from] #[source] erasure_coding::Error),
ObtainErasureChunks(#[from] erasure_coding::Error),
#[error(transparent)]
ValidationFailed(#[from] ValidationFailed),
#[error(transparent)]
......@@ -124,7 +130,7 @@ struct CandidateBackingJob {
/// Outbound message channel sending part.
tx_from: mpsc::Sender<FromJobCommand>,
/// The `ParaId` assigned to this validator
assignment: ParaId,
assignment: Option<ParaId>,
/// The collator required to author the candidate, if any.
required_collator: Option<CollatorId>,
/// We issued `Seconded`, `Valid` or `Invalid` statements on about these candidates.
......@@ -270,7 +276,7 @@ async fn store_available_data(
).into()
).await?;
let _ = rx.await?;
let _ = rx.await.map_err(Error::StoreAvailableData)?;
Ok(())
}
......@@ -328,7 +334,7 @@ async fn request_pov_from_distribution(
PoVDistributionMessage::FetchPoV(parent, descriptor, tx)
).into()).await?;
Ok(rx.await?)
rx.await.map_err(Error::FetchPoV)
}
async fn request_candidate_validation(
......@@ -347,7 +353,11 @@ async fn request_candidate_validation(
).into()
).await?;
Ok(rx.await??)
match rx.await {
Ok(Ok(validation_result)) => Ok(validation_result),
Ok(Err(err)) => Err(Error::ValidationFailed(err)),
Err(err) => Err(Error::ValidateFromChainState(err)),
}
}
type BackgroundValidationResult = Result<(CandidateReceipt, CandidateCommitments, Arc<PoV>), CandidateReceipt>;
......@@ -567,21 +577,6 @@ impl CandidateBackingJob {
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn get_backed(&self) -> Vec<NewBackedCandidate> {
let proposed = self.table.proposed_candidates(&self.table_context);
let mut res = Vec::with_capacity(proposed.len());
for p in proposed.into_iter() {
match table_attested_to_backed(p, &self.table_context) {
None => continue,
Some(backed) => res.push(NewBackedCandidate(backed)),
}
}
res
}
/// Check if there have happened any new misbehaviors and issue necessary messages.
///
/// TODO: Report multiple misbehaviors (https://github.com/paritytech/polkadot/issues/1387)
......@@ -641,7 +636,7 @@ impl CandidateBackingJob {
{
let message = ProvisionerMessage::ProvisionableData(
self.parent,
ProvisionableData::BackedCandidate(backed),
ProvisionableData::BackedCandidate(backed.receipt()),
);
self.send_to_provisioner(message).await?;
}
......@@ -661,7 +656,7 @@ impl CandidateBackingJob {
let _timer = self.metrics.time_process_second();
// Sanity check that candidate is from our assignment.
if candidate.descriptor().para_id != self.assignment {
if Some(candidate.descriptor().para_id) != self.assignment {
return Ok(());
}
......@@ -688,10 +683,16 @@ impl CandidateBackingJob {
Ok(()) => (),
}
}
CandidateBackingMessage::GetBackedCandidates(_, tx) => {
CandidateBackingMessage::GetBackedCandidates(_, requested_candidates, tx) => {
let _timer = self.metrics.time_get_backed_candidates();
let backed = self.get_backed();
let backed = requested_candidates
.into_iter()
.filter_map(|hash| {
self.table.attested_candidate(&hash, &self.table_context)
.and_then(|attested| table_attested_to_backed(attested, &self.table_context))
})
.collect();
tx.send(backed).map_err(|data| Error::Send(data))?;
}
......@@ -750,7 +751,7 @@ impl CandidateBackingJob {
) -> Result<(), Error> {
if let Some(summary) = self.import_statement(&statement).await? {
if let Statement::Seconded(_) = statement.payload() {
if summary.group_id == self.assignment {
if Some(summary.group_id) == self.assignment {
self.kick_off_validation_work(summary).await?;
}
}
......@@ -850,15 +851,15 @@ impl util::JobTrait for CandidateBackingJob {
}
let (validators, groups, session_index, cores) = futures::try_join!(
request_validators(parent, &mut tx_from).await?,
request_validator_groups(parent, &mut tx_from).await?,
request_session_index_for_child(parent, &mut tx_from).await?,
request_from_runtime(
try_runtime_api!(request_validators(parent, &mut tx_from).await),
try_runtime_api!(request_validator_groups(parent, &mut tx_from).await),
try_runtime_api!(request_session_index_for_child(parent, &mut tx_from).await),
try_runtime_api!(request_from_runtime(
parent,
&mut tx_from,
|tx| RuntimeApiRequest::AvailabilityCores(tx),
).await?,
)?;
).await),
).map_err(Error::JoinMultiple)?;
let validators = try_runtime_api!(validators);
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
......@@ -911,8 +912,8 @@ impl util::JobTrait for CandidateBackingJob {
};
let (assignment, required_collator) = match assignment {
None => return Ok(()), // no need to work.
Some(r) => r,
None => (None, None),
Some((assignment, required_collator)) => (Some(assignment), required_collator),
};
let (background_tx, background_rx) = mpsc::channel(16);
......@@ -1492,22 +1493,10 @@ mod tests {
AllMessages::Provisioner(
ProvisionerMessage::ProvisionableData(
_,
ProvisionableData::BackedCandidate(BackedCandidate {
candidate,
validity_votes,
validator_indices,
})
ProvisionableData::BackedCandidate(candidate_receipt)
)
) if candidate == candidate_a => {
assert_eq!(validity_votes.len(), 3);
assert!(validity_votes.contains(
&ValidityAttestation::Explicit(signed_b.signature().clone())
));
assert!(validity_votes.contains(
&ValidityAttestation::Implicit(signed_a.signature().clone())
));
assert_eq!(validator_indices, bitvec::bitvec![Lsb0, u8; 1, 1, 0, 1]);
) => {
assert_eq!(candidate_receipt, candidate_a.to_plain());
}
);
......@@ -2190,6 +2179,7 @@ mod tests {
let (tx, rx) = oneshot::channel();
let msg = CandidateBackingMessage::GetBackedCandidates(
test_state.relay_parent,
vec![candidate.hash()],
tx,
);
......
......@@ -26,14 +26,17 @@ use futures::{
};
use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
messages::{ChainApiMessage, ProvisionableData, ProvisionerInherentData, ProvisionerMessage, AllMessages},
messages::{
AllMessages, CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData,
ProvisionerMessage,
},
};
use polkadot_node_subsystem_util::{
self as util, delegated_subsystem, FromJobCommand,
request_availability_cores, request_persisted_validation_data, JobTrait, metrics::{self, prometheus},
};
use polkadot_primitives::v1::{
BackedCandidate, BlockNumber, CoreState, Hash, OccupiedCoreAssumption,
BackedCandidate, BlockNumber, CandidateReceipt, CoreState, Hash, OccupiedCoreAssumption,
SignedAvailabilityBitfield, ValidatorIndex,
};
use std::{pin::Pin, collections::BTreeMap};
......@@ -82,7 +85,7 @@ struct ProvisioningJob {
sender: mpsc::Sender<FromJobCommand>,
receiver: mpsc::Receiver<ProvisionerMessage>,
provisionable_data_channels: Vec<mpsc::Sender<ProvisionableData>>,
backed_candidates: Vec<BackedCandidate>,
backed_candidates: Vec<CandidateReceipt>,
signed_bitfields: Vec<SignedAvailabilityBitfield>,
metrics: Metrics,
inherent_after: InherentAfter,
......@@ -94,8 +97,17 @@ enum Error {
#[error(transparent)]
Util(#[from] util::Error),
#[error(transparent)]
OneshotRecv(#[from] oneshot::Canceled),
#[error("failed to get availability cores")]
CanceledAvailabilityCores(#[source] oneshot::Canceled),
#[error("failed to get persisted validation data")]
CanceledPersistedValidationData(#[source] oneshot::Canceled),
#[error("failed to get block number")]
CanceledBlockNumber(#[source] oneshot::Canceled),
#[error("failed to get backed candidates")]
CanceledBackedCandidates(#[source] oneshot::Canceled),
#[error(transparent)]
ChainApi(#[from] ChainApiError),
......@@ -103,11 +115,17 @@ enum Error {
#[error(transparent)]
Runtime(#[from] RuntimeApiError),
#[error("Failed to send message to ChainAPI")]
#[error("failed to send message to ChainAPI")]
ChainApiMessageSend(#[source] mpsc::SendError),
#[error("Failed to send return message with Inherents")]
#[error("failed to send message to CandidateBacking to get backed candidates")]
GetBackedCandidatesSend(#[source] mpsc::SendError),
#[error("failed to send return message with Inherents")]
InherentDataReturnChannel,
#[error("backed candidate does not correspond to selected candidate; check logic in provisioner")]
BackedCandidateOrderingProblem,
}
impl JobTrait for ProvisioningJob {
......@@ -291,13 +309,13 @@ type CoreAvailability = BitVec<bitvec::order::Lsb0, u8>;
async fn send_inherent_data(
relay_parent: Hash,
bitfields: &[SignedAvailabilityBitfield],
candidates: &[BackedCandidate],
candidates: &[CandidateReceipt],
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
from_job: &mut mpsc::Sender<FromJobCommand>,
) -> Result<(), Error> {
let availability_cores = request_availability_cores(relay_parent, from_job)
.await?
.await??;
.await.map_err(|err| Error::CanceledAvailabilityCores(err))??;
let bitfields = select_availability_bitfields(&availability_cores, bitfields);
let candidates = select_candidates(
......@@ -363,7 +381,7 @@ fn select_availability_bitfields(
async fn select_candidates(
availability_cores: &[CoreState],
bitfields: &[SignedAvailabilityBitfield],
candidates: &[BackedCandidate],
candidates: &[CandidateReceipt],
relay_parent: Hash,
sender: &mut mpsc::Sender<FromJobCommand>,
) -> Result<Vec<BackedCandidate>, Error> {
......@@ -403,7 +421,7 @@ async fn select_candidates(
sender,
)
.await?
.await??
.await.map_err(|err| Error::CanceledPersistedValidationData(err))??
{
Some(v) => v,
None => continue,
......@@ -413,15 +431,40 @@ async fn select_candidates(
// we arbitrarily pick the first of the backed candidates which match the appropriate selection criteria
if let Some(candidate) = candidates.iter().find(|backed_candidate| {
let descriptor = &backed_candidate.candidate.descriptor;
let descriptor = &backed_candidate.descriptor;
descriptor.para_id == scheduled_core.para_id
&& descriptor.persisted_validation_data_hash == computed_validation_data_hash
}) {
selected_candidates.push(candidate.clone());
selected_candidates.push(candidate.hash());
}
}
// now get the backed candidates corresponding to these candidate receipts
let (tx, rx) = oneshot::channel();
sender.send(AllMessages::CandidateBacking(CandidateBackingMessage::GetBackedCandidates(
relay_parent,
selected_candidates.clone(),
tx,
)).into()).await.map_err(|err| Error::GetBackedCandidatesSend(err))?;
let candidates = rx.await.map_err(|err| Error::CanceledBackedCandidates(err))?;
// `selected_candidates` is generated in ascending order by core index, and `GetBackedCandidates`
// _should_ preserve that property, but let's just make sure.
//
// We can't easily map from `BackedCandidate` to `core_idx`, but we know that every selected candidate
// maps to either 0 or 1 backed candidate, and the hashes correspond. Therefore, by checking them
// in order, we can ensure that the backed candidates are also in order.
let mut backed_idx = 0;
for selected in selected_candidates.iter() {
if *selected == candidates.get(backed_idx).ok_or(Error::BackedCandidateOrderingProblem)?.hash() {
backed_idx += 1;
}
}
if candidates.len() != backed_idx {
Err(Error::BackedCandidateOrderingProblem)?;
}
Ok(selected_candidates)
Ok(candidates)
}
/// Produces a block number 1 higher than that of the relay parent
......@@ -439,7 +482,7 @@ async fn get_block_number_under_construction(
)).into())
.await
.map_err(|e| Error::ChainApiMessageSend(e))?;
match rx.await? {
match rx.await.map_err(|err| Error::CanceledBlockNumber(err))? {
Ok(Some(n)) => Ok(n + 1),
Ok(None) => Ok(0),
Err(err) => Err(err.into()),
......
......@@ -610,7 +610,10 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) {
if let Entry::Occupied(mut job) = self.running.entry(parent_hash) {
if job.get_mut().send_msg(msg).await.is_err() {
tracing::debug!(job = Job::NAME, "failed to send message to job, will remove it");
tracing::warn!(
job = Job::NAME,
relay_parent = ?parent_hash,
"failed to send message to job, will remove it");
job.remove();
}
}
......
......@@ -47,10 +47,6 @@ pub trait BoundToRelayParent {
fn relay_parent(&self) -> Hash;
}
/// A notification of a new backed candidate.
#[derive(Debug)]
pub struct NewBackedCandidate(pub BackedCandidate);
/// Messages received by the Candidate Selection subsystem.
#[derive(Debug)]
pub enum CandidateSelectionMessage {
......@@ -81,7 +77,7 @@ impl Default for CandidateSelectionMessage {
pub enum CandidateBackingMessage {
/// Requests a set of backable candidates that could be backed in a child of the given
/// relay-parent, referenced by its hash.
GetBackedCandidates(Hash, oneshot::Sender<Vec<NewBackedCandidate>>),
GetBackedCandidates(Hash, Vec<CandidateHash>, oneshot::Sender<Vec<BackedCandidate>>),
/// Note that the Candidate Backing subsystem should second the given candidate in the context of the
/// given relay-parent (ref. by hash). This candidate must be validated.
Second(Hash, CandidateReceipt, PoV),
......@@ -93,7 +89,7 @@ pub enum CandidateBackingMessage {
impl BoundToRelayParent for CandidateBackingMessage {
fn relay_parent(&self) -> Hash {
match self {
Self::GetBackedCandidates(hash, _) => *hash,
Self::GetBackedCandidates(hash, _, _) => *hash,
Self::Second(hash, _, _) => *hash,
Self::Statement(hash, _) => *hash,
}
......@@ -497,7 +493,7 @@ pub enum ProvisionableData {
/// This bitfield indicates the availability of various candidate blocks.
Bitfield(Hash, SignedAvailabilityBitfield),
/// The Candidate Backing subsystem believes that this candidate is valid, pending availability.
BackedCandidate(BackedCandidate),
BackedCandidate(CandidateReceipt),
/// Misbehavior reports are self-contained proofs of validator misbehavior.
MisbehaviorReport(Hash, MisbehaviorReport),
/// Disputes trigger a broad dispute resolution process.
......
......@@ -416,6 +416,16 @@ impl<H> BackedCandidate<H> {
pub fn descriptor(&self) -> &CandidateDescriptor<H> {
&self.candidate.descriptor
}
/// Compute this candidate's hash.
pub fn hash(&self) -> CandidateHash where H: Clone + Encode {
self.candidate.hash()
}
/// Get this candidate's receipt.
pub fn receipt(&self) -> CandidateReceipt<H> where H: Clone {
self.candidate.to_plain()
}
}
/// Verify the backing of the given candidate.
......
......@@ -67,7 +67,7 @@ The goal of a Candidate Backing Job is to produce as many backable candidates as
```rust
match msg {
CetBackedCandidates(hash, tx) => {
GetBackedCandidates(hashes, tx) => {
// Send back a set of backable candidates.
}
CandidateBackingMessage::Second(hash, candidate) => {
......@@ -88,7 +88,7 @@ match msg {
}
```
Add `Seconded` statements and `Valid` statements to a quorum. If quorum reaches validator-group majority, send a [`ProvisionerMessage`][PM]`::ProvisionableData(ProvisionableData::BackedCandidate(BackedCandidate))` message.
Add `Seconded` statements and `Valid` statements to a quorum. If quorum reaches validator-group majority, send a [`ProvisionerMessage`][PM]`::ProvisionableData(ProvisionableData::BackedCandidate(CandidateReceipt))` message.
`Invalid` statements that conflict with already witnessed `Seconded` and `Valid` statements for the given candidate, statements that are double-votes, self-contradictions and so on, should result in issuing a [`ProvisionerMessage`][PM]`::MisbehaviorReport` message for each newly detected case of this kind.
### Validating Candidates.
......
......@@ -211,7 +211,7 @@ enum BitfieldSigningMessage { }
enum CandidateBackingMessage {
/// Requests a set of backable candidates that could be backed in a child of the given
/// relay-parent, referenced by its hash.
GetBackedCandidates(Hash, ResponseChannel<Vec<NewBackedCandidate>>),
GetBackedCandidates(Hash, Vec<CandidateHash>, ResponseChannel<Vec<BackedCandidate>>),
/// Note that the Candidate Backing subsystem should second the given candidate in the context of the
/// given relay-parent (ref. by hash). This candidate must be validated using the provided PoV.
/// The PoV is expected to match the `pov_hash` in the descriptor.
......@@ -384,7 +384,7 @@ enum ProvisionableData {
/// This bitfield indicates the availability of various candidate blocks.
Bitfield(Hash, SignedAvailabilityBitfield),
/// The Candidate Backing subsystem believes that this candidate is valid, pending availability.
BackedCandidate(BackedCandidate),
BackedCandidate(CandidateReceipt),
/// Misbehavior reports are self-contained proofs of validator misbehavior.
MisbehaviorReport(Hash, MisbehaviorReport),
/// Disputes trigger a broad dispute resolution process.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment