diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 7d20095264d053a6f86dfc21d802b99bc14fa4b8..01e0c84c64b04d00dbea9d47fd1f61b504268ef3 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -787,9 +787,9 @@ dependencies = [ [[package]] name = "clap" -version = "2.33.1" +version = "2.33.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129" +checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" dependencies = [ "ansi_term 0.11.0", "atty", diff --git a/polkadot/cli/src/cli.rs b/polkadot/cli/src/cli.rs index f4f325568aaaacdf081fa293ba63e0c414a52eaa..c7520eee12120a6cabe0414030edd5e03e947184 100644 --- a/polkadot/cli/src/cli.rs +++ b/polkadot/cli/src/cli.rs @@ -60,8 +60,8 @@ pub enum Subcommand { #[allow(missing_docs)] #[derive(Debug, StructOpt)] pub struct ValidationWorkerCommand { - /// The path that the executor can use for it's caching purposes. - pub cache_base_path: Option<std::path::PathBuf>, + /// The path that the executor can use for its caching purposes. + pub cache_base_path: std::path::PathBuf, #[allow(missing_docs)] pub mem_id: String, diff --git a/polkadot/cli/src/command.rs b/polkadot/cli/src/command.rs index eb7c21108a60151de1f807348acdcf409f9c7873..8b7e135fffae3beb2d65ad9fb5c5356e3e202c0f 100644 --- a/polkadot/cli/src/command.rs +++ b/polkadot/cli/src/command.rs @@ -259,7 +259,7 @@ pub fn run() -> Result<()> { #[cfg(not(any(target_os = "android", feature = "browser")))] polkadot_parachain::wasm_executor::run_worker( &cmd.mem_id, - cmd.cache_base_path.clone(), + Some(cmd.cache_base_path.clone()), )?; Ok(()) } diff --git a/polkadot/node/core/approval-voting/src/criteria.rs b/polkadot/node/core/approval-voting/src/criteria.rs index 62300e7620aa87312c571bf3c0dd02eeb073dfb3..808d9659a233955e028532c3a965d7ecdcbba4bf 100644 --- a/polkadot/node/core/approval-voting/src/criteria.rs +++ b/polkadot/node/core/approval-voting/src/criteria.rs @@ -246,11 +246,17 @@ pub(crate) fn compute_assignments( config: &Config, leaving_cores: impl IntoIterator<Item = (CoreIndex, GroupIndex)> + Clone, ) -> HashMap<CoreIndex, OurAssignment> { + if config.n_cores == 0 || config.assignment_keys.is_empty() || config.validator_groups.is_empty() { + return HashMap::new() + } + let (index, assignments_key): (ValidatorIndex, AssignmentPair) = { let key = config.assignment_keys.iter().enumerate() .find_map(|(i, p)| match keystore.key_pair(p) { Ok(Some(pair)) => Some((i as ValidatorIndex, pair)), Ok(None) => None, + Err(sc_keystore::Error::Unavailable) => None, + Err(sc_keystore::Error::Io(e)) if e.kind() == std::io::ErrorKind::NotFound => None, Err(e) => { tracing::warn!(target: LOG_TARGET, "Encountered keystore error: {:?}", e); None @@ -608,6 +614,34 @@ mod tests { assert!(assignments.get(&CoreIndex(1)).is_some()); } + #[test] + fn succeeds_empty_for_0_cores() { + let keystore = futures::executor::block_on( + make_keystore(&[Sr25519Keyring::Alice]) + ); + + let relay_vrf_story = RelayVRFStory([42u8; 32]); + let assignments = compute_assignments( + &keystore, + relay_vrf_story, + &Config { + assignment_keys: assignment_keys(&[ + Sr25519Keyring::Alice, + Sr25519Keyring::Bob, + Sr25519Keyring::Charlie, + ]), + validator_groups: vec![], + n_cores: 0, + zeroth_delay_tranche_width: 10, + relay_vrf_modulo_samples: 3, + n_delay_tranches: 40, + }, + vec![], + ); + + assert!(assignments.is_empty()); + } + struct MutatedAssignment { core: CoreIndex, cert: AssignmentCert, diff --git 
a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index 224d9ca310ef1f231cf9fb6d23d2465569b2c76f..8fc4b90b38451082bcaacd6e6d4aa283ba91c618 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -206,7 +206,16 @@ async fn load_all_sessions( let session_info = match rx.await { Ok(Ok(Some(s))) => s, - Ok(Ok(None)) => return Ok(None), + Ok(Ok(None)) => { + tracing::warn!( + target: LOG_TARGET, + "Session {} is missing from session-info state of block {}", + i, + block_hash, + ); + + return Ok(None); + } Ok(Err(e)) => return Err(SubsystemError::with_origin("approval-voting", e)), Err(e) => return Err(SubsystemError::with_origin("approval-voting", e)), }; diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index e85a679846abcc4f5c65a3ced7d5406a35bd27d5..963f91563e1b7b0d4a362db5e3811d931ac6cefb 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -54,7 +54,6 @@ use futures::channel::{mpsc, oneshot}; use std::collections::{BTreeMap, HashMap}; use std::collections::btree_map::Entry; use std::sync::Arc; -use std::ops::{RangeBounds, Bound as RangeBound}; use approval_checking::RequiredTranches; use persisted_entries::{ApprovalEntry, CandidateEntry, BlockEntry}; @@ -88,6 +87,9 @@ pub struct Config { /// The approval voting subsystem. pub struct ApprovalVotingSubsystem { + /// LocalKeystore is needed for assignment keys, but not necessarily approval keys. + /// + /// We do a lot of VRF signing and need low-latency access to the keys. keystore: Arc<LocalKeystore>, slot_duration_millis: u64, db: Arc<dyn KeyValueDB>, @@ -190,25 +192,29 @@ impl Wakeups { self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash)); } - // drains all wakeups within the given range. - // panics if the given range is empty. - // - // only looks at the end bound of the range. - fn drain<'a, R: RangeBounds<Tick>>(&'a mut self, range: R) - -> impl Iterator<Item = (Hash, CandidateHash)> + 'a - { - let reverse = &mut self.reverse_wakeups; + // Returns the next wakeup. This future never resolves if there are no wakeups.
+ async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Hash, CandidateHash) { + match self.first() { + None => future::pending().await, + Some(tick) => { + clock.wait(tick).await; + match self.wakeups.entry(tick) { + Entry::Vacant(_) => panic!("entry is known to exist since `first` was `Some`; qed"), + Entry::Occupied(mut entry) => { + let (hash, candidate_hash) = entry.get_mut().pop() + .expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed"); + + if entry.get().is_empty() { + let _ = entry.remove(); + } - // BTreeMap has no `drain` method :( - let after = match range.end_bound() { - RangeBound::Unbounded => BTreeMap::new(), - RangeBound::Included(last) => self.wakeups.split_off(&(last + 1)), - RangeBound::Excluded(last) => self.wakeups.split_off(&last), - }; - let prev = std::mem::replace(&mut self.wakeups, after); - prev.into_iter() - .flat_map(|(_, wakeup)| wakeup) - .inspect(move |&(ref b, ref c)| { let _ = reverse.remove(&(*b, *c)); }) + self.reverse_wakeups.remove(&(hash, candidate_hash)); + + (hash, candidate_hash) + } + } + } + } } } @@ -323,28 +329,13 @@ async fn run<C>( let db_writer = &*subsystem.db; loop { - let wait_til_next_tick = match wakeups.first() { - None => future::Either::Left(future::pending()), - Some(tick) => future::Either::Right( - state.clock.wait(tick).map(move |()| tick) - ), - }; - futures::pin_mut!(wait_til_next_tick); - let actions = futures::select! { - tick_wakeup = wait_til_next_tick.fuse() => { - let woken = wakeups.drain(..=tick_wakeup).collect::<Vec<_>>(); - - let mut actions = Vec::new(); - for (woken_block, woken_candidate) in woken { - actions.extend(process_wakeup( - &mut state, - woken_block, - woken_candidate, - )?); - } - - actions + (woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => { + process_wakeup( + &mut state, + woken_block, + woken_candidate, + )? } next_msg = ctx.recv().fuse() => { handle_from_overseer( @@ -467,19 +458,36 @@ async fn handle_from_overseer( Ok(block_imported_candidates) => { // Schedule wakeups for all imported candidates. for block_batch in block_imported_candidates { + tracing::debug!( + target: LOG_TARGET, + "Imported new block {} with {} included candidates", + block_batch.block_hash, + block_batch.imported_candidates.len(), + ); + for (c_hash, c_entry) in block_batch.imported_candidates { let our_tranche = c_entry .approval_entry(&block_batch.block_hash) .and_then(|a| a.our_assignment().map(|a| a.tranche())); if let Some(our_tranche) = our_tranche { + let tick = our_tranche as Tick + block_batch.block_tick; + tracing::trace!( + target: LOG_TARGET, + "Scheduling first wakeup at tranche {} for candidate {} in block ({}, tick={})", + our_tranche, + c_hash, + block_batch.block_hash, + block_batch.block_tick, + ); + // Our first wakeup will just be the tranche of our assignment, // if any. This will likely be superseded by incoming assignments // and approvals which trigger rescheduling. 
actions.push(Action::ScheduleWakeup { block_hash: block_batch.block_hash, candidate_hash: c_hash, - tick: our_tranche as Tick + block_batch.block_tick, + tick, }); } } @@ -564,6 +572,10 @@ async fn handle_approved_ancestor( target: Hash, lower_bound: BlockNumber, ) -> SubsystemResult<Option<(Hash, BlockNumber)>> { + const MAX_TRACING_WINDOW: usize = 200; + + use bitvec::{order::Lsb0, vec::BitVec}; + let mut all_approved_max = None; let target_number = { @@ -600,15 +612,29 @@ async fn handle_approved_ancestor( Vec::new() }; + let mut bits: BitVec<Lsb0, u8> = Default::default(); for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() { // Block entries should be present as the assumption is that // nothing here is finalized. If we encounter any missing block // entries we can fail. let entry = match db.load_block_entry(&block_hash)? { - None => return Ok(None), + None => { + tracing::trace!{ + target: LOG_TARGET, + "Chain between ({}, {}) and {} not fully known. Forcing vote on {}", + target, + target_number, + lower_bound, + lower_bound, + } + return Ok(None); + } Some(b) => b, }; + // even if traversing millions of blocks this is fairly cheap and always dwarfed by the + // disk lookups. + bits.push(entry.is_fully_approved()); if entry.is_fully_approved() { if all_approved_max.is_none() { // First iteration of the loop is target, i = 0. After that, @@ -620,6 +646,32 @@ async fn handle_approved_ancestor( } } + tracing::trace!( + target: LOG_TARGET, + "approved blocks {}-[{}]-{}", + target_number, + { + // formatting to divide bits by groups of 10. + // when comparing logs on multiple machines where the exact vote + // targets may differ, this grouping is useful. + let mut s = String::with_capacity(bits.len()); + for (i, bit) in bits.iter().enumerate().take(MAX_TRACING_WINDOW) { + s.push(if *bit { '1' } else { '0' }); + if (target_number - i as u32) % 10 == 0 && i != bits.len() - 1 { s.push(' '); } + } + + s + }, + if bits.len() > MAX_TRACING_WINDOW { + format!( + "{}... (truncated due to large window)", + target_number - MAX_TRACING_WINDOW as u32 + 1, + ) + } else { + format!("{}", lower_bound + 1) + }, + ); + Ok(all_approved_max) } @@ -649,11 +701,8 @@ fn schedule_wakeup_action( block_tick: Tick, required_tranches: RequiredTranches, ) -> Option<Action> { - if approval_entry.is_approved() { - return None - } - - match required_tranches { + let maybe_action = match required_tranches { + _ if approval_entry.is_approved() => None, RequiredTranches::All => None, RequiredTranches::Exact { next_no_show, .. } => next_no_show.map(|tick| Action::ScheduleWakeup { block_hash, @@ -686,7 +735,28 @@ fn schedule_wakeup_action( min_prefer_some(next_non_empty_tranche, next_no_show) .map(|tick| Action::ScheduleWakeup { block_hash, candidate_hash, tick }) } + }; + + match maybe_action { + Some(Action::ScheduleWakeup { ref tick, .. 
}) => tracing::debug!( + target: LOG_TARGET, + "Scheduling next wakeup at {} for candidate {} under block ({}, tick={})", + tick, + candidate_hash, + block_hash, + block_tick, + ), + None => tracing::debug!( + target: LOG_TARGET, + "No wakeup needed for candidate {} under block ({}, tick={})", + candidate_hash, + block_hash, + block_tick, + ), + Some(_) => {} // unreachable } + + maybe_action } fn check_and_import_assignment( @@ -773,6 +843,13 @@ fn check_and_import_assignment( if is_duplicate { AssignmentCheckResult::AcceptedDuplicate } else { + tracing::trace!( + target: LOG_TARGET, + "Imported assignment from validator {} on candidate {:?}", + assignment.validator, + (assigned_candidate_hash, candidate_entry.candidate_receipt().descriptor.para_id), + ); + AssignmentCheckResult::Accepted } }; @@ -867,6 +944,13 @@ fn check_and_import_approval<T>( // importing the approval can be heavy as it may trigger acceptance for a series of blocks. let t = with_response(ApprovalCheckResult::Accepted); + tracing::trace!( + target: LOG_TARGET, + "Importing approval vote from validator {:?} on candidate {:?}", + (approval.validator, &pubkey), + (approved_candidate_hash, candidate_entry.candidate_receipt().descriptor.para_id), + ); + let actions = import_checked_approval( state, Some((approval.block_hash, block_entry)), @@ -976,6 +1060,13 @@ fn check_and_apply_full_approval( ); if now_approved { + tracing::trace!( + target: LOG_TARGET, + "Candidate approved {} under block {}", + candidate_hash, + block_hash, + ); + newly_approved.push(*block_hash); block_entry.mark_approved_by_hash(&candidate_hash); @@ -1072,6 +1163,14 @@ fn process_wakeup( let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot()); + tracing::debug!( + target: LOG_TARGET, + "Processing wakeup at tranche {} for candidate {} under block {}", + tranche_now, + candidate_hash, + relay_block, + ); + let (should_trigger, backing_group) = { let approval_entry = match candidate_entry.approval_entry(&relay_block) { Some(e) => e, @@ -1123,6 +1222,13 @@ fn process_wakeup( .position(|(_, h)| &candidate_hash == h); if let Some(i) = index_in_candidate { + tracing::debug!( + target: LOG_TARGET, + "Launching approval work for candidate {:?} in block {}", + (&candidate_hash, candidate_entry.candidate_receipt().descriptor.para_id), + relay_block, + ); + // sanity: should always be present. actions.push(Action::LaunchApproval { indirect_cert, @@ -1173,6 +1279,14 @@ async fn launch_approval( let (code_tx, code_rx) = oneshot::channel(); let (context_num_tx, context_num_rx) = oneshot::channel(); + let candidate_hash = candidate.hash(); + + tracing::debug!( + target: LOG_TARGET, + "Recovering data for candidate {:?}", + (candidate_hash, candidate.descriptor.para_id), + ); + ctx.send_message(AvailabilityRecoveryMessage::RecoverAvailableData( candidate.clone(), session_index, @@ -1208,10 +1322,21 @@ async fn launch_approval( Err(_) => return, Ok(Ok(a)) => a, Ok(Err(RecoveryError::Unavailable)) => { + tracing::warn!( + target: LOG_TARGET, + "Data unavailable for candidate {:?}", + (candidate_hash, candidate.descriptor.para_id), + ); // do nothing. we'll just be a no-show and that'll cause others to rise up. return; } Ok(Err(RecoveryError::Invalid)) => { + tracing::warn!( + target: LOG_TARGET, + "Data recovery invalid for candidate {:?}", + (candidate_hash, candidate.descriptor.para_id), + ); + // TODO: dispute. Either the merkle trie is bad or the erasure root is. 
// https://github.com/paritytech/polkadot/issues/2176 return; @@ -1238,6 +1363,7 @@ async fn launch_approval( let (val_tx, val_rx) = oneshot::channel(); + let para_id = candidate.descriptor.para_id; let _ = background_tx.send(BackgroundRequest::CandidateValidation( available_data.validation_data, validation_code, @@ -1252,6 +1378,12 @@ async fn launch_approval( // Validation checked out. Issue an approval command. If the underlying service is unreachable, // then there isn't anything we can do. + tracing::debug!( + target: LOG_TARGET, + "Candidate Valid {:?}", + (candidate_hash, para_id), + ); + let _ = background_tx.send(BackgroundRequest::ApprovalVote(ApprovalVoteRequest { validator_index, block_hash, @@ -1259,6 +1391,12 @@ async fn launch_approval( })).await; } Ok(Ok(ValidationResult::Invalid(_))) => { + tracing::warn!( + target: LOG_TARGET, + "Detected invalid candidate as an approval checker {:?}", + (candidate_hash, para_id), + ); + // TODO: issue dispute, but not for timeouts. // https://github.com/paritytech/polkadot/issues/2176 } @@ -1358,6 +1496,12 @@ async fn issue_approval( } }; + tracing::debug!( + target: LOG_TARGET, + "Issuing approval vote for candidate {:?}", + candidate_hash, + ); + let actions = import_checked_approval( state, Some((block_hash, block_entry)), diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index 7c8c9f3d94fe388397605ed6c808d488e7d8e50a..7afe0a1a27f4596125e6976e1c017fdbfcd65627 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -37,7 +37,7 @@ fn slot_to_tick(t: impl Into<Slot>) -> crate::time::Tick { crate::time::slot_number_to_tick(SLOT_DURATION_MILLIS, t.into()) } -#[derive(Default)] +#[derive(Default, Clone)] struct MockClock { inner: Arc<Mutex<MockClockInner>>, } @@ -1209,7 +1209,7 @@ fn assignment_not_triggered_if_at_maximum_but_clock_is_before_with_drift() { } #[test] -fn wakeups_drain() { +fn wakeups_next() { let mut wakeups = Wakeups::default(); let b_a = Hash::repeat_byte(0); @@ -1224,12 +1224,24 @@ fn wakeups_drain() { assert_eq!(wakeups.first().unwrap(), 1); - assert_eq!( - wakeups.drain(..=3).collect::<Vec<_>>(), - vec![(b_a, c_a), (b_b, c_b)], - ); + let clock = MockClock::new(0); + let clock_aux = clock.clone(); + + let test_fut = Box::pin(async move { + assert_eq!(wakeups.next(&clock).await, (b_a, c_a)); + assert_eq!(wakeups.next(&clock).await, (b_b, c_b)); + assert_eq!(wakeups.next(&clock).await, (b_a, c_b)); + assert!(wakeups.first().is_none()); + assert!(wakeups.wakeups.is_empty()); + }); + + let aux_fut = Box::pin(async move { + clock_aux.inner.lock().set_tick(1); + // skip direct set to 3. 
+ clock_aux.inner.lock().set_tick(4); + }); - assert_eq!(wakeups.first().unwrap(), 4); + futures::executor::block_on(futures::future::join(test_fut, aux_fut)); } #[test] @@ -1243,14 +1255,20 @@ fn wakeup_earlier_supersedes_later() { wakeups.schedule(b_a, c_a, 2); wakeups.schedule(b_a, c_a, 3); - assert_eq!(wakeups.first().unwrap(), 2); + let clock = MockClock::new(0); + let clock_aux = clock.clone(); - assert_eq!( - wakeups.drain(..=2).collect::<Vec<_>>(), - vec![(b_a, c_a)], - ); + let test_fut = Box::pin(async move { + assert_eq!(wakeups.next(&clock).await, (b_a, c_a)); + assert!(wakeups.first().is_none()); + assert!(wakeups.reverse_wakeups.is_empty()); + }); + + let aux_fut = Box::pin(async move { + clock_aux.inner.lock().set_tick(2); + }); - assert!(wakeups.first().is_none()); + futures::executor::block_on(futures::future::join(test_fut, aux_fut)); } #[test] @@ -1517,7 +1535,7 @@ fn approved_ancestor_all_approved() { ); }); - futures::executor::block_on(futures::future::select(test_fut, aux_fut)); + futures::executor::block_on(futures::future::join(test_fut, aux_fut)); } #[test] @@ -1599,7 +1617,7 @@ fn approved_ancestor_missing_approval() { ); }); - futures::executor::block_on(futures::future::select(test_fut, aux_fut)); + futures::executor::block_on(futures::future::join(test_fut, aux_fut)); } #[test] diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index ded52bb9448abc385ffbf4befc95bb84981eb6c7..c22e702effbf08fe36b076d144d260c99536adab 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -1104,6 +1104,13 @@ fn store_available_data( write_available_data(&mut tx, &candidate_hash, &available_data); subsystem.db.write(tx)?; + + tracing::debug!( + target: LOG_TARGET, + "Stored data and chunks for candidate={}", + candidate_hash, + ); + Ok(()) } diff --git a/polkadot/node/core/bitfield-signing/src/lib.rs b/polkadot/node/core/bitfield-signing/src/lib.rs index bd01d66cb47679ca66b0423679f7ba4dcaf20ff2..91acd69dfa48839681b66954decd4a686343ca6e 100644 --- a/polkadot/node/core/bitfield-signing/src/lib.rs +++ b/polkadot/node/core/bitfield-signing/src/lib.rs @@ -71,13 +71,12 @@ pub enum Error { /// for whether we have the availability chunk for our validator index. #[tracing::instrument(level = "trace", skip(sender, span), fields(subsystem = LOG_TARGET))] async fn get_core_availability( - relay_parent: Hash, - core: CoreState, + core: &CoreState, validator_idx: ValidatorIndex, sender: &Mutex<&mut mpsc::Sender<FromJobCommand>>, span: &jaeger::Span, ) -> Result<bool, Error> { - if let CoreState::Occupied(core) = core { + if let &CoreState::Occupied(ref core) = core { let _span = span.child("query-chunk-availability"); let (tx, rx) = oneshot::channel(); @@ -152,10 +151,17 @@ async fn construct_availability_bitfield( // Handle all cores concurrently // `try_join_all` returns all results in the same order as the input futures. 
let results = future::try_join_all( - availability_cores.into_iter() - .map(|core| get_core_availability(relay_parent, core, validator_idx, &sender, span)), + availability_cores.iter() + .map(|core| get_core_availability(core, validator_idx, &sender, span)), ).await?; + tracing::debug!( + target: LOG_TARGET, + "Signing Bitfield for {} cores: {:?}", + availability_cores.len(), + results, + ); + Ok(AvailabilityBitfield(FromIterator::from_iter(results))) } diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index da7f040699613f89f97fb331dda6a2b6dd1033b0..2105df853eab7867d6b054ac652e65c634957ef5 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -377,7 +377,7 @@ async fn select_candidates( } } } - _ => continue, + CoreState::Free => continue, }; let validation_data = match request_persisted_validation_data( @@ -401,7 +401,16 @@ async fn select_candidates( descriptor.para_id == scheduled_core.para_id && descriptor.persisted_validation_data_hash == computed_validation_data_hash }) { - selected_candidates.push(candidate.hash()); + let candidate_hash = candidate.hash(); + tracing::trace!( + target: LOG_TARGET, + "Selecting candidate {}. para_id={} core={}", + candidate_hash, + candidate.descriptor.para_id, + core_idx, + ); + + selected_candidates.push(candidate_hash); } } @@ -444,6 +453,13 @@ async fn select_candidates( true }); + tracing::debug!( + target: LOG_TARGET, + "Selected {} candidates for {} cores", + candidates.len(), + availability_cores.len(), + ); + Ok(candidates) } diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index 1c53b74d266293f7e14cdcfaad6fe5bac469f31e..42de2eceeab6bb275c88aa2285638670ea6faddf 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -70,6 +70,14 @@ struct State { blocks_by_number: BTreeMap<BlockNumber, Vec<Hash>>, blocks: HashMap<Hash, BlockEntry>, + /// Our view updates to our peers can race with `NewBlocks` updates. We store messages received + /// against the directly mentioned blocks in our view in this map until `NewBlocks` is received. + /// + /// As long as the parent is already in the `blocks` map and `NewBlocks` messages aren't delayed + /// by more than a block length, this strategy will work well for mitigating the race. This is + /// also a race that typically occurs on local networks.
+ pending_known: HashMap<Hash, Vec<(PeerId, PendingMessage)>>, + /// Peer view data is partially stored here, and partially inline within the [`BlockEntry`]s peer_views: HashMap<PeerId, View>, } @@ -129,6 +137,11 @@ impl MessageSource { } } +enum PendingMessage { + Assignment(IndirectAssignmentCert, CandidateIndex), + Approval(IndirectSignedApprovalVote), +} + impl State { async fn handle_network_msg( &mut self, @@ -150,8 +163,14 @@ impl State { NetworkBridgeEvent::PeerViewChange(peer_id, view) => { self.handle_peer_view_change(ctx, peer_id, view).await; } - NetworkBridgeEvent::OurViewChange(_view) => { - // handled by `BlockFinalized` notification + NetworkBridgeEvent::OurViewChange(view) => { + for head in &view.heads { + if !self.blocks.contains_key(head) { + self.pending_known.entry(*head).or_default(); + } + } + + self.pending_known.retain(|h, _| view.contains(h)); } NetworkBridgeEvent::PeerMessage(peer_id, msg) => { self.process_incoming_peer_message(ctx, metrics, peer_id, msg).await; } @@ -162,10 +181,11 @@ async fn handle_new_blocks( &mut self, ctx: &mut impl SubsystemContext<Message = ApprovalDistributionMessage>, + metrics: &Metrics, metas: Vec<BlockApprovalMeta>, ) { let mut new_hashes = HashSet::new(); - for meta in metas.into_iter() { + for meta in &metas { match self.blocks.entry(meta.hash.clone()) { hash_map::Entry::Vacant(entry) => { let candidates_count = meta.candidates.len(); @@ -185,6 +205,47 @@ impl State { } self.blocks_by_number.entry(meta.number).or_default().push(meta.hash); } + + tracing::debug!( + target: LOG_TARGET, + "Got new blocks {:?}", + metas.iter().map(|m| (m.hash, m.number)).collect::<Vec<_>>(), + ); + + { + let pending_now_known = self.pending_known.keys() + .filter(|k| self.blocks.contains_key(k)) + .copied() + .collect::<Vec<_>>(); + + let to_import = pending_now_known.into_iter() + .filter_map(|k| self.pending_known.remove(&k)) + .flatten() + .collect::<Vec<_>>(); + + for (peer_id, message) in to_import { + match message { + PendingMessage::Assignment(assignment, claimed_index) => { + self.import_and_circulate_assignment( + ctx, + metrics, + MessageSource::Peer(peer_id), + assignment, + claimed_index, + ).await; + } + PendingMessage::Approval(approval_vote) => { + self.import_and_circulate_approval( + ctx, + metrics, + MessageSource::Peer(peer_id), + approval_vote, + ).await; + } + } + } + } + for (peer_id, view) in self.peer_views.iter() { let intersection = view.iter().filter(|h| new_hashes.contains(h)); let view_intersection = View::new( @@ -216,6 +277,15 @@ impl State { "Processing assignments from a peer", ); for (assignment, claimed_index) in assignments.into_iter() { + if let Some(pending) = self.pending_known.get_mut(&assignment.block_hash) { + pending.push(( + peer_id.clone(), + PendingMessage::Assignment(assignment, claimed_index), + )); + + continue; + } + self.import_and_circulate_assignment( ctx, metrics, @@ -233,6 +303,15 @@ impl State { "Processing approvals from a peer", ); for approval_vote in approvals.into_iter() { + if let Some(pending) = self.pending_known.get_mut(&approval_vote.block_hash) { + pending.push(( + peer_id.clone(), + PendingMessage::Approval(approval_vote), + )); + + continue; + } + self.import_and_circulate_approval( ctx, metrics, @@ -446,6 +525,14 @@ impl State { } if !peers.is_empty() { + tracing::trace!( + target: LOG_TARGET, + "Sending assignment (block={}, index={}) to {} peers", + block_hash, + claimed_candidate_index, + peers.len(), + ); + 
ctx.send_message(NetworkBridgeMessage::SendValidationMessage( peers, protocol_v1::ValidationProtocol::ApprovalDistribution( @@ -616,6 +703,14 @@ impl State { let approvals = vec![vote]; if !peers.is_empty() { + tracing::trace!( + target: LOG_TARGET, + "Sending approval (block={}, index={}) to {} peers", + block_hash, + candidate_index, + peers.len(), + ); + ctx.send_message(NetworkBridgeMessage::SendValidationMessage( peers, protocol_v1::ValidationProtocol::ApprovalDistribution( @@ -681,6 +776,14 @@ impl State { Some(entry) => entry, None => continue, // should be unreachable }; + + tracing::trace!( + target: LOG_TARGET, + "Sending all assignments and approvals in block {} to peer {}", + block, + peer_id, + ); + for (candidate_index, candidate_entry) in entry.candidates.iter().enumerate() { let candidate_index = candidate_index as u32; for (validator_index, approval_state) in candidate_entry.approvals.iter() { @@ -785,12 +888,18 @@ impl ApprovalDistribution { msg: ApprovalDistributionMessage::NewBlocks(metas), } => { tracing::debug!(target: LOG_TARGET, "Processing NewBlocks"); - state.handle_new_blocks(&mut ctx, metas).await; + state.handle_new_blocks(&mut ctx, &self.metrics, metas).await; } FromOverseer::Communication { msg: ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index), } => { - tracing::debug!(target: LOG_TARGET, "Processing DistributeAssignment"); + tracing::debug!( + target: LOG_TARGET, + "Distributing our assignment on candidate (block={}, index={})", + cert.block_hash, + candidate_index, + ); + state.import_and_circulate_assignment( &mut ctx, &self.metrics, @@ -802,7 +911,13 @@ impl ApprovalDistribution { FromOverseer::Communication { msg: ApprovalDistributionMessage::DistributeApproval(vote), } => { - tracing::debug!(target: LOG_TARGET, "Processing DistributeApproval"); + tracing::debug!( + target: LOG_TARGET, + "Distributing our approval vote on candidate (block={}, index={})", + vote.block_hash, + vote.candidate_index, + ); + state.import_and_circulate_approval( &mut ctx, &self.metrics, diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index db57f18643d40a232b2a41f92784009ab6b86693..b1d7235b8dc71a65c75712992fa197a6f3cc023b 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -846,6 +846,15 @@ async fn handle_network_update( // message. let chunk = query_chunk(ctx, candidate_hash, validator_index).await?; + tracing::trace!( + target: LOG_TARGET, + "Responding({}) to chunk request req_id={} candidate={} index={}", + chunk.is_some(), + request_id, + candidate_hash, + validator_index, + ); + // Whatever the result, issue an // AvailabilityRecoveryV1Message::Chunk(r_id, response) message. let wire_message = protocol_v1::AvailabilityRecoveryMessage::Chunk( @@ -867,6 +876,15 @@ async fn handle_network_update( report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await; } Some((peer_id, Awaited::Chunk(awaited_chunk))) if peer_id == peer => { + tracing::trace!( + target: LOG_TARGET, + "Received chunk response({}) req_id={} candidate={} index={}", + chunk.is_some(), + request_id, + awaited_chunk.candidate_hash, + awaited_chunk.validator_index, + ); + // If there exists an entry under r_id, remove it. // Send the chunk response on the awaited_chunk for the interaction to handle. if let Some(chunk) = chunk { @@ -897,6 +915,14 @@ async fn handle_network_update( // message. 
let full_data = query_full_data(ctx, candidate_hash).await?; + tracing::trace!( + target: LOG_TARGET, + "Responding({}) to full data request req_id={} candidate={}", + full_data.is_some(), + request_id, + candidate_hash, + ); + // Whatever the result, issue an // AvailabilityRecoveryV1Message::FullData(r_id, response) message. let wire_message = protocol_v1::AvailabilityRecoveryMessage::FullData( @@ -918,6 +944,14 @@ async fn handle_network_update( report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await; } Some((peer_id, Awaited::FullData(awaited))) if peer_id == peer => { + tracing::trace!( + target: LOG_TARGET, + "Received full data response({}) req_id={} candidate={}", + data.is_some(), + request_id, + awaited.candidate_hash, + ); + // If there exists an entry under r_id, remove it. // Send the response on the awaited for the interaction to handle. if let Some(data) = data { @@ -962,17 +996,41 @@ async fn issue_request( state.next_request_id += 1; let wire_message = match awaited { - Awaited::Chunk(ref awaited_chunk) => protocol_v1::AvailabilityRecoveryMessage::RequestChunk( - request_id, - awaited_chunk.candidate_hash, - awaited_chunk.validator_index, - ), - Awaited::FullData(ref awaited_data) => protocol_v1::AvailabilityRecoveryMessage::RequestFullData( - request_id, - awaited_data.candidate_hash, - ), + Awaited::Chunk(ref awaited_chunk) => { + tracing::trace!( + target: LOG_TARGET, + "Requesting chunk req_id={} peer_id={} candidate={} index={}", + request_id, + peer_id, + awaited_chunk.candidate_hash, + awaited_chunk.validator_index, + ); + + protocol_v1::AvailabilityRecoveryMessage::RequestChunk( + request_id, + awaited_chunk.candidate_hash, + awaited_chunk.validator_index, + ) + } + Awaited::FullData(ref awaited_data) => { + tracing::trace!( + target: LOG_TARGET, + "Requesting full data req_id={} peer_id={} candidate={} index={}", + request_id, + peer_id, + awaited_data.candidate_hash, + awaited_data.validator_index, + ); + + protocol_v1::AvailabilityRecoveryMessage::RequestFullData( + request_id, + awaited_data.candidate_hash, + ) + } }; + + ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage( vec![peer_id.clone()], diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index e1dfa8fb5f9d8c924444579e97bce797ac6707cc..794721a70beec8f31969d753955ebc8ffc159eae 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -31,6 +31,7 @@ use polkadot_subsystem::messages::{ NetworkBridgeMessage, AllMessages, AvailabilityDistributionMessage, BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage, CollatorProtocolMessage, ApprovalDistributionMessage, NetworkBridgeEvent, + AvailabilityRecoveryMessage, }; use polkadot_primitives::v1::{Hash, BlockNumber}; use polkadot_node_network_protocol::{ @@ -565,7 +566,7 @@ async fn dispatch_validation_events_to_all<I>( I::IntoIter: Send, { let messages_for = |event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>| { - let a = std::iter::once(event.focus().ok().map(|m| AllMessages::AvailabilityDistribution( + let av_d = std::iter::once(event.focus().ok().map(|m| AllMessages::AvailabilityDistribution( AvailabilityDistributionMessage::NetworkBridgeUpdateV1(m) ))); @@ -585,7 +586,11 @@ async fn dispatch_validation_events_to_all<I>( ApprovalDistributionMessage::NetworkBridgeUpdateV1(m) ))); - a.chain(b).chain(p).chain(s).chain(ap).filter_map(|x| x) + let av_r = std::iter::once(event.focus().ok().map(|m| 
AllMessages::AvailabilityRecovery( + AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(m) + ))); + + av_d.chain(b).chain(p).chain(s).chain(ap).chain(av_r).filter_map(|x| x) }; ctx.send_messages(events.into_iter().flat_map(messages_for)).await @@ -847,6 +852,13 @@ mod tests { ApprovalDistributionMessage::NetworkBridgeUpdateV1(e) ) if e == event.focus().expect("could not focus message") ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::AvailabilityRecovery( + AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(e) + ) if e == event.focus().expect("could not focus message") + ); } async fn assert_sends_collation_event_to_all( diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs index 107e3079f36d64e7901016bd11e608586faf3d03..689fc60d09652bb43a44c1a067653bc43cae5ed2 100644 --- a/polkadot/node/network/protocol/src/lib.rs +++ b/polkadot/node/network/protocol/src/lib.rs @@ -481,6 +481,7 @@ pub mod v1 { impl_try_from!(ValidationProtocol, PoVDistribution, PoVDistributionMessage); impl_try_from!(ValidationProtocol, StatementDistribution, StatementDistributionMessage); impl_try_from!(ValidationProtocol, ApprovalDistribution, ApprovalDistributionMessage); + impl_try_from!(ValidationProtocol, AvailabilityRecovery, AvailabilityRecoveryMessage); /// All network messages on the collation peer-set. #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] diff --git a/polkadot/node/service/src/chain_spec.rs b/polkadot/node/service/src/chain_spec.rs index b099cd5d369b95088d1872dcfbfe66a64633b3fd..58edd063515209656ef3e4357e8cc9007b24f373 100644 --- a/polkadot/node/service/src/chain_spec.rs +++ b/polkadot/node/service/src/chain_spec.rs @@ -892,6 +892,7 @@ fn rococo_staging_testnet_config_genesis(wasm_binary: &[u8]) -> rococo_runtime:: hrmp_max_parachain_outbound_channels: 4, hrmp_max_parathread_outbound_channels: 4, hrmp_max_message_num_per_candidate: 5, + dispute_period: 6, no_show_slots: 2, n_delay_tranches: 25, needed_approvals: 2, @@ -1402,6 +1403,7 @@ pub fn rococo_testnet_genesis( hrmp_max_parachain_outbound_channels: 4, hrmp_max_parathread_outbound_channels: 4, hrmp_max_message_num_per_candidate: 5, + dispute_period: 6, no_show_slots: 2, n_delay_tranches: 25, needed_approvals: 2, diff --git a/polkadot/node/service/src/grandpa_support.rs b/polkadot/node/service/src/grandpa_support.rs index 6468565a7b3e568664c77d577ef0f4f7b99c5fff..fd685802690971d390435c9e324527c697a74844 100644 --- a/polkadot/node/service/src/grandpa_support.rs +++ b/polkadot/node/service/src/grandpa_support.rs @@ -79,7 +79,7 @@ impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingDiagnostic &self, backend: Arc<B>, base: &PolkadotHeader, - _best_target: &PolkadotHeader, + best_target: &PolkadotHeader, current_target: &PolkadotHeader, ) -> grandpa::VotingRuleResult<PolkadotBlock> { // always wait 50 blocks behind the head to finalize. @@ -109,8 +109,16 @@ impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingDiagnostic } }; + // delay blocks behind the head, but make sure we're not ahead of the current + // target. 
+ let target_number = std::cmp::min( + best_target.number().saturating_sub(DIAGNOSTIC_GRANDPA_DELAY), + current_target.number().clone(), + ); + + // don't go below base let target_number = std::cmp::max( - current_target.number().saturating_sub(DIAGNOSTIC_GRANDPA_DELAY), + target_number, base.number().clone(), ); @@ -123,8 +131,8 @@ impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingDiagnostic let mut overseer = self.overseer.clone(); let checking_lag = self.checking_lag.clone(); - let current_hash = current_target.hash(); - let current_number = current_target.number.clone(); + let best_hash = best_target.hash(); + let best_number = best_target.number.clone(); let base_number = base.number; @@ -132,7 +140,7 @@ impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingDiagnostic let (tx, rx) = oneshot::channel(); let approval_checking_subsystem_vote = { overseer.send_msg(ApprovalVotingMessage::ApprovedAncestor( - current_hash, + best_hash, base_number, tx, )).await; @@ -141,14 +149,21 @@ impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingDiagnostic }; let approval_checking_subsystem_lag = approval_checking_subsystem_vote.map_or( - current_number - base_number, - |(_h, n)| current_number - n, + best_number - base_number, + |(_h, n)| best_number - n, ); if let Some(ref checking_lag) = checking_lag { checking_lag.observe(approval_checking_subsystem_lag as _); } + tracing::debug!( + target: "approval_voting", + "GRANDPA: voting on {:?}. Approval-checking lag behind best is {}", + actual_vote_target, + approval_checking_subsystem_lag, + ); + actual_vote_target }) } diff --git a/polkadot/roadmap/implementers-guide/src/node/approval/approval-distribution.md b/polkadot/roadmap/implementers-guide/src/node/approval/approval-distribution.md index 59f633e26aa37d9bb01201aa60c355ab3d6d1037..d735dd48cffc619865557a5609b446e5f87a597d 100644 --- a/polkadot/roadmap/implementers-guide/src/node/approval/approval-distribution.md +++ b/polkadot/roadmap/implementers-guide/src/node/approval/approval-distribution.md @@ -42,6 +42,11 @@ Output: ```rust type BlockScopedCandidate = (Hash, CandidateHash); +enum PendingMessage { + Assignment(IndirectAssignmentCert, CandidateIndex), + Approval(IndirectSignedApprovalVote), +} + /// The `State` struct is responsible for tracking the overall state of the subsystem. /// /// It tracks metadata about our view of the unfinalized chain, which assignments and approvals we have seen, and our peers' views. @@ -50,6 +55,14 @@ struct State { blocks_by_number: BTreeMap<BlockNumber, Vec<Hash>>, blocks: HashMap<Hash, BlockEntry>, + /// Our view updates to our peers can race with `NewBlocks` updates. We store messages received + /// against the directly mentioned blocks in our view in this map until `NewBlocks` is received. + /// + /// As long as the parent is already in the `blocks` map and `NewBlocks` messages aren't delayed + /// by more than a block length, this strategy will work well for mitigating the race. This is + /// also a race that typically occurs on local networks. + pending_known: HashMap<Hash, Vec<(PeerId, PendingMessage)>>, + // Peer view data is partially stored here, and partially inline within the `BlockEntry`s peer_views: HashMap<PeerId, View>, } @@ -102,6 +115,11 @@ Remove the view under the associated `PeerId` from `State::peer_views`. Iterate over every `BlockEntry` and remove `PeerId` from it. +#### `NetworkBridgeEvent::OurViewChange` + +Remove entries in `pending_known` for all hashes not present in the view. 
+Ensure a vector is present in `pending_known` for each hash in the view that does not have an entry in `blocks`. + #### `NetworkBridgeEvent::PeerViewChange` Invoke `unify_with_peer(peer, view)` to catch them up to messages we have. @@ -116,6 +134,8 @@ From there, we can loop backwards from `constrain(view.finalized_number)` until #### `NetworkBridgeEvent::PeerMessage` +If the block hash referenced by the message exists in `pending_known`, add the message to the vector of pending messages and return. + If the message is of type `ApprovalDistributionV1Message::Assignment(assignment_cert, claimed_index)`, then call `import_and_circulate_assignment(MessageSource::Peer(sender), assignment_cert, claimed_index)` If the message is of type `ApprovalDistributionV1Message::Approval(approval_vote)`, then call `import_and_circulate_approval(MessageSource::Peer(sender), approval_vote)` @@ -126,6 +146,9 @@ If the message is of type `ApprovalDistributionV1Message::Approval(approval_vote Create `BlockEntry` and `CandidateEntries` for all blocks. +For all entries in `pending_known`: + * If there is now an entry under `blocks` for the block hash, drain all messages and import with `import_and_circulate_assignment` and `import_and_circulate_approval`. + For all peers: * Compute `view_intersection` as the intersection of the peer's view blocks with the hashes of the new blocks. * Invoke `unify_with_peer(peer, view_intersection)`. @@ -157,8 +180,8 @@ enum MessageSource { Imports an assignment cert referenced by block hash and candidate index. As a postcondition, if the cert is valid, it will have distributed the cert to all peers who have the block in their view, with the exclusion of the peer referenced by the `MessageSource`. We maintain a few invariants: - * we only send an assignment to a peer after we add its fingerpring to our knownledge - * we add a fingerprint of an assignment to our knownledge only if it's valid and hasn't been added before + * we only send an assignment to a peer after we add its fingerprint to our knowledge + * we add a fingerprint of an assignment to our knowledge only if it's valid and hasn't been added before The algorithm is the following: @@ -167,7 +190,7 @@ The algorithm is the following: * If the source is `MessageSource::Peer(sender)`: * check if `peer` appears under `known_by` and whether the fingerprint is in the `known_messages` of the peer. If the peer does not know the block, report for providing data out-of-view and proceed. If the peer does know the block and the knowledge contains the fingerprint, report for providing replicate data and return. * If the message fingerprint appears under the `BlockEntry`'s `Knowledge`, give the peer a small positive reputation boost, - add the fingerpring to the peer's knownledge only if it knows about the block and return. + add the fingerprint to the peer's knowledge only if it knows about the block and return. Note that we must do this after checking for out-of-view and if the peers knows about the block to avoid being spammed. If we did this check earlier, a peer could provide data out-of-view repeatedly and be rewarded for it. * Dispatch `ApprovalVotingMessage::CheckAndImportAssignment(assignment)` and wait for the response. @@ -194,7 +217,7 @@ Imports an approval signature referenced by block hash and candidate index: * If the source is `MessageSource::Peer(sender)`: * check if `peer` appears under `known_by` and whether the fingerprint is in the `known_messages` of the peer. 
If the peer does not know the block, report for providing data out-of-view and proceed. If the peer does know the block and the knowledge contains the fingerprint, report for providing replicate data and return. * If the message fingerprint appears under the `BlockEntry`'s `Knowledge`, give the peer a small positive reputation boost, - add the fingerpring to the peer's knownledge only if it knows about the block and return. + add the fingerprint to the peer's knowledge only if it knows about the block and return. Note that we must do this after checking for out-of-view to avoid being spammed. If we did this check earlier, a peer could provide data out-of-view repeatedly and be rewarded for it. * Dispatch `ApprovalVotingMessage::CheckAndImportApproval(approval)` and wait for the response. * If the result is `VoteCheckResult::Accepted(())`: diff --git a/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md b/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md index 0ff0db72dc40809bed6a3883449195410b347965..b2b9a243b2a716dcdedf926c29aa721f1541b318 100644 --- a/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md +++ b/polkadot/roadmap/implementers-guide/src/node/approval/approval-voting.md @@ -135,7 +135,10 @@ struct State { session_info: Vec<SessionInfo>, babe_epoch: Option<BabeEpoch>, // information about a cached BABE epoch. keystore: KeyStorePtr, - wakeups: BTreeMap<Tick, Vec<(Hash, Hash)>>, // Tick -> [(Relay Block, Candidate Hash)] + + // A scheduler which keeps at most one wakeup per (block hash, candidate hash) pair and + // maps such pairs to `Tick`s. + wakeups: Wakeups, // These are connected to each other. background_tx: mpsc::Sender<BackgroundRequest>, diff --git a/polkadot/roadmap/implementers-guide/src/runtime/inclusion.md b/polkadot/roadmap/implementers-guide/src/runtime/inclusion.md index a87bca1314e434854253e06387192ab7086955af..e5ea1abdb64fcdac3d0e8a8b157926618413f089 100644 --- a/polkadot/roadmap/implementers-guide/src/runtime/inclusion.md +++ b/polkadot/roadmap/implementers-guide/src/runtime/inclusion.md @@ -48,8 +48,8 @@ Validators: Vec<ValidatorId>; All failed checks should lead to an unrecoverable error making the block invalid. -* `process_bitfields(Bitfields, core_lookup: Fn(CoreIndex) -> Option<ParaId>)`: - 1. check that the number of bitfields and bits in each bitfield is correct. +* `process_bitfields(expected_bits, Bitfields, core_lookup: Fn(CoreIndex) -> Option<ParaId>)`: + 1. check that there is at most 1 bitfield per validator and that the number of bits in each bitfield is equal to `expected_bits`. 1. check that there are no duplicates 1. check all validator signatures. 1. apply each bit of bitfield to the corresponding pending candidate. looking up parathread cores using the `core_lookup`. Disregard bitfields that have a `1` bit for any free cores. diff --git a/polkadot/roadmap/implementers-guide/src/runtime/parainherent.md b/polkadot/roadmap/implementers-guide/src/runtime/parainherent.md index 847c1090c008de2c62e06bc58aec80e5cfd89dbb..ed4d6fe7d90b25b97e338cfddd197f3964c23c32 100644 --- a/polkadot/roadmap/implementers-guide/src/runtime/parainherent.md +++ b/polkadot/roadmap/implementers-guide/src/runtime/parainherent.md @@ -27,7 +27,7 @@ Included: Option<()>, 1. Invoke `Disputes::provide_multi_dispute_data`. 1. If `Disputes::is_frozen`, return and set `Included` to `Some(())`. 1. If there are any created disputes from the current session, invoke `Inclusion::collect_disputed` with the disputed candidates. 
Annotate each returned core with `FreedReason::Concluded`. - 1. The `Bitfields` are first forwarded to the `Inclusion::process_bitfields` routine, returning a set of freed cores. Provide a `Scheduler::core_para` as a core-lookup to the `process_bitfields` routine. Annotate each of these freed cores with `FreedReason::Concluded`. + 1. The `Bitfields` are first forwarded to the `Inclusion::process_bitfields` routine, returning a set of freed cores. Provide the number of availability cores (`Scheduler::availability_cores().len()`) as the expected number of bits and a `Scheduler::core_para` as a core-lookup to the `process_bitfields` routine. Annotate each of these freed cores with `FreedReason::Concluded`. 1. For each freed candidate from the `Inclusion::process_bitfields` call, invoke `Disputes::note_included(current_session, candidate)`. 1. If `Scheduler::availability_timeout_predicate` is `Some`, invoke `Inclusion::collect_pending` using it and annotate each of those freed cores with `FreedReason::TimedOut`. 1. Combine and sort the dispute-freed cores, the bitfield-freed cores, and the timed-out cores. diff --git a/polkadot/runtime/kusama/src/lib.rs b/polkadot/runtime/kusama/src/lib.rs index b0ecbf1a135a9129ad5f54cf8536425cc87be4b8..639d6a9a9dda5ea832b73352308b4dd9c5decc20 100644 --- a/polkadot/runtime/kusama/src/lib.rs +++ b/polkadot/runtime/kusama/src/lib.rs @@ -1240,7 +1240,7 @@ sp_api::impl_runtime_apis! { c: PRIMARY_PROBABILITY, genesis_authorities: Babe::authorities(), randomness: Babe::randomness(), - allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryPlainSlots, + allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryVRFSlots, } } diff --git a/polkadot/runtime/parachains/src/inclusion.rs b/polkadot/runtime/parachains/src/inclusion.rs index 267cf272197cccbbef46abab2936ce651da9abb1..efe7ab35abeab1abccc7c0cc529d0db3c48c58bd 100644 --- a/polkadot/runtime/parachains/src/inclusion.rs +++ b/polkadot/runtime/parachains/src/inclusion.rs @@ -238,17 +238,14 @@ impl<T: Config> Module<T> { /// Process a set of incoming bitfields. Return a vec of cores freed by candidates /// becoming available. pub(crate) fn process_bitfields( + expected_bits: usize, signed_bitfields: SignedAvailabilityBitfields, core_lookup: impl Fn(CoreIndex) -> Option<ParaId>, ) -> Result<Vec<CoreIndex>, DispatchError> { let validators = Validators::get(); let session_index = shared::Module::<T>::session_index(); - let config = <configuration::Module<T>>::config(); - let parachains = <paras::Module<T>>::parachains(); - - let n_bits = parachains.len() + config.parathread_cores as usize; - let mut assigned_paras_record: Vec<_> = (0..n_bits) + let mut assigned_paras_record: Vec<_> = (0..expected_bits) .map(|bit_index| core_lookup(CoreIndex::from(bit_index as u32))) .map(|core_para| core_para.map(|p| (p, PendingAvailability::<T>::get(&p)))) .collect(); @@ -256,7 +253,7 @@ impl<T: Config> Module<T> { // do sanity checks on the bitfields: // 1. no more than one bitfield per validator // 2. bitfields are ascending by validator index. - // 3. each bitfield has exactly `n_bits` + // 3. each bitfield has exactly `expected_bits` // 4. signature is valid. 
{ let occupied_bitmask: BitVec<BitOrderLsb0, u8> = assigned_paras_record.iter() @@ -274,7 +271,7 @@ for signed_bitfield in &signed_bitfields { ensure!( - signed_bitfield.payload().0.len() == n_bits, + signed_bitfield.payload().0.len() == expected_bits, Error::<T>::WrongBitfieldSize, ); @@ -336,7 +333,7 @@ let threshold = availability_threshold(validators.len()); - let mut freed_cores = Vec::with_capacity(n_bits); + let mut freed_cores = Vec::with_capacity(expected_bits); for (para_id, pending_availability) in assigned_paras_record.into_iter() .filter_map(|x| x) .filter_map(|(id, p)| p.map(|p| (id, p))) @@ -1060,10 +1057,12 @@ mod tests { } } - fn default_bitfield() -> AvailabilityBitfield { - let n_bits = Paras::parachains().len() + Configuration::config().parathread_cores as usize; + fn expected_bits() -> usize { + Paras::parachains().len() + Configuration::config().parathread_cores as usize + } - AvailabilityBitfield(bitvec::bitvec![BitOrderLsb0, u8; 0; n_bits]) + fn default_bitfield() -> AvailabilityBitfield { + AvailabilityBitfield(bitvec::bitvec![BitOrderLsb0, u8; 0; expected_bits()]) } fn default_availability_votes() -> BitVec<BitOrderLsb0, u8> { @@ -1228,7 +1227,8 @@ mod tests { core if core == CoreIndex::from(0) => Some(chain_a), core if core == CoreIndex::from(1) => Some(chain_b), core if core == CoreIndex::from(2) => Some(thread_a), - _ => panic!("Core out of bounds for 2 parachains and 1 parathread core."), + core if core == CoreIndex::from(3) => None, // for the expected_bits() + 1 test below. + _ => panic!("out of bounds for testing"), }; // wrong number of bits. @@ -1244,6 +1244,25 @@ )); assert!(Inclusion::process_bitfields( + expected_bits(), + vec![signed], + &core_lookup, + ).is_err()); + } + + // wrong number of bits: other way around. + { + let bare_bitfield = default_bitfield(); + let signed = block_on(sign_bitfield( + &keystore, + &validators[0], + 0, + bare_bitfield, + &signing_context, + )); + + assert!(Inclusion::process_bitfields( + expected_bits() + 1, vec![signed], &core_lookup, ).is_err()); } @@ -1261,6 +1280,7 @@ )); assert!(Inclusion::process_bitfields( + expected_bits(), vec![signed.clone(), signed], &core_lookup, ).is_err()); @@ -1286,6 +1306,7 @@ )); assert!(Inclusion::process_bitfields( + expected_bits(), vec![signed_1, signed_0], &core_lookup, ).is_err()); @@ -1304,6 +1325,7 @@ )); assert!(Inclusion::process_bitfields( + expected_bits(), vec![signed], &core_lookup, ).is_err()); @@ -1321,6 +1343,7 @@ )); assert!(Inclusion::process_bitfields( + expected_bits(), vec![signed], &core_lookup, ).is_ok()); @@ -1355,6 +1378,7 @@ )); assert!(Inclusion::process_bitfields( + expected_bits(), vec![signed], &core_lookup, ).is_ok()); @@ -1393,6 +1417,7 @@ // no core is freed assert_eq!( Inclusion::process_bitfields( + expected_bits(), vec![signed], &core_lookup, ), @@ -1516,6 +1541,7 @@ }).collect(); assert!(Inclusion::process_bitfields( + expected_bits(), signed_bitfields, &core_lookup, ).is_ok()); diff --git a/polkadot/runtime/parachains/src/inclusion_inherent.rs b/polkadot/runtime/parachains/src/inclusion_inherent.rs index 56516f8f76fdc7ff80c7ab895057ff6bff0bb258..e2ed15c42b2dccc2abcd4808ccc9c6eb99bcf856 100644 --- a/polkadot/runtime/parachains/src/inclusion_inherent.rs +++ b/polkadot/runtime/parachains/src/inclusion_inherent.rs @@ -107,7 +107,9 @@ decl_module!
{ // Process new availability bitfields, yielding any availability cores whose // work has now concluded. + let expected_bits = <scheduler::Module<T>>::availability_cores().len(); let freed_concluded = <inclusion::Module<T>>::process_bitfields( + expected_bits, signed_bitfields, <scheduler::Module<T>>::core_para, )?; diff --git a/polkadot/runtime/parachains/src/initializer.rs b/polkadot/runtime/parachains/src/initializer.rs index 8e437dc7c334c757395f74a6245f7187379e2e36..e0bd6205102082dd819bc7f767d3ef910fea901a 100644 --- a/polkadot/runtime/parachains/src/initializer.rs +++ b/polkadot/runtime/parachains/src/initializer.rs @@ -229,11 +229,17 @@ impl<T: Config> Module<T> { validators.clone() }; - BufferedSessionChanges::mutate(|v| v.push(BufferedSessionChange { - validators, - queued, - session_index, - })); + if session_index == 0 { + // Genesis session should be immediately enacted. + Self::apply_new_session(0, validators, queued); + } else { + BufferedSessionChanges::mutate(|v| v.push(BufferedSessionChange { + validators, + queued, + session_index, + })); + } + } } @@ -244,10 +250,10 @@ impl<T: Config> sp_runtime::BoundToRuntimeAppPublic for Module<T> { impl<T: pallet_session::Config + Config> OneSessionHandler<T::AccountId> for Module<T> { type Key = ValidatorId; - fn on_genesis_session<'a, I: 'a>(_validators: I) + fn on_genesis_session<'a, I: 'a>(validators: I) where I: Iterator<Item=(&'a T::AccountId, Self::Key)> { - + <Module<T>>::on_new_session(false, 0, validators, None); } fn on_new_session<'a, I: 'a>(changed: bool, validators: I, queued: I) @@ -266,7 +272,7 @@ mod tests { use primitives::v1::{Id as ParaId}; use crate::mock::{ new_test_ext, - Initializer, System, Dmp, Paras, Configuration, MockGenesisConfig, + Initializer, System, Dmp, Paras, Configuration, SessionInfo, MockGenesisConfig, }; use frame_support::{ @@ -274,6 +280,24 @@ mod tests { traits::{OnFinalize, OnInitialize}, }; + #[test] + fn session_0_is_instantly_applied() { + new_test_ext(Default::default()).execute_with(|| { + Initializer::on_new_session( + false, + 0, + Vec::new().into_iter(), + Some(Vec::new().into_iter()), + ); + + let v = <BufferedSessionChanges>::get(); + assert!(v.is_empty()); + + assert_eq!(SessionInfo::earliest_stored_session(), 0); + assert!(SessionInfo::session_info(0).is_some()); + }); + } + #[test] fn session_change_before_initialize_is_still_buffered_after() { new_test_ext(Default::default()).execute_with(|| { diff --git a/polkadot/runtime/rococo/src/lib.rs b/polkadot/runtime/rococo/src/lib.rs index b04cd72609e942dbfc5a19c8d1b69611adc5118a..547e08e8381bff88d64f6a3fdf899210b47ad582 100644 --- a/polkadot/runtime/rococo/src/lib.rs +++ b/polkadot/runtime/rococo/src/lib.rs @@ -807,7 +807,7 @@ sp_api::impl_runtime_apis! 
{ c: PRIMARY_PROBABILITY, genesis_authorities: Babe::authorities(), randomness: Babe::randomness(), - allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryPlainSlots, + allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryVRFSlots, } } diff --git a/polkadot/runtime/test-runtime/src/lib.rs b/polkadot/runtime/test-runtime/src/lib.rs index c4a16bbc65824d9a3edcf81078213dbdd129612a..edb9ccbc0e6971b2820d2ae5e8b05845f1dc91db 100644 --- a/polkadot/runtime/test-runtime/src/lib.rs +++ b/polkadot/runtime/test-runtime/src/lib.rs @@ -467,7 +467,7 @@ impl parachains_inclusion::Config for Runtime { impl parachains_inclusion_inherent::Config for Runtime {} impl parachains_initializer::Config for Runtime { - type Randomness = RandomnessCollectiveFlip; + type Randomness = Babe; } impl parachains_session_info::Config for Runtime {} @@ -742,7 +742,7 @@ sp_api::impl_runtime_apis! { c: PRIMARY_PROBABILITY, genesis_authorities: Babe::authorities(), randomness: Babe::randomness(), - allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryPlainSlots, + allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryVRFSlots, } } diff --git a/polkadot/runtime/westend/src/lib.rs b/polkadot/runtime/westend/src/lib.rs index a6aaeff38ac077d5aca51430f64b3fbf5ff70922..c6f43a598aa6331de1b42a0660d7ddac55e43bbc 100644 --- a/polkadot/runtime/westend/src/lib.rs +++ b/polkadot/runtime/westend/src/lib.rs @@ -955,7 +955,7 @@ sp_api::impl_runtime_apis! { c: PRIMARY_PROBABILITY, genesis_authorities: Babe::authorities(), randomness: Babe::randomness(), - allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryPlainSlots, + allowed_slots: babe_primitives::AllowedSlots::PrimaryAndSecondaryVRFSlots, } }
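
For reviewers, a minimal, self-contained sketch of the `Wakeups::next` pattern this diff introduces in approval-voting: instead of draining every wakeup up to the current tick, the loop awaits the earliest scheduled tick and yields a single (block, candidate) pair per call. The `Id` alias and the immediately-resolving `wait_until` stub are stand-ins for the subsystem's real `Hash`/`CandidateHash` types and its `Clock` trait; only the `futures` crate (0.3) is assumed.

```rust
use std::collections::BTreeMap;

type Tick = u64;
type Id = u64; // stand-in for the real Hash / CandidateHash types

#[derive(Default)]
struct Wakeups {
	// tick -> wakeups scheduled at that tick
	wakeups: BTreeMap<Tick, Vec<(Id, Id)>>,
}

// Stand-in clock: the real subsystem awaits wall-clock time here; this one
// resolves immediately so the sketch runs under a plain executor.
async fn wait_until(_tick: Tick) {}

impl Wakeups {
	fn schedule(&mut self, block: Id, candidate: Id, tick: Tick) {
		self.wakeups.entry(tick).or_default().push((block, candidate));
	}

	// Await the earliest tick, then pop exactly one wakeup. Pends forever
	// when nothing is scheduled, so callers can race it inside a `select!`.
	async fn next(&mut self) -> (Id, Id) {
		let tick = match self.wakeups.keys().next().copied() {
			None => return futures::future::pending().await,
			Some(t) => t,
		};
		wait_until(tick).await;
		let entry = self.wakeups.get_mut(&tick).expect("key was read from the map above; qed");
		let item = entry.pop().expect("empty entries are removed below; qed");
		if entry.is_empty() {
			self.wakeups.remove(&tick);
		}
		item
	}
}

fn main() {
	futures::executor::block_on(async {
		let mut wakeups = Wakeups::default();
		wakeups.schedule(1, 10, 5);
		wakeups.schedule(2, 20, 3);
		assert_eq!(wakeups.next().await, (2, 20)); // earliest tick first
		assert_eq!(wakeups.next().await, (1, 10));
		// a third call would pend forever: nothing is scheduled
	});
}
```

The production `Wakeups` additionally maintains a `reverse_wakeups` index so that scheduling an earlier tick for the same (block, candidate) pair supersedes a later one, as exercised by `wakeup_earlier_supersedes_later` in the updated tests.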