Unverified Commit f0faf5be authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

av-store: use determine_new_blocks (#3356)

* av-store: use determine_new_blocks

* fix tests

* update the guide

* rename KnownBlocks

* fix iteration order

* add a test
parent 45eef833
Pipeline #144014 passed with stages
in 43 minutes and 51 seconds
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#![recursion_limit="256"] #![recursion_limit="256"]
#![warn(missing_docs)] #![warn(missing_docs)]
use std::collections::HashMap; use std::collections::{HashMap, HashSet, BTreeSet};
use std::io; use std::io;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH}; use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH};
...@@ -31,7 +31,7 @@ use kvdb::{KeyValueDB, DBTransaction}; ...@@ -31,7 +31,7 @@ use kvdb::{KeyValueDB, DBTransaction};
use polkadot_primitives::v1::{ use polkadot_primitives::v1::{
Hash, BlockNumber, CandidateEvent, ValidatorIndex, CandidateHash, Hash, BlockNumber, CandidateEvent, ValidatorIndex, CandidateHash,
CandidateReceipt, CandidateReceipt, Header,
}; };
use polkadot_node_primitives::{ use polkadot_node_primitives::{
ErasureChunk, AvailableData, ErasureChunk, AvailableData,
...@@ -41,7 +41,10 @@ use polkadot_subsystem::{ ...@@ -41,7 +41,10 @@ use polkadot_subsystem::{
ActiveLeavesUpdate, ActiveLeavesUpdate,
errors::{ChainApiError, RuntimeApiError}, errors::{ChainApiError, RuntimeApiError},
}; };
use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_node_subsystem_util::{
self as util,
metrics::{self, prometheus},
};
use polkadot_subsystem::messages::{ use polkadot_subsystem::messages::{
AvailabilityStoreMessage, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest, AvailabilityStoreMessage, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest,
}; };
...@@ -444,6 +447,8 @@ pub struct AvailabilityStoreSubsystem { ...@@ -444,6 +447,8 @@ pub struct AvailabilityStoreSubsystem {
pruning_config: PruningConfig, pruning_config: PruningConfig,
config: Config, config: Config,
db: Arc<dyn KeyValueDB>, db: Arc<dyn KeyValueDB>,
known_blocks: KnownUnfinalizedBlocks,
finalized_number: Option<BlockNumber>,
metrics: Metrics, metrics: Metrics,
clock: Box<dyn Clock>, clock: Box<dyn Clock>,
} }
...@@ -478,6 +483,41 @@ impl AvailabilityStoreSubsystem { ...@@ -478,6 +483,41 @@ impl AvailabilityStoreSubsystem {
db, db,
metrics, metrics,
clock, clock,
known_blocks: KnownUnfinalizedBlocks::default(),
finalized_number: None,
}
}
}
/// We keep the hashes and numbers of all unfinalized
/// processed blocks in memory.
#[derive(Default, Debug)]
struct KnownUnfinalizedBlocks {
	// O(1) membership check by block hash.
	by_hash: HashSet<Hash>,
	// Same blocks keyed by (number, hash), ordered by height so that
	// pruning everything at or below a finalized height is one range split.
	by_number: BTreeSet<(BlockNumber, Hash)>,
}
impl KnownUnfinalizedBlocks {
/// Check whether the block has been already processed.
fn is_known(&self, hash: &Hash) -> bool {
self.by_hash.contains(hash)
}
/// Insert a new block into the known set.
fn insert(&mut self, hash: Hash, number: BlockNumber) {
self.by_hash.insert(hash);
self.by_number.insert((number, hash));
}
/// Prune all finalized blocks.
fn prune_finalized(&mut self, finalized: BlockNumber) {
// split_off returns everything after the given key, including the key
let split_point = finalized.saturating_add(1);
let mut finalized = self.by_number.split_off(&(split_point, Hash::zero()));
// after split_off `finalized` actually contains unfinalized blocks, we need to swap
std::mem::swap(&mut self.by_number, &mut finalized);
for (_, block) in finalized {
self.by_hash.remove(&block);
} }
} }
} }
...@@ -547,6 +587,8 @@ where ...@@ -547,6 +587,8 @@ where
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash, number)) => { FromOverseer::Signal(OverseerSignal::BlockFinalized(hash, number)) => {
let _timer = subsystem.metrics.time_process_block_finalized(); let _timer = subsystem.metrics.time_process_block_finalized();
subsystem.finalized_number = Some(number);
subsystem.known_blocks.prune_finalized(number);
process_block_finalized( process_block_finalized(
ctx, ctx,
&subsystem, &subsystem,
...@@ -580,19 +622,11 @@ async fn process_block_activated( ...@@ -580,19 +622,11 @@ async fn process_block_activated(
) -> Result<(), Error> { ) -> Result<(), Error> {
let now = subsystem.clock.now()?; let now = subsystem.clock.now()?;
let candidate_events = { let block_header = {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
ctx.send_message(
RuntimeApiMessage::Request(activated, RuntimeApiRequest::CandidateEvents(tx)).into()
).await;
rx.await??
};
let block_number = {
let (tx, rx) = oneshot::channel();
ctx.send_message( ctx.send_message(
ChainApiMessage::BlockNumber(activated, tx).into() ChainApiMessage::BlockHeader(activated, tx).into()
).await; ).await;
match rx.await?? { match rx.await?? {
...@@ -600,41 +634,77 @@ async fn process_block_activated( ...@@ -600,41 +634,77 @@ async fn process_block_activated(
Some(n) => n, Some(n) => n,
} }
}; };
let block_number = block_header.number;
let block_header = { let new_blocks = util::determine_new_blocks(
let (tx, rx) = oneshot::channel(); ctx.sender(),
|hash| -> Result<bool, Error> {
Ok(subsystem.known_blocks.is_known(hash))
},
activated,
&block_header,
subsystem.finalized_number.unwrap_or(block_number.saturating_sub(1)),
).await?;
let mut tx = DBTransaction::new();
// determine_new_blocks is descending in block height
for (hash, header) in new_blocks.into_iter().rev() {
process_new_head(
ctx,
&subsystem.db,
&mut tx,
&subsystem.config,
&subsystem.pruning_config,
now,
hash,
header,
).await?;
subsystem.known_blocks.insert(hash, block_number);
}
subsystem.db.write(tx)?;
Ok(())
}
async fn process_new_head(
ctx: &mut impl SubsystemContext,
db: &Arc<dyn KeyValueDB>,
db_transaction: &mut DBTransaction,
config: &Config,
pruning_config: &PruningConfig,
now: Duration,
hash: Hash,
header: Header,
) -> Result<(), Error> {
let candidate_events = {
let (tx, rx) = oneshot::channel();
ctx.send_message( ctx.send_message(
ChainApiMessage::BlockHeader(activated, tx).into() RuntimeApiMessage::Request(hash, RuntimeApiRequest::CandidateEvents(tx)).into()
).await; ).await;
match rx.await?? { rx.await??
None => return Ok(()),
Some(n) => n,
}
}; };
// We need to request the number of validators based on the parent state, as that is the number of validators // We need to request the number of validators based on the parent state,
// used to create this block. // as that is the number of validators used to create this block.
let n_validators = { let n_validators = {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
ctx.send_message( ctx.send_message(
RuntimeApiMessage::Request(block_header.parent_hash, RuntimeApiRequest::Validators(tx)).into() RuntimeApiMessage::Request(header.parent_hash, RuntimeApiRequest::Validators(tx)).into()
).await; ).await;
rx.await??.len() rx.await??.len()
}; };
let mut tx = DBTransaction::new();
for event in candidate_events { for event in candidate_events {
match event { match event {
CandidateEvent::CandidateBacked(receipt, _head, _core_index, _group_index) => { CandidateEvent::CandidateBacked(receipt, _head, _core_index, _group_index) => {
note_block_backed( note_block_backed(
&subsystem.db, db,
&mut tx, db_transaction,
&subsystem.config, config,
&subsystem.pruning_config, pruning_config,
now, now,
n_validators, n_validators,
receipt, receipt,
...@@ -642,11 +712,11 @@ async fn process_block_activated( ...@@ -642,11 +712,11 @@ async fn process_block_activated(
} }
CandidateEvent::CandidateIncluded(receipt, _head, _core_index, _group_index) => { CandidateEvent::CandidateIncluded(receipt, _head, _core_index, _group_index) => {
note_block_included( note_block_included(
&subsystem.db, db,
&mut tx, db_transaction,
&subsystem.config, config,
&subsystem.pruning_config, pruning_config,
(block_number, activated), (header.number, hash),
receipt, receipt,
)?; )?;
} }
...@@ -654,8 +724,6 @@ async fn process_block_activated( ...@@ -654,8 +724,6 @@ async fn process_block_activated(
} }
} }
subsystem.db.write(tx)?;
Ok(()) Ok(())
} }
...@@ -732,9 +800,10 @@ fn note_block_included( ...@@ -732,9 +800,10 @@ fn note_block_included(
State::Unfinalized(at, mut within) => { State::Unfinalized(at, mut within) => {
if let Err(i) = within.binary_search(&be_block) { if let Err(i) = within.binary_search(&be_block) {
within.insert(i, be_block); within.insert(i, be_block);
State::Unfinalized(at, within)
} else {
return Ok(());
} }
State::Unfinalized(at, within)
} }
State::Finalized(_at) => { State::Finalized(_at) => {
// This should never happen as a candidate would have to be included after // This should never happen as a candidate would have to be included after
......
...@@ -266,6 +266,25 @@ fn runtime_api_error_does_not_stop_the_subsystem() { ...@@ -266,6 +266,25 @@ fn runtime_api_error_does_not_stop_the_subsystem() {
}), }),
).await; ).await;
let header = Header {
parent_hash: Hash::zero(),
number: 1,
state_root: Hash::zero(),
extrinsics_root: Hash::zero(),
digest: Default::default(),
};
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockHeader(
relay_parent,
tx,
)) => {
assert_eq!(relay_parent, new_leaf);
tx.send(Ok(Some(header))).unwrap();
}
);
// runtime api call fails // runtime api call fails
assert_matches!( assert_matches!(
overseer_recv(&mut virtual_overseer).await, overseer_recv(&mut virtual_overseer).await,
...@@ -765,6 +784,134 @@ fn stored_data_kept_until_finalized() { ...@@ -765,6 +784,134 @@ fn stored_data_kept_until_finalized() {
}); });
} }
#[test]
fn we_dont_miss_anything_if_import_notifications_are_missed() {
	// Exercises the ancestry walk on block activation: blocks #2 and #3 were
	// never signalled as active leaves, yet activating leaf #4 must still
	// process them (back to — but excluding — the finalized block #1).
	let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
	let test_state = TestState::default();

	test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
		// Mark block #1 as finalized so the ancestry walk stops there.
		overseer_signal(
			&mut virtual_overseer,
			OverseerSignal::BlockFinalized(Hash::zero(), 1)
		).await;

		// Header of the leaf we are about to activate (block #4, child of #3).
		let header = Header {
			parent_hash: Hash::repeat_byte(3),
			number: 4,
			state_root: Hash::zero(),
			extrinsics_root: Hash::zero(),
			digest: Default::default(),
		};
		let new_leaf = Hash::repeat_byte(4);

		// Activate leaf #4 directly, skipping activations for #2 and #3.
		overseer_signal(
			&mut virtual_overseer,
			OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
				activated: vec![ActivatedLeaf {
					hash: new_leaf,
					number: 4,
					status: LeafStatus::Fresh,
					span: Arc::new(jaeger::Span::Disabled),
				}].into(),
				deactivated: vec![].into(),
			}),
		).await;

		// The subsystem first asks for the activated leaf's header.
		assert_matches!(
			overseer_recv(&mut virtual_overseer).await,
			AllMessages::ChainApi(ChainApiMessage::BlockHeader(
				relay_parent,
				tx,
			)) => {
				assert_eq!(relay_parent, new_leaf);
				tx.send(Ok(Some(header))).unwrap();
			}
		);

		// (block hash, parent hash) pairs we expect to see processed,
		// in ascending block-height order.
		let new_heads = vec![
			(Hash::repeat_byte(2), Hash::repeat_byte(1)),
			(Hash::repeat_byte(3), Hash::repeat_byte(2)),
			(Hash::repeat_byte(4), Hash::repeat_byte(3)),
		];

		// Next it requests k = 2 ancestors of the leaf — the two unprocessed
		// unfinalized blocks between the leaf and the finalized block.
		assert_matches!(
			overseer_recv(&mut virtual_overseer).await,
			AllMessages::ChainApi(ChainApiMessage::Ancestors {
				hash,
				k,
				response_channel: tx,
			}) => {
				assert_eq!(hash, new_leaf);
				assert_eq!(k, 2);
				let _ = tx.send(Ok(vec![
					Hash::repeat_byte(3),
					Hash::repeat_byte(2),
				]));
			}
		);

		// Headers of the recovered ancestors are fetched leaf-downwards:
		// first #3...
		assert_matches!(
			overseer_recv(&mut virtual_overseer).await,
			AllMessages::ChainApi(ChainApiMessage::BlockHeader(
				relay_parent,
				tx,
			)) => {
				assert_eq!(relay_parent, Hash::repeat_byte(3));
				tx.send(Ok(Some(Header {
					parent_hash: Hash::repeat_byte(2),
					number: 3,
					state_root: Hash::zero(),
					extrinsics_root: Hash::zero(),
					digest: Default::default(),
				}))).unwrap();
			}
		);

		// ...then #2.
		assert_matches!(
			overseer_recv(&mut virtual_overseer).await,
			AllMessages::ChainApi(ChainApiMessage::BlockHeader(
				relay_parent,
				tx,
			)) => {
				assert_eq!(relay_parent, Hash::repeat_byte(2));
				tx.send(Ok(Some(Header {
					parent_hash: Hash::repeat_byte(1),
					number: 2,
					state_root: Hash::zero(),
					extrinsics_root: Hash::zero(),
					digest: Default::default(),
				}))).unwrap();
			}
		);

		// Each new head is then processed in ascending order: candidate events
		// are requested at the block itself, validators at its parent state.
		for (head, parent) in new_heads {
			assert_matches!(
				overseer_recv(&mut virtual_overseer).await,
				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
					relay_parent,
					RuntimeApiRequest::CandidateEvents(tx),
				)) => {
					assert_eq!(relay_parent, head);
					tx.send(Ok(Vec::new())).unwrap();
				}
			);

			assert_matches!(
				overseer_recv(&mut virtual_overseer).await,
				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
					relay_parent,
					RuntimeApiRequest::Validators(tx),
				)) => {
					assert_eq!(relay_parent, parent);
					tx.send(Ok(Vec::new())).unwrap();
				}
			);
		}

		virtual_overseer
	});
}
#[test] #[test]
fn forkfullness_works() { fn forkfullness_works() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
...@@ -1005,37 +1152,27 @@ async fn import_leaf( ...@@ -1005,37 +1152,27 @@ async fn import_leaf(
assert_matches!( assert_matches!(
overseer_recv(virtual_overseer).await, overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request( AllMessages::ChainApi(ChainApiMessage::BlockHeader(
relay_parent,
RuntimeApiRequest::CandidateEvents(tx),
)) => {
assert_eq!(relay_parent, new_leaf);
tx.send(Ok(events)).unwrap();
}
);
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
relay_parent, relay_parent,
tx, tx,
)) => { )) => {
assert_eq!(relay_parent, new_leaf); assert_eq!(relay_parent, new_leaf);
tx.send(Ok(Some(block_number))).unwrap(); tx.send(Ok(Some(header))).unwrap();
} }
); );
assert_matches!( assert_matches!(
overseer_recv(virtual_overseer).await, overseer_recv(virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockHeader( AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent, relay_parent,
tx, RuntimeApiRequest::CandidateEvents(tx),
)) => { )) => {
assert_eq!(relay_parent, new_leaf); assert_eq!(relay_parent, new_leaf);
tx.send(Ok(Some(header))).unwrap(); tx.send(Ok(events)).unwrap();
} }
); );
assert_matches!( assert_matches!(
overseer_recv(virtual_overseer).await, overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request( AllMessages::RuntimeApi(RuntimeApiMessage::Request(
......
...@@ -94,10 +94,10 @@ Output: ...@@ -94,10 +94,10 @@ Output:
## Functionality ## Functionality
For each head in the `activated` list: For each head in the `activated` list:
- Note any new candidates backed in the block. Update the `CandidateMeta` for each. If the `CandidateMeta` does not exist, create it as `Unavailable` with the current timestamp. Register a `"prune_by_time"` entry based on the current timestamp + 1 hour. - Load all ancestors of the head back to the finalized block so we don't miss anything if import notifications are missed. If a `StoreChunk` message is received for a candidate which has no entry, then we will prematurely lose the data.
- Note any new candidate included in the block. Update the `CandidateMeta` for each, performing a transition from `Unavailable` to `Unfinalized` if necessary. That includes removing the `"prune_by_time"` entry. Add the block hash and number to the state, if unfinalized. Add an `"unfinalized"` entry for the block and candidate. - Note any new candidates backed in the head. Update the `CandidateMeta` for each. If the `CandidateMeta` does not exist, create it as `Unavailable` with the current timestamp. Register a `"prune_by_time"` entry based on the current timestamp + 1 hour.
- Note any new candidate included in the head. Update the `CandidateMeta` for each, performing a transition from `Unavailable` to `Unfinalized` if necessary. That includes removing the `"prune_by_time"` entry. Add the head hash and number to the state, if unfinalized. Add an `"unfinalized"` entry for the block and candidate.
- The `CandidateEvent` runtime API can be used for this purpose. - The `CandidateEvent` runtime API can be used for this purpose.
- TODO: load all ancestors of the head back to the finalized block so we don't miss anything if import notifications are missed. If a `StoreChunk` message is received for a candidate which has no entry, then we will prematurely lose the data.
On `OverseerSignal::BlockFinalized(finalized)` events: On `OverseerSignal::BlockFinalized(finalized)` events:
- for each key in `iter_with_prefix("unfinalized")` - for each key in `iter_with_prefix("unfinalized")`
...@@ -110,7 +110,7 @@ On `OverseerSignal::BlockFinalized(finalized)` events: ...@@ -110,7 +110,7 @@ On `OverseerSignal::BlockFinalized(finalized)` events:
- For each candidate that we encounter under `f` which is not under the finalized block hash, - For each candidate that we encounter under `f` which is not under the finalized block hash,
- Remove all entries under `f` in the `Unfinalized` state. - Remove all entries under `f` in the `Unfinalized` state.
- If the `CandidateMeta` has state `Unfinalized` with an empty list of blocks, downgrade to `Unavailable` and re-schedule pruning under the timestamp + 1 hour. We do not prune here as the candidate still may be included in a descendant of the finalized chain. - If the `CandidateMeta` has state `Unfinalized` with an empty list of blocks, downgrade to `Unavailable` and re-schedule pruning under the timestamp + 1 hour. We do not prune here as the candidate still may be included in a descendant of the finalized chain.
- Remove all `"unfinalized"` keys under `f`. - Remove all `"unfinalized"` keys under `f`.
- Update last_finalized = finalized. - Update last_finalized = finalized.
This is roughly `O(n * m)` where n is the number of blocks finalized since the last update, and `m` is the number of parachains. This is roughly `O(n * m)` where n is the number of blocks finalized since the last update, and `m` is the number of parachains.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment