Unverified Commit 1364ee69 authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Backing fixes (#1897)



* Commit my changes

* some backing fixes

* indentation

* fix backing tests

* tweak includability rules

* comment

* Update node/core/backing/src/lib.rs
Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>

* Update node/core/backing/src/lib.rs
Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>

* Update node/core/backing/src/lib.rs
Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>

* Update node/core/backing/src/lib.rs
Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: Bastian Köcher's avatarBastian Köcher <git@kchr.de>
Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>
parent b8fc1acb
Pipeline #112979 passed with stages
in 25 minutes and 34 seconds
......@@ -104,6 +104,9 @@ struct CandidateBackingJob {
issued_statements: HashSet<Hash>,
/// `Some(h)` if this job has already issues `Seconded` statemt for some candidate with `h` hash.
seconded: Option<Hash>,
/// The candidates that are includable, by hash. Each entry here indicates
/// that we've sent the provisioner the backed candidate.
backed: HashSet<Hash>,
/// We have already reported misbehaviors for these validators.
reported_misbehavior_for: HashSet<ValidatorIndex>,
keystore: SyncCryptoStorePtr,
......@@ -242,6 +245,41 @@ fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement
}
}
fn table_attested_to_backed(
attested: TableAttestedCandidate<
ParaId,
CommittedCandidateReceipt,
ValidatorIndex,
ValidatorSignature,
>,
table_context: &TableContext,
) -> Option<BackedCandidate> {
let TableAttestedCandidate { candidate, validity_votes, group_id: para_id } = attested;
let (ids, validity_votes): (Vec<_>, Vec<_>) = validity_votes
.into_iter()
.map(|(id, vote)| (id, vote.into()))
.unzip();
let group = table_context.groups.get(&para_id)?;
let mut validator_indices = BitVec::with_capacity(group.len());
validator_indices.resize(group.len(), false);
for id in ids.iter() {
if let Some(position) = group.iter().position(|x| x == id) {
validator_indices.set(position, true);
}
}
Some(BackedCandidate {
candidate,
validity_votes,
validator_indices,
})
}
impl CandidateBackingJob {
/// Run asynchronously.
async fn run_loop(mut self) -> Result<(), Error> {
......@@ -337,49 +375,30 @@ impl CandidateBackingJob {
let issued_statement = statement.is_some();
if let Some(statement) = statement {
if let Some(signed_statement) = self.sign_statement(statement).await {
self.import_statement(&signed_statement).await?;
self.distribute_signed_statement(signed_statement).await?;
}
self.sign_import_and_distribute_statement(statement).await?
}
Ok(issued_statement)
}
async fn sign_import_and_distribute_statement(&mut self, statement: Statement) -> Result<(), Error> {
if let Some(signed_statement) = self.sign_statement(statement).await {
self.import_statement(&signed_statement).await?;
self.distribute_signed_statement(signed_statement).await?;
}
Ok(())
}
fn get_backed(&self) -> Vec<NewBackedCandidate> {
let proposed = self.table.proposed_candidates(&self.table_context);
let mut res = Vec::with_capacity(proposed.len());
for p in proposed.into_iter() {
let TableAttestedCandidate { candidate, validity_votes, .. } = p;
let (ids, validity_votes): (Vec<_>, Vec<_>) = validity_votes
.into_iter()
.map(|(id, vote)| (id, vote.into()))
.unzip();
let group = match self.table_context.groups.get(&self.assignment) {
Some(group) => group,
match table_attested_to_backed(p, &self.table_context) {
None => continue,
};
let mut validator_indices = BitVec::with_capacity(group.len());
validator_indices.resize(group.len(), false);
for id in ids.iter() {
if let Some(position) = group.iter().position(|x| x == id) {
validator_indices.set(position, true);
}
Some(backed) => res.push(NewBackedCandidate(backed)),
}
let backed = BackedCandidate {
candidate,
validity_votes,
validator_indices,
};
res.push(NewBackedCandidate(backed.clone()));
}
res
......@@ -428,9 +447,29 @@ impl CandidateBackingJob {
let summary = self.table.import_statement(&self.table_context, stmt);
if let Some(ref summary) = summary {
if let Some(attested) = self.table.attested_candidate(
&summary.candidate,
&self.table_context,
) {
// `HashSet::insert` returns true if the thing wasn't in there already.
// one of the few places the Rust-std folks did a bad job with API
if self.backed.insert(summary.candidate) {
if let Some(backed) =
table_attested_to_backed(attested, &self.table_context)
{
let message = ProvisionerMessage::ProvisionableData(
ProvisionableData::BackedCandidate(backed),
);
self.send_to_provisioner(message).await?;
}
}
}
}
self.issue_new_misbehaviors().await?;
return Ok(summary);
Ok(summary)
}
async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> {
......@@ -444,23 +483,19 @@ impl CandidateBackingJob {
// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
// Seconded statement only if we have not seconded any other candidate and
// have not signed a Valid statement for the requested candidate.
match self.seconded {
if self.seconded.is_none() {
// This job has not seconded a candidate yet.
None => {
let candidate_hash = candidate.hash();
if !self.issued_statements.contains(&candidate_hash) {
if let Ok(true) = self.validate_and_second(
&candidate,
pov,
).await {
self.metrics.on_candidate_seconded();
self.seconded = Some(candidate_hash);
}
let candidate_hash = candidate.hash();
if !self.issued_statements.contains(&candidate_hash) {
if let Ok(true) = self.validate_and_second(
&candidate,
pov,
).await {
self.metrics.on_candidate_seconded();
self.seconded = Some(candidate_hash);
}
}
// This job has already seconded a candidate.
Some(_) => {}
}
}
CandidateBackingMessage::Statement(_, statement) => {
......@@ -541,11 +576,7 @@ impl CandidateBackingJob {
self.issued_statements.insert(candidate_hash);
if let Some(signed_statement) = self.sign_statement(statement).await {
self.distribute_signed_statement(signed_statement).await?;
}
Ok(())
self.sign_import_and_distribute_statement(statement).await
}
/// Import the statement and kick off validation work if it is a part of our assignment.
......@@ -818,6 +849,7 @@ impl util::JobTrait for CandidateBackingJob {
required_collator,
issued_statements: HashSet::new(),
seconded: None,
backed: HashSet::new(),
reported_misbehavior_for: HashSet::new(),
keystore,
table: Table::default(),
......@@ -930,6 +962,7 @@ mod tests {
Sr25519Keyring::Charlie,
Sr25519Keyring::Dave,
Sr25519Keyring::Ferdie,
Sr25519Keyring::One,
];
let keystore = Arc::new(sc_keystore::LocalKeystore::in_memory());
......@@ -939,7 +972,7 @@ mod tests {
let validator_public = validator_pubkeys(&validators);
let validator_groups = vec![vec![2, 0, 3], vec![1], vec![4]];
let validator_groups = vec![vec![2, 0, 3, 5], vec![1], vec![4]];
let group_rotation_info = GroupRotationInfo {
session_start_block: 0,
group_rotation_frequency: 100,
......@@ -1225,12 +1258,20 @@ mod tests {
let candidate_a_hash = candidate_a.hash();
let public0 = CryptoStore::sr25519_generate_new(
&*test_state.keystore,
ValidatorId::ID, Some(&test_state.validators[0].to_seed())
ValidatorId::ID,
Some(&test_state.validators[0].to_seed()),
).await.expect("Insert key into keystore");
let public1 = CryptoStore::sr25519_generate_new(
&*test_state.keystore,
ValidatorId::ID,
Some(&test_state.validators[5].to_seed()),
).await.expect("Insert key into keystore");
let public2 = CryptoStore::sr25519_generate_new(
&*test_state.keystore,
ValidatorId::ID, Some(&test_state.validators[2].to_seed())
ValidatorId::ID,
Some(&test_state.validators[2].to_seed()),
).await.expect("Insert key into keystore");
let signed_a = SignedFullStatement::sign(
&test_state.keystore,
Statement::Seconded(candidate_a.clone()),
......@@ -1243,8 +1284,8 @@ mod tests {
&test_state.keystore,
Statement::Valid(candidate_a_hash),
&test_state.signing_context,
0,
&public0.into(),
5,
&public1.into(),
).await.expect("should be signed");
let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_a.clone());
......@@ -1301,28 +1342,38 @@ mod tests {
virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;
let (tx, rx) = oneshot::channel();
// The backed candidats set should be not empty at this point.
virtual_overseer.send(FromOverseer::Communication{
msg: CandidateBackingMessage::GetBackedCandidates(
test_state.relay_parent,
tx,
)
}).await;
let backed = rx.await.unwrap();
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::Share(hash, stmt)
) => {
assert_eq!(test_state.relay_parent, hash);
stmt.check_signature(&test_state.signing_context, &public0.into()).expect("Is signed correctly");
}
);
// `validity_votes` may be in any order so we can't do this in a single assert.
assert_eq!(backed[0].0.candidate, candidate_a);
assert_eq!(backed[0].0.validity_votes.len(), 2);
assert!(backed[0].0.validity_votes.contains(
&ValidityAttestation::Explicit(signed_b.signature().clone())
));
assert!(backed[0].0.validity_votes.contains(
&ValidityAttestation::Implicit(signed_a.signature().clone())
));
assert_eq!(backed[0].0.validator_indices, bitvec::bitvec![Lsb0, u8; 1, 1, 0]);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::Provisioner(
ProvisionerMessage::ProvisionableData(
ProvisionableData::BackedCandidate(BackedCandidate {
candidate,
validity_votes,
validator_indices,
})
)
) if candidate == candidate_a => {
assert_eq!(validity_votes.len(), 3);
assert!(validity_votes.contains(
&ValidityAttestation::Explicit(signed_b.signature().clone())
));
assert!(validity_votes.contains(
&ValidityAttestation::Implicit(signed_a.signature().clone())
));
assert_eq!(validator_indices, bitvec::bitvec![Lsb0, u8; 1, 1, 0, 1]);
}
);
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent)))
......@@ -1376,10 +1427,10 @@ mod tests {
let signed_b = SignedFullStatement::sign(
&test_state.keystore,
Statement::Valid(candidate_a_hash),
Statement::Invalid(candidate_a_hash),
&test_state.signing_context,
0,
&public0.into(),
2,
&public2.into(),
).await.expect("should be signed");
let signed_c = SignedFullStatement::sign(
......@@ -1449,10 +1500,36 @@ mod tests {
}
);
// This `Invalid` statement contradicts the `Candidate` statement
// sent at first.
let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_b.clone());
virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::Provisioner(
ProvisionerMessage::ProvisionableData(
ProvisionableData::MisbehaviorReport(
relay_parent,
MisbehaviorReport::SelfContradiction(_, s1, s2),
)
)
) if relay_parent == test_state.relay_parent => {
s1.check_signature(
&test_state.signing_context,
&test_state.validator_public[s1.validator_index() as usize],
).unwrap();
s2.check_signature(
&test_state.signing_context,
&test_state.validator_public[s2.validator_index() as usize],
).unwrap();
}
);
// This `Invalid` statement contradicts the `Valid` statement the subsystem
// should have issued behind the scenes.
let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_c.clone());
virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;
......
......@@ -368,8 +368,7 @@ async fn select_candidates(
let (scheduled_core, assumption) = match core {
CoreState::Scheduled(scheduled_core) => (scheduled_core, OccupiedCoreAssumption::Free),
CoreState::Occupied(occupied_core) => {
if bitfields_indicate_availability(core_idx, bitfields, &occupied_core.availability)
{
if bitfields_indicate_availability(core_idx, bitfields, &occupied_core.availability) {
if let Some(ref scheduled_core) = occupied_core.next_up_on_available {
(scheduled_core, OccupiedCoreAssumption::Included)
} else {
......@@ -511,4 +510,4 @@ impl metrics::Metrics for Metrics {
delegated_subsystem!(ProvisioningJob((), Metrics) <- ToJob as ProvisioningSubsystem);
#[cfg(test)]
mod tests;
\ No newline at end of file
mod tests;
......@@ -256,15 +256,14 @@ impl<C: Context> CandidateData<C> {
// if it has enough validity votes
// and no authorities have called it bad.
fn can_be_included(&self, validity_threshold: usize) -> bool {
self.indicated_bad_by.is_empty()
&& self.validity_votes.len() >= validity_threshold
self.validity_votes.len() >= validity_threshold
}
fn summary(&self, digest: C::Digest) -> Summary<C::Digest, C::GroupId> {
Summary {
candidate: digest,
group_id: self.group_id.clone(),
validity_votes: self.validity_votes.len() - self.indicated_bad_by.len(),
validity_votes: self.validity_votes.len(),
signalled_bad: self.indicated_bad(),
}
}
......@@ -362,6 +361,20 @@ impl<C: Context> Table<C> {
})
}
/// Get the attested candidate for `digest`.
///
/// Returns `Some(_)` if the candidate exists and is includable.
pub fn attested_candidate(&self, digest: &C::Digest, context: &C)
-> Option<AttestedCandidate<
C::GroupId, C::Candidate, C::AuthorityId, C::Signature,
>>
{
self.candidate_votes.get(digest).and_then(|data| {
let v_threshold = context.requisite_votes(&data.group_id);
data.attested(v_threshold)
})
}
/// Import a signed statement. Signatures should be checked for validity, and the
/// sender should be checked to actually be an authority.
///
......@@ -489,7 +502,7 @@ impl<C: Context> Table<C> {
if new_proposal {
self.candidate_votes.entry(digest.clone()).or_insert_with(move || CandidateData {
group_id: group,
candidate: candidate,
candidate,
validity_votes: HashMap::new(),
indicated_bad_by: Vec::new(),
});
......@@ -581,7 +594,7 @@ impl<C: Context> Table<C> {
}
Entry::Vacant(vacant) => {
if let ValidityVote::Invalid(_) = vote {
votes.indicated_bad_by.push(from);
votes.indicated_bad_by.push(from.clone());
}
vacant.insert(vote);
......@@ -595,7 +608,12 @@ impl<C: Context> Table<C> {
}
}
fn update_includable_count<G: Hash + Eq + Clone>(map: &mut HashMap<G, usize>, group_id: &G, was_includable: bool, is_includable: bool) {
fn update_includable_count<G: Hash + Eq + Clone>(
map: &mut HashMap<G, usize>,
group_id: &G,
was_includable: bool,
is_includable: bool,
) {
if was_includable && !is_includable {
if let Entry::Occupied(mut entry) = map.entry(group_id.clone()) {
*entry.get_mut() -= 1;
......@@ -989,7 +1007,7 @@ mod tests {
candidate.indicated_bad_by.push(AuthorityId(1024));
assert!(!candidate.can_be_included(validity_threshold));
assert!(candidate.can_be_included(validity_threshold));
}
#[test]
......@@ -1039,8 +1057,8 @@ mod tests {
table.import_statement(&context, vote);
assert!(!table.detected_misbehavior.contains_key(&AuthorityId(3)));
assert!(!table.candidate_includable(&candidate_digest, &context));
assert!(table.includable_count.is_empty());
assert!(table.candidate_includable(&candidate_digest, &context));
assert!(table.includable_count.get(&GroupId(2)).is_some());
}
#[test]
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment