Unverified Commit 48caed5e authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Candidate backing respects scheduled collator (#1613)

* extract collator assignment from cores

* check required collator

* test and fix checks
parent c39fbd38
Pipeline #104272 passed with stages
in 26 minutes and 21 seconds
......@@ -32,7 +32,7 @@ use polkadot_primitives::v1::{
CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId,
ValidatorIndex, SigningContext, PoV,
CandidateDescriptor, AvailableData, ValidatorSignature, Hash, CandidateReceipt,
CandidateCommitments, CoreState, CoreIndex,
CandidateCommitments, CoreState, CoreIndex, CollatorId,
};
use polkadot_node_primitives::{
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport,
......@@ -91,8 +91,10 @@ struct CandidateBackingJob {
rx_to: mpsc::Receiver<ToJob>,
/// Outbound message channel sending part.
tx_from: mpsc::Sender<FromJob>,
/// The `ParaId`s assigned to this validator.
/// The `ParaId` assigned to this validator
assignment: ParaId,
/// The collator required to author the candidate, if any.
required_collator: Option<CollatorId>,
/// We issued `Valid` or `Invalid` statements on about these candidates.
issued_statements: HashSet<Hash>,
/// `Some(h)` if this job has already issues `Seconded` statemt for some candidate with `h` hash.
......@@ -268,6 +270,14 @@ impl CandidateBackingJob {
candidate: &CandidateReceipt,
pov: PoV,
) -> Result<bool, Error> {
// Check that candidate is collated by the right collator.
if self.required_collator.as_ref()
.map_or(false, |c| c != &candidate.descriptor().collator)
{
self.issue_candidate_invalid_message(candidate.clone()).await?;
return Ok(false);
}
let valid = self.request_candidate_validation(
candidate.descriptor().clone(),
Arc::new(pov.clone()),
......@@ -479,6 +489,19 @@ impl CandidateBackingJob {
let expected_commitments = candidate.commitments.clone();
let descriptor = candidate.descriptor().clone();
// Check that candidate is collated by the right collator.
if self.required_collator.as_ref()
.map_or(false, |c| c != &descriptor.collator)
{
// If not, we've got the statement in the table but we will
// not issue validation work for it.
//
// Act as though we've issued a statement.
self.issued_statements.insert(candidate_hash);
return Ok(());
}
let pov = self.request_pov_from_distribution(descriptor.clone()).await?;
let v = self.request_candidate_validation(descriptor, pov.clone()).await?;
......@@ -721,34 +744,43 @@ impl util::JobTrait for CandidateBackingJob {
let cores = try_runtime_api!(cores);
let signing_context = SigningContext { parent_hash: parent, session_index };
let validator = Validator::construct(&validators, signing_context, keystore.clone())?;
let validator = match Validator::construct(
&validators,
signing_context,
keystore.clone(),
) {
Ok(v) => v,
Err(util::Error::NotAValidator) => { return Ok(()) },
Err(e) => {
log::warn!(
target: "candidate_backing",
"Cannot participate in candidate backing: {:?}",
e
);
return Ok(())
}
};
let mut groups = HashMap::new();
let n_cores = cores.len();
let mut assignment = None;
for (idx, core) in cores.into_iter().enumerate() {
// Ignore prospective assignments on occupied cores for the time being.
if let CoreState::Scheduled(scheduled) = core {
let core_index = CoreIndex(idx as _);
let group_index = group_rotation_info.group_for_core(core_index, n_cores);
if let Some(g) = validator_groups.get(group_index.0 as usize) {
if g.contains(&validator.index()) {
assignment = Some((scheduled.para_id, scheduled.collator));
}
groups.insert(scheduled.para_id, g.clone());
}
}
}
let mut assignment = Default::default();
if let Some(idx) = validators.iter().position(|k| *k == validator.id()) {
let idx = idx as u32;
for (para_id, group) in groups.iter() {
if group.contains(&idx) {
assignment = *para_id;
break;
}
}
}
let table_context = TableContext {
groups,
validators,
......@@ -756,11 +788,17 @@ impl util::JobTrait for CandidateBackingJob {
validator: Some(validator),
};
let (assignment, required_collator) = match assignment {
None => return Ok(()), // no need to work.
Some((a, r)) => (a, r),
};
let job = CandidateBackingJob {
parent,
rx_to,
tx_from,
assignment,
required_collator,
issued_statements: HashSet::new(),
seconded: None,
reported_misbehavior_for: HashSet::new(),
......@@ -829,7 +867,7 @@ mod tests {
use assert_matches::assert_matches;
use futures::{executor, future, Future};
use polkadot_primitives::v1::{
ScheduledCore, BlockData, CandidateCommitments, CollatorId,
ScheduledCore, BlockData, CandidateCommitments,
PersistedValidationData, ValidationData, TransientValidationData, HeadData,
ValidatorPair, ValidityAttestation, GroupRotationInfo,
};
......@@ -1041,15 +1079,15 @@ mod tests {
}
);
// Check that subsystem job issues a request for the availability cores.
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(parent, RuntimeApiRequest::AvailabilityCores(tx))
) if parent == test_state.relay_parent => {
tx.send(Ok(test_state.availability_cores.clone())).unwrap();
}
);
// Check that subsystem job issues a request for the availability cores.
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(parent, RuntimeApiRequest::AvailabilityCores(tx))
) if parent == test_state.relay_parent => {
tx.send(Ok(test_state.availability_cores.clone())).unwrap();
}
);
}
// Test that a `CandidateBackingMessage::Second` issues validation work
......@@ -1747,4 +1785,108 @@ mod tests {
assert_eq!(rx.await.unwrap().len(), 0);
});
}
// Test that a `CandidateBackingMessage::Second` issues validation work
// and in case validation is successful issues a `StatementDistributionMessage`.
#[test]
fn backing_doesnt_second_wrong_collator() {
let mut test_state = TestState::default();
test_state.availability_cores[0] = CoreState::Scheduled(ScheduledCore {
para_id: ParaId::from(1),
collator: Some(Sr25519Keyring::Bob.public().into()),
});
test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_startup(&mut virtual_overseer, &test_state).await;
let pov = PoV {
block_data: BlockData(vec![42, 43, 44]),
};
let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();
let pov_hash = pov.hash();
let candidate = TestCandidateBuilder {
para_id: test_state.chain_ids[0],
relay_parent: test_state.relay_parent,
pov_hash,
head_data: expected_head_data.clone(),
erasure_root: make_erasure_root(&test_state, pov.clone()),
..Default::default()
}.build();
let second = CandidateBackingMessage::Second(
test_state.relay_parent,
candidate.to_plain(),
pov.clone(),
);
virtual_overseer.send(FromOverseer::Communication{ msg: second }).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::CandidateSelection(
CandidateSelectionMessage::Invalid(parent, c)
) if parent == test_state.relay_parent && c == candidate.to_plain() => {
}
);
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent)))
).await;
});
}
#[test]
fn validation_work_ignores_wrong_collator() {
let mut test_state = TestState::default();
test_state.availability_cores[0] = CoreState::Scheduled(ScheduledCore {
para_id: ParaId::from(1),
collator: Some(Sr25519Keyring::Bob.public().into()),
});
test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_startup(&mut virtual_overseer, &test_state).await;
let pov = PoV {
block_data: BlockData(vec![1, 2, 3]),
};
let pov_hash = pov.hash();
let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();
let candidate_a = TestCandidateBuilder {
para_id: test_state.chain_ids[0],
relay_parent: test_state.relay_parent,
pov_hash,
head_data: expected_head_data.clone(),
erasure_root: make_erasure_root(&test_state, pov.clone()),
..Default::default()
}.build();
let seconding = SignedFullStatement::sign(
Statement::Seconded(candidate_a.clone()),
&test_state.signing_context,
2,
&test_state.validators[2].pair().into(),
);
let statement = CandidateBackingMessage::Statement(
test_state.relay_parent,
seconding.clone(),
);
virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;
// The statement will be ignored because it has the wrong collator.
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent)))
).await;
});
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment