Unverified commit 1e2b8ae5, authored by asynchronous rob, committed by GitHub
Browse files

Approval Checking Improvements Omnibus (#2480)

* add tracing to approval voting

* notify if session info is not working

* add dispute period to chain specs

* propagate genesis session to parachains runtime

* use `on_genesis_session`

* protect against zero cores in computation

* tweak voting rule to be based off of best and add logs

* genesis configuration should use VRF slots only

* swallow more keystore errors

* add some docs

* make validation-worker args non-optional and update clap

* better tracing for bitfield signing and provisioner

* pass amount of bits in bitfields to inclusion instead of recomputing

* debug -> warn for some logs

* better tracing for availability recovery

* a little av-store tracing

* bridge: forward availability recovery messages

* add missing try_from impl

* some more tracing

* improve approval distribution tracing

* guide: hold onto pending approval messages until NewBlocks

* Hold onto pending approval messages until NewBlocks

* guide: adjust comment

* process all actions for one wakeup at a time

* vec

* fix network bridge test

* replace randomness-collective-flip with Babe

* remove PairNotFound
parent 175d7079
Pipeline #125424 failed with stages
in 14 minutes and 8 seconds
......@@ -787,9 +787,9 @@ dependencies = [
[[package]]
name = "clap"
version = "2.33.1"
version = "2.33.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129"
checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002"
dependencies = [
"ansi_term 0.11.0",
"atty",
......
......@@ -60,8 +60,8 @@ pub enum Subcommand {
#[allow(missing_docs)]
#[derive(Debug, StructOpt)]
pub struct ValidationWorkerCommand {
/// The path that the executor can use for it's caching purposes.
pub cache_base_path: Option<std::path::PathBuf>,
/// The path that the executor can use for its caching purposes.
pub cache_base_path: std::path::PathBuf,
#[allow(missing_docs)]
pub mem_id: String,
......
......@@ -259,7 +259,7 @@ pub fn run() -> Result<()> {
#[cfg(not(any(target_os = "android", feature = "browser")))]
polkadot_parachain::wasm_executor::run_worker(
&cmd.mem_id,
cmd.cache_base_path.clone(),
Some(cmd.cache_base_path.clone()),
)?;
Ok(())
}
......
......@@ -246,11 +246,17 @@ pub(crate) fn compute_assignments(
config: &Config,
leaving_cores: impl IntoIterator<Item = (CoreIndex, GroupIndex)> + Clone,
) -> HashMap<CoreIndex, OurAssignment> {
if config.n_cores == 0 || config.assignment_keys.is_empty() || config.validator_groups.is_empty() {
return HashMap::new()
}
let (index, assignments_key): (ValidatorIndex, AssignmentPair) = {
let key = config.assignment_keys.iter().enumerate()
.find_map(|(i, p)| match keystore.key_pair(p) {
Ok(Some(pair)) => Some((i as ValidatorIndex, pair)),
Ok(None) => None,
Err(sc_keystore::Error::Unavailable) => None,
Err(sc_keystore::Error::Io(e)) if e.kind() == std::io::ErrorKind::NotFound => None,
Err(e) => {
tracing::warn!(target: LOG_TARGET, "Encountered keystore error: {:?}", e);
None
......@@ -608,6 +614,34 @@ mod tests {
assert!(assignments.get(&CoreIndex(1)).is_some());
}
// Regression test for the zero-cores guard in `compute_assignments`:
// with `n_cores == 0` and empty `validator_groups`, the function must
// short-circuit and return an empty map rather than attempt any
// per-core assignment computation.
#[test]
fn succeeds_empty_for_0_cores() {
// NOTE(review): keystore construction is async in this codebase, so the
// test blocks on it with a plain executor.
let keystore = futures::executor::block_on(
make_keystore(&[Sr25519Keyring::Alice])
);
// Arbitrary fixed VRF story; its value is irrelevant here because the
// early-return for zero cores fires before any VRF work — TODO confirm.
let relay_vrf_story = RelayVRFStory([42u8; 32]);
let assignments = compute_assignments(
&keystore,
relay_vrf_story,
&Config {
// Assignment keys are deliberately non-empty: only the zero
// `n_cores` / empty `validator_groups` conditions are under test.
assignment_keys: assignment_keys(&[
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
]),
validator_groups: vec![],
n_cores: 0,
zeroth_delay_tranche_width: 10,
relay_vrf_modulo_samples: 3,
n_delay_tranches: 40,
},
// No leaving cores.
vec![],
);
// The guard must yield an empty assignment map.
assert!(assignments.is_empty());
}
struct MutatedAssignment {
core: CoreIndex,
cert: AssignmentCert,
......
......@@ -206,7 +206,16 @@ async fn load_all_sessions(
let session_info = match rx.await {
Ok(Ok(Some(s))) => s,
Ok(Ok(None)) => return Ok(None),
Ok(Ok(None)) => {
tracing::warn!(
target: LOG_TARGET,
"Session {} is missing from session-info state of block {}",
i,
block_hash,
);
return Ok(None);
}
Ok(Err(e)) => return Err(SubsystemError::with_origin("approval-voting", e)),
Err(e) => return Err(SubsystemError::with_origin("approval-voting", e)),
};
......
......@@ -54,7 +54,6 @@ use futures::channel::{mpsc, oneshot};
use std::collections::{BTreeMap, HashMap};
use std::collections::btree_map::Entry;
use std::sync::Arc;
use std::ops::{RangeBounds, Bound as RangeBound};
use approval_checking::RequiredTranches;
use persisted_entries::{ApprovalEntry, CandidateEntry, BlockEntry};
......@@ -88,6 +87,9 @@ pub struct Config {
/// The approval voting subsystem.
pub struct ApprovalVotingSubsystem {
/// LocalKeystore is needed for assignment keys, but not necessarily approval keys.
///
/// We do a lot of VRF signing and need the keys to have low latency.
keystore: Arc<LocalKeystore>,
slot_duration_millis: u64,
db: Arc<dyn KeyValueDB>,
......@@ -190,25 +192,29 @@ impl Wakeups {
self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
}
// drains all wakeups within the given range.
// panics if the given range is empty.
//
// only looks at the end bound of the range.
fn drain<'a, R: RangeBounds<Tick>>(&'a mut self, range: R)
-> impl Iterator<Item = (Hash, CandidateHash)> + 'a
{
let reverse = &mut self.reverse_wakeups;
// Returns the next wakeup. this future never returns if there are no wakeups.
async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Hash, CandidateHash) {
match self.first() {
None => future::pending().await,
Some(tick) => {
clock.wait(tick).await;
match self.wakeups.entry(tick) {
Entry::Vacant(_) => panic!("entry is known to exist since `first` was `Some`; qed"),
Entry::Occupied(mut entry) => {
let (hash, candidate_hash) = entry.get_mut().pop()
.expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");
if entry.get().is_empty() {
let _ = entry.remove();
}
// BTreeMap has no `drain` method :(
let after = match range.end_bound() {
RangeBound::Unbounded => BTreeMap::new(),
RangeBound::Included(last) => self.wakeups.split_off(&(last + 1)),
RangeBound::Excluded(last) => self.wakeups.split_off(&last),
};
let prev = std::mem::replace(&mut self.wakeups, after);
prev.into_iter()
.flat_map(|(_, wakeup)| wakeup)
.inspect(move |&(ref b, ref c)| { let _ = reverse.remove(&(*b, *c)); })
self.reverse_wakeups.remove(&(hash, candidate_hash));
(hash, candidate_hash)
}
}
}
}
}
}
......@@ -323,28 +329,13 @@ async fn run<C>(
let db_writer = &*subsystem.db;
loop {
let wait_til_next_tick = match wakeups.first() {
None => future::Either::Left(future::pending()),
Some(tick) => future::Either::Right(
state.clock.wait(tick).map(move |()| tick)
),
};
futures::pin_mut!(wait_til_next_tick);
let actions = futures::select! {
tick_wakeup = wait_til_next_tick.fuse() => {
let woken = wakeups.drain(..=tick_wakeup).collect::<Vec<_>>();
let mut actions = Vec::new();
for (woken_block, woken_candidate) in woken {
actions.extend(process_wakeup(
&mut state,
woken_block,
woken_candidate,
)?);
}
actions
(woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
process_wakeup(
&mut state,
woken_block,
woken_candidate,
)?
}
next_msg = ctx.recv().fuse() => {
handle_from_overseer(
......@@ -467,19 +458,36 @@ async fn handle_from_overseer(
Ok(block_imported_candidates) => {
// Schedule wakeups for all imported candidates.
for block_batch in block_imported_candidates {
tracing::debug!(
target: LOG_TARGET,
"Imported new block {} with {} included candidates",
block_batch.block_hash,
block_batch.imported_candidates.len(),
);
for (c_hash, c_entry) in block_batch.imported_candidates {
let our_tranche = c_entry
.approval_entry(&block_batch.block_hash)
.and_then(|a| a.our_assignment().map(|a| a.tranche()));
if let Some(our_tranche) = our_tranche {
let tick = our_tranche as Tick + block_batch.block_tick;
tracing::trace!(
target: LOG_TARGET,
"Scheduling first wakeup at tranche {} for candidate {} in block ({}, tick={})",
our_tranche,
c_hash,
block_batch.block_hash,
block_batch.block_tick,
);
// Our first wakeup will just be the tranche of our assignment,
// if any. This will likely be superseded by incoming assignments
// and approvals which trigger rescheduling.
actions.push(Action::ScheduleWakeup {
block_hash: block_batch.block_hash,
candidate_hash: c_hash,
tick: our_tranche as Tick + block_batch.block_tick,
tick,
});
}
}
......@@ -564,6 +572,10 @@ async fn handle_approved_ancestor(
target: Hash,
lower_bound: BlockNumber,
) -> SubsystemResult<Option<(Hash, BlockNumber)>> {
const MAX_TRACING_WINDOW: usize = 200;
use bitvec::{order::Lsb0, vec::BitVec};
let mut all_approved_max = None;
let target_number = {
......@@ -600,15 +612,29 @@ async fn handle_approved_ancestor(
Vec::new()
};
let mut bits: BitVec<Lsb0, u8> = Default::default();
for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() {
// Block entries should be present as the assumption is that
// nothing here is finalized. If we encounter any missing block
// entries we can fail.
let entry = match db.load_block_entry(&block_hash)? {
None => return Ok(None),
None => {
tracing::trace!{
target: LOG_TARGET,
"Chain between ({}, {}) and {} not fully known. Forcing vote on {}",
target,
target_number,
lower_bound,
lower_bound,
}
return Ok(None);
}
Some(b) => b,
};
// even if traversing millions of blocks this is fairly cheap and always dwarfed by the
// disk lookups.
bits.push(entry.is_fully_approved());
if entry.is_fully_approved() {
if all_approved_max.is_none() {
// First iteration of the loop is target, i = 0. After that,
......@@ -620,6 +646,32 @@ async fn handle_approved_ancestor(
}
}
tracing::trace!(
target: LOG_TARGET,
"approved blocks {}-[{}]-{}",
target_number,
{
// formatting to divide bits by groups of 10.
// when comparing logs on multiple machines where the exact vote
// targets may differ, this grouping is useful.
let mut s = String::with_capacity(bits.len());
for (i, bit) in bits.iter().enumerate().take(MAX_TRACING_WINDOW) {
s.push(if *bit { '1' } else { '0' });
if (target_number - i as u32) % 10 == 0 && i != bits.len() - 1 { s.push(' '); }
}
s
},
if bits.len() > MAX_TRACING_WINDOW {
format!(
"{}... (truncated due to large window)",
target_number - MAX_TRACING_WINDOW as u32 + 1,
)
} else {
format!("{}", lower_bound + 1)
},
);
Ok(all_approved_max)
}
......@@ -649,11 +701,8 @@ fn schedule_wakeup_action(
block_tick: Tick,
required_tranches: RequiredTranches,
) -> Option<Action> {
if approval_entry.is_approved() {
return None
}
match required_tranches {
let maybe_action = match required_tranches {
_ if approval_entry.is_approved() => None,
RequiredTranches::All => None,
RequiredTranches::Exact { next_no_show, .. } => next_no_show.map(|tick| Action::ScheduleWakeup {
block_hash,
......@@ -686,7 +735,28 @@ fn schedule_wakeup_action(
min_prefer_some(next_non_empty_tranche, next_no_show)
.map(|tick| Action::ScheduleWakeup { block_hash, candidate_hash, tick })
}
};
match maybe_action {
Some(Action::ScheduleWakeup { ref tick, .. }) => tracing::debug!(
target: LOG_TARGET,
"Scheduling next wakeup at {} for candidate {} under block ({}, tick={})",
tick,
candidate_hash,
block_hash,
block_tick,
),
None => tracing::debug!(
target: LOG_TARGET,
"No wakeup needed for candidate {} under block ({}, tick={})",
candidate_hash,
block_hash,
block_tick,
),
Some(_) => {} // unreachable
}
maybe_action
}
fn check_and_import_assignment(
......@@ -773,6 +843,13 @@ fn check_and_import_assignment(
if is_duplicate {
AssignmentCheckResult::AcceptedDuplicate
} else {
tracing::trace!(
target: LOG_TARGET,
"Imported assignment from validator {} on candidate {:?}",
assignment.validator,
(assigned_candidate_hash, candidate_entry.candidate_receipt().descriptor.para_id),
);
AssignmentCheckResult::Accepted
}
};
......@@ -867,6 +944,13 @@ fn check_and_import_approval<T>(
// importing the approval can be heavy as it may trigger acceptance for a series of blocks.
let t = with_response(ApprovalCheckResult::Accepted);
tracing::trace!(
target: LOG_TARGET,
"Importing approval vote from validator {:?} on candidate {:?}",
(approval.validator, &pubkey),
(approved_candidate_hash, candidate_entry.candidate_receipt().descriptor.para_id),
);
let actions = import_checked_approval(
state,
Some((approval.block_hash, block_entry)),
......@@ -976,6 +1060,13 @@ fn check_and_apply_full_approval(
);
if now_approved {
tracing::trace!(
target: LOG_TARGET,
"Candidate approved {} under block {}",
candidate_hash,
block_hash,
);
newly_approved.push(*block_hash);
block_entry.mark_approved_by_hash(&candidate_hash);
......@@ -1072,6 +1163,14 @@ fn process_wakeup(
let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot());
tracing::debug!(
target: LOG_TARGET,
"Processing wakeup at tranche {} for candidate {} under block {}",
tranche_now,
candidate_hash,
relay_block,
);
let (should_trigger, backing_group) = {
let approval_entry = match candidate_entry.approval_entry(&relay_block) {
Some(e) => e,
......@@ -1123,6 +1222,13 @@ fn process_wakeup(
.position(|(_, h)| &candidate_hash == h);
if let Some(i) = index_in_candidate {
tracing::debug!(
target: LOG_TARGET,
"Launching approval work for candidate {:?} in block {}",
(&candidate_hash, candidate_entry.candidate_receipt().descriptor.para_id),
relay_block,
);
// sanity: should always be present.
actions.push(Action::LaunchApproval {
indirect_cert,
......@@ -1173,6 +1279,14 @@ async fn launch_approval(
let (code_tx, code_rx) = oneshot::channel();
let (context_num_tx, context_num_rx) = oneshot::channel();
let candidate_hash = candidate.hash();
tracing::debug!(
target: LOG_TARGET,
"Recovering data for candidate {:?}",
(candidate_hash, candidate.descriptor.para_id),
);
ctx.send_message(AvailabilityRecoveryMessage::RecoverAvailableData(
candidate.clone(),
session_index,
......@@ -1208,10 +1322,21 @@ async fn launch_approval(
Err(_) => return,
Ok(Ok(a)) => a,
Ok(Err(RecoveryError::Unavailable)) => {
tracing::warn!(
target: LOG_TARGET,
"Data unavailable for candidate {:?}",
(candidate_hash, candidate.descriptor.para_id),
);
// do nothing. we'll just be a no-show and that'll cause others to rise up.
return;
}
Ok(Err(RecoveryError::Invalid)) => {
tracing::warn!(
target: LOG_TARGET,
"Data recovery invalid for candidate {:?}",
(candidate_hash, candidate.descriptor.para_id),
);
// TODO: dispute. Either the merkle trie is bad or the erasure root is.
// https://github.com/paritytech/polkadot/issues/2176
return;
......@@ -1238,6 +1363,7 @@ async fn launch_approval(
let (val_tx, val_rx) = oneshot::channel();
let para_id = candidate.descriptor.para_id;
let _ = background_tx.send(BackgroundRequest::CandidateValidation(
available_data.validation_data,
validation_code,
......@@ -1252,6 +1378,12 @@ async fn launch_approval(
// Validation checked out. Issue an approval command. If the underlying service is unreachable,
// then there isn't anything we can do.
tracing::debug!(
target: LOG_TARGET,
"Candidate Valid {:?}",
(candidate_hash, para_id),
);
let _ = background_tx.send(BackgroundRequest::ApprovalVote(ApprovalVoteRequest {
validator_index,
block_hash,
......@@ -1259,6 +1391,12 @@ async fn launch_approval(
})).await;
}
Ok(Ok(ValidationResult::Invalid(_))) => {
tracing::warn!(
target: LOG_TARGET,
"Detected invalid candidate as an approval checker {:?}",
(candidate_hash, para_id),
);
// TODO: issue dispute, but not for timeouts.
// https://github.com/paritytech/polkadot/issues/2176
}
......@@ -1358,6 +1496,12 @@ async fn issue_approval(
}
};
tracing::debug!(
target: LOG_TARGET,
"Issuing approval vote for candidate {:?}",
candidate_hash,
);
let actions = import_checked_approval(
state,
Some((block_hash, block_entry)),
......
......@@ -37,7 +37,7 @@ fn slot_to_tick(t: impl Into<Slot>) -> crate::time::Tick {
crate::time::slot_number_to_tick(SLOT_DURATION_MILLIS, t.into())
}
#[derive(Default)]
#[derive(Default, Clone)]
struct MockClock {
inner: Arc<Mutex<MockClockInner>>,
}
......@@ -1209,7 +1209,7 @@ fn assignment_not_triggered_if_at_maximum_but_clock_is_before_with_drift() {
}
#[test]
fn wakeups_drain() {
fn wakeups_next() {
let mut wakeups = Wakeups::default();
let b_a = Hash::repeat_byte(0);
......@@ -1224,12 +1224,24 @@ fn wakeups_drain() {
assert_eq!(wakeups.first().unwrap(), 1);
assert_eq!(
wakeups.drain(..=3).collect::<Vec<_>>(),
vec![(b_a, c_a), (b_b, c_b)],
);
let clock = MockClock::new(0);
let clock_aux = clock.clone();
let test_fut = Box::pin(async move {
assert_eq!(wakeups.next(&clock).await, (b_a, c_a));
assert_eq!(wakeups.next(&clock).await, (b_b, c_b));
assert_eq!(wakeups.next(&clock).await, (b_a, c_b));
assert!(wakeups.first().is_none());
assert!(wakeups.wakeups.is_empty());
});
let aux_fut = Box::pin(async move {
clock_aux.inner.lock().set_tick(1);
// skip direct set to 3.
clock_aux.inner.lock().set_tick(4);
});
assert_eq!(wakeups.first().unwrap(), 4);
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
#[test]
......@@ -1243,14 +1255,20 @@ fn wakeup_earlier_supersedes_later() {
wakeups.schedule(b_a, c_a, 2);
wakeups.schedule(b_a, c_a, 3);
assert_eq!(wakeups.first().unwrap(), 2);
let clock = MockClock::new(0);
let clock_aux = clock.clone();
assert_eq!(
wakeups.drain(..=2).collect::<Vec<_>>(),
vec![(b_a, c_a)],
);
let test_fut = Box::pin(async move {
assert_eq!(wakeups.next(&clock).await, (b_a, c_a));
assert!(wakeups.first().is_none());
assert!(wakeups.reverse_wakeups.is_empty());
});
let aux_fut = Box::pin(async move {
clock_aux.inner.lock().set_tick(2);
});
assert!(wakeups.first().is_none());
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
#[test]
......@@ -1517,7 +1535,7 @@ fn approved_ancestor_all_approved() {
);
});
futures::executor::block_on(futures::future::select(test_fut, aux_fut));
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
#[test]
......@@ -1599,7 +1617,7 @@ fn approved_ancestor_missing_approval() {
);
});
futures::executor::block_on(futures::future::select(test_fut, aux_fut));
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
#[test]
......
......@@ -1104,6 +1104,13 @@ fn store_available_data(
write_available_data(&mut tx, &candidate_hash, &available_data);
subsystem.db.write(tx)?;