Unverified Commit e1d29ffb authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

approval-voting: processed wakeups can also update approval state (#3848)



* approval-voting: processed wakeups can also update approval state

* fmt changes

* reverse broken if condition

* further correct condition

* add test

* fmt

* Update node/core/approval-voting/src/lib.rs
Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
parent 4322a332
Pipeline #157377 failed with stages
in 37 minutes and 34 seconds
......@@ -694,6 +694,7 @@ where
woken_block,
woken_candidate,
tick,
&subsystem.metrics,
)?
}
next_msg = ctx.recv().fuse() => {
......@@ -1710,14 +1711,14 @@ fn check_and_import_approval<T>(
None
};
let mut actions = import_checked_approval(
let mut actions = advance_approval_state(
state,
db,
&metrics,
block_entry,
approved_candidate_hash,
candidate_entry,
ApprovalSource::Remote(approval.validator),
ApprovalStateTransition::RemoteApproval(approval.validator),
);
actions.extend(inform_disputes_action);
......@@ -1725,41 +1726,46 @@ fn check_and_import_approval<T>(
Ok((actions, t))
}
enum ApprovalSource {
Remote(ValidatorIndex),
Local(ValidatorIndex, ValidatorSignature),
#[derive(Debug)]
enum ApprovalStateTransition {
RemoteApproval(ValidatorIndex),
LocalApproval(ValidatorIndex, ValidatorSignature),
WakeupProcessed,
}
impl ApprovalSource {
fn validator_index(&self) -> ValidatorIndex {
impl ApprovalStateTransition {
fn validator_index(&self) -> Option<ValidatorIndex> {
match *self {
ApprovalSource::Remote(v) | ApprovalSource::Local(v, _) => v,
ApprovalStateTransition::RemoteApproval(v) |
ApprovalStateTransition::LocalApproval(v, _) => Some(v),
ApprovalStateTransition::WakeupProcessed => None,
}
}
fn is_remote(&self) -> bool {
fn is_local_approval(&self) -> bool {
match *self {
ApprovalSource::Remote(_) => true,
ApprovalSource::Local(_, _) => false,
ApprovalStateTransition::RemoteApproval(_) => false,
ApprovalStateTransition::LocalApproval(_, _) => true,
ApprovalStateTransition::WakeupProcessed => false,
}
}
}
// Import an approval vote which is already checked to be valid and corresponding to an assigned
// validator on the candidate and block. This updates the block entry and candidate entry as
// Advance the approval state, either by importing an approval vote which is already checked to be valid and corresponding to an assigned
// validator on the candidate and block, or by noting that there are no further wakeups or tranches needed. This updates the block entry and candidate entry as
// necessary and schedules any further wakeups.
fn import_checked_approval(
fn advance_approval_state(
state: &State,
db: &mut OverlayedBackend<'_, impl Backend>,
metrics: &Metrics,
mut block_entry: BlockEntry,
candidate_hash: CandidateHash,
mut candidate_entry: CandidateEntry,
source: ApprovalSource,
transition: ApprovalStateTransition,
) -> Vec<Action> {
let validator_index = source.validator_index();
let validator_index = transition.validator_index();
let already_approved_by = candidate_entry.mark_approval(validator_index);
let already_approved_by = validator_index.as_ref().map(|v| candidate_entry.mark_approval(*v));
let candidate_approved_in_block = block_entry.is_candidate_approved(&candidate_hash);
// Check for early exits.
......@@ -1771,17 +1777,13 @@ fn import_checked_approval(
// If the block was approved, but the validator hadn't approved it yet, we should still hold
// onto the approval vote on-disk in case we restart and rebroadcast votes. Otherwise, our
// assignment might manifest as a no-show.
match source {
ApprovalSource::Remote(_) => {
// We don't store remote votes, so we can early exit as long at the candidate is
// already concluded under the block i.e. we don't need more approvals.
if candidate_approved_in_block {
return Vec::new()
}
},
ApprovalSource::Local(_, _) => {
// We never early return on the local validator.
},
if !transition.is_local_approval() {
// We don't store remote votes and there's nothing to store for processed wakeups,
// so we can early exit as long at the candidate is already concluded under the
// block i.e. we don't need more approvals.
if candidate_approved_in_block {
return Vec::new()
}
}
let mut actions = Vec::new();
......@@ -1852,7 +1854,7 @@ fn import_checked_approval(
approval_entry.mark_approved();
}
if let ApprovalSource::Local(_, ref sig) = source {
if let ApprovalStateTransition::LocalApproval(_, ref sig) = transition {
approval_entry.import_approval_sig(sig.clone());
}
......@@ -1865,13 +1867,15 @@ fn import_checked_approval(
status.required_tranches,
));
// We have no need to write the candidate entry if
// We have no need to write the candidate entry if all of the following
// is true:
//
// 1. The source is remote, as we don't store anything new in the approval entry.
// 1. This is not a local approval, as we don't store anything new in the approval entry.
// 2. The candidate is not newly approved, as we haven't altered the approval entry's
// approved flag with `mark_approved` above.
// 3. The source had already approved the candidate, as we haven't altered the bitfield.
if !source.is_remote() || newly_approved || !already_approved_by {
// 3. The approver, if any, had already approved the candidate, as we haven't altered the bitfield.
if transition.is_local_approval() || newly_approved || !already_approved_by.unwrap_or(true)
{
// In all other cases, we need to write the candidate entry.
db.write_candidate_entry(candidate_entry);
}
......@@ -1918,6 +1922,7 @@ fn process_wakeup(
relay_block: Hash,
candidate_hash: CandidateHash,
expected_tick: Tick,
metrics: &Metrics,
) -> SubsystemResult<Vec<Action>> {
let _span = jaeger::Span::from_encodable(
(relay_block, candidate_hash, expected_tick),
......@@ -2040,28 +2045,20 @@ fn process_wakeup(
}
}
let approval_entry = candidate_entry
.approval_entry(&relay_block)
.expect("this function returned earlier if not available; qed");
// Although we ran this earlier in the function, we need to run again because we might have
// imported our own assignment, which could change things.
let tranches_to_approve = approval_checking::tranches_to_approve(
&approval_entry,
candidate_entry.approvals(),
tranche_now,
block_tick,
no_show_duration,
session_info.needed_approvals as _,
);
actions.extend(schedule_wakeup_action(
&approval_entry,
relay_block,
block_entry.block_number(),
// Although we checked approval earlier in this function,
// this wakeup might have advanced the state to approved via
// a no-show that was immediately covered and therefore
// we need to check for that and advance the state on-disk.
//
// Note that this function also schedules a wakeup as necessary.
actions.extend(advance_approval_state(
state,
db,
metrics,
block_entry,
candidate_hash,
block_tick,
tranches_to_approve,
candidate_entry,
ApprovalStateTransition::WakeupProcessed,
));
Ok(actions)
......@@ -2436,14 +2433,14 @@ async fn issue_approval(
None
};
let mut actions = import_checked_approval(
let mut actions = advance_approval_state(
state,
db,
metrics,
block_entry,
candidate_hash,
candidate_entry,
ApprovalSource::Local(validator_index as _, sig.clone()),
ApprovalStateTransition::LocalApproval(validator_index as _, sig.clone()),
);
metrics.on_approval_produced();
......
......@@ -2555,3 +2555,197 @@ fn subsystem_assignment_not_triggered_if_at_maximum_but_clock_is_before_with_dri
should_be_triggered: |_| false,
});
}
#[test]
fn pre_covers_dont_stall_approval() {
// A, B are tranche 0.
// C is tranche 1.
//
// All assignments imported at once, and B, C approvals imported immediately.
// A no-shows, leading to being covered by C.
// Technically, this is an approved block, but it will be approved
// when the no-show timer hits, not as a response to an approval vote.
//
// Note that we have 6 validators, otherwise the 2nd approval triggers
// the >1/3 insta-approval condition.
let assignment_criteria = Box::new(MockAssignmentCriteria::check_only(
move |validator_index| match validator_index {
ValidatorIndex(0 | 1) => Ok(0),
ValidatorIndex(2) => Ok(1),
ValidatorIndex(_) => Err(criteria::InvalidAssignment),
},
));
let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build();
let store = config.backend();
test_harness(config, |test_harness| async move {
let TestHarness {
mut virtual_overseer,
clock,
sync_oracle_handle: _sync_oracle_handle,
..
} = test_harness;
let block_hash = Hash::repeat_byte(0x01);
let validator_index_a = ValidatorIndex(0);
let validator_index_b = ValidatorIndex(1);
let validator_index_c = ValidatorIndex(2);
let validators = vec![
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
Sr25519Keyring::Dave,
Sr25519Keyring::Eve,
Sr25519Keyring::One,
];
let session_info = SessionInfo {
validators: validators.iter().map(|v| v.public().into()).collect(),
validator_groups: vec![
vec![ValidatorIndex(0), ValidatorIndex(1)],
vec![ValidatorIndex(2), ValidatorIndex(5)],
vec![ValidatorIndex(3), ValidatorIndex(4)],
],
needed_approvals: 2,
discovery_keys: validators.iter().map(|v| v.public().into()).collect(),
assignment_keys: validators.iter().map(|v| v.public().into()).collect(),
n_cores: validators.len() as _,
zeroth_delay_tranche_width: 5,
relay_vrf_modulo_samples: 3,
n_delay_tranches: 50,
no_show_slots: 2,
};
let candidate_descriptor = make_candidate(1.into(), &block_hash);
let candidate_hash = candidate_descriptor.hash();
let head: Hash = ChainBuilder::GENESIS_HASH;
let mut builder = ChainBuilder::new();
let slot = Slot::from(1 as u64);
builder.add_block(
block_hash,
head,
1,
BlockConfig {
slot,
candidates: Some(vec![(candidate_descriptor, CoreIndex(0), GroupIndex(0))]),
session_info: Some(session_info),
},
);
builder.build(&mut virtual_overseer).await;
let candidate_index = 0;
let rx = check_and_import_assignment(
&mut virtual_overseer,
block_hash,
candidate_index,
validator_index_a,
)
.await;
assert_eq!(rx.await, Ok(AssignmentCheckResult::Accepted),);
let rx = check_and_import_assignment(
&mut virtual_overseer,
block_hash,
candidate_index,
validator_index_b,
)
.await;
assert_eq!(rx.await, Ok(AssignmentCheckResult::Accepted),);
let rx = check_and_import_assignment(
&mut virtual_overseer,
block_hash,
candidate_index,
validator_index_c,
)
.await;
assert_eq!(rx.await, Ok(AssignmentCheckResult::Accepted),);
let session_index = 1;
let sig_b = sign_approval(Sr25519Keyring::Bob, candidate_hash, session_index);
let rx = check_and_import_approval(
&mut virtual_overseer,
block_hash,
candidate_index,
validator_index_b,
candidate_hash,
session_index,
false,
true,
Some(sig_b),
)
.await;
assert_eq!(rx.await, Ok(ApprovalCheckResult::Accepted),);
let sig_c = sign_approval(Sr25519Keyring::Charlie, candidate_hash, session_index);
let rx = check_and_import_approval(
&mut virtual_overseer,
block_hash,
candidate_index,
validator_index_c,
candidate_hash,
session_index,
false,
true,
Some(sig_c),
)
.await;
assert_eq!(rx.await, Ok(ApprovalCheckResult::Accepted),);
// Sleep to ensure we get a consistent read on the database.
//
// NOTE: Since the response above occurs before writing to the database, we are somewhat
// breaking the external consistency of the API by reaching into the database directly.
// Under normal operation, this wouldn't be necessary, since all requests are serialized by
// the event loop and we write at the end of each pass. However, if the database write were
// to fail, a downstream subsystem may expect for this candidate to be approved, and
// possibly take further actions on the assumption that the candidate is approved, when
// that may not be the reality from the database's perspective. This could be avoided
// entirely by having replies processed after database writes, but that would constitute a
// larger refactor and incur a performance penalty.
futures_timer::Delay::new(Duration::from_millis(100)).await;
// The candidate should not be approved.
let candidate_entry = store.load_candidate_entry(&candidate_hash).unwrap().unwrap();
assert!(!candidate_entry.approval_entry(&block_hash).unwrap().is_approved());
assert!(clock.inner.lock().has_wakeup(20));
// Wait for the no-show timer to observe the approval from
// tranche 0 and set a wakeup for tranche 1.
clock.inner.lock().set_tick(20);
// Sleep to ensure we get a consistent read on the database.
futures_timer::Delay::new(Duration::from_millis(100)).await;
// The next wakeup should observe the assignment & approval from
// tranche 1, and the no-show from tranche 0 should be immediately covered.
assert_eq!(clock.inner.lock().next_wakeup(), Some(31));
clock.inner.lock().set_tick(31);
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainSelection(ChainSelectionMessage::Approved(b_hash)) => {
assert_eq!(b_hash, block_hash);
}
);
// The candidate and block should now be approved.
let candidate_entry = store.load_candidate_entry(&candidate_hash).unwrap().unwrap();
assert!(candidate_entry.approval_entry(&block_hash).unwrap().is_approved());
assert!(clock.inner.lock().next_wakeup().is_none());
let block_entry = store.load_block_entry(&block_hash).unwrap().unwrap();
assert!(block_entry.is_fully_approved());
virtual_overseer
});
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment