// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use super::*;
use assert_matches::assert_matches;
use futures::{
future,
channel::oneshot,
executor,
Future,
};
use smallvec::smallvec;
use polkadot_primitives::v1::{
AvailableData, BlockData, CandidateDescriptor, CandidateReceipt, HeadData,
PersistedValidationData, PoV, Id as ParaId, CandidateHash,
};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem::{
ActiveLeavesUpdate, errors::RuntimeApiError,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
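
/// Handle used by tests to drive the subsystem through a mock overseer.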
struct TestHarness {
virtual_overseer: test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
}
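
/// Builds `CandidateReceipt`s from only the fields these tests care about.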
#[derive(Default)]
struct TestCandidateBuilder {
para_id: ParaId,
pov_hash: Hash,
relay_parent: Hash,
commitments_hash: Hash,
}
impl TestCandidateBuilder {
fn build(self) -> CandidateReceipt {
CandidateReceipt {
descriptor: CandidateDescriptor {
para_id: self.para_id,
pov_hash: self.pov_hash,
relay_parent: self.relay_parent,
..Default::default()
},
commitments_hash: self.commitments_hash,
}
}
}
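
/// Shared fixtures: persisted validation data plus a pruning config with
/// intervals short enough that pruning can be observed within a test run.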
struct TestState {
persisted_validation_data: PersistedValidationData,
pruning_config: PruningConfig,
}
impl Default for TestState {
fn default() -> Self {
let persisted_validation_data = PersistedValidationData {
parent_head: HeadData(vec![7, 8, 9]),
block_number: 5,
hrmp_mqc_heads: Vec::new(),
dmq_mqc_head: Default::default(),
};
let pruning_config = PruningConfig {
keep_stored_block_for: Duration::from_secs(1),
keep_finalized_block_for: Duration::from_secs(2),
keep_finalized_chunk_for: Duration::from_secs(2),
};
Self {
persisted_validation_data,
pruning_config,
}
}
}
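
/// Runs the availability store subsystem against an in-memory DB, driving it
/// with the given `test` future until either of them completes.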
fn test_harness<T: Future<Output = ()>>(
pruning_config: PruningConfig,
store: Arc<dyn KeyValueDB>,
test: impl FnOnce(TestHarness) -> T,
) {
let _ = env_logger::builder()
.is_test(true)
.filter(
Some("polkadot_node_core_av_store"),
log::LevelFilter::Trace,
)
.filter(
Some(LOG_TARGET),
log::LevelFilter::Trace,
)
.try_init();
let pool = sp_core::testing::TaskExecutor::new();
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = AvailabilityStoreSubsystem::new_in_memory(store, pruning_config);
let subsystem = run(subsystem, context);
let test_fut = test(TestHarness {
virtual_overseer,
});
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
executor::block_on(future::select(test_fut, subsystem));
}
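
/// Upper bound on how long a single send or receive may take in these tests.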
const TIMEOUT: Duration = Duration::from_millis(100);
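
/// Sends a message to the subsystem, panicking if it cannot be delivered within `TIMEOUT`.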
async fn overseer_send(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
msg: AvailabilityStoreMessage,
) {
log::trace!("Sending message:\n{:?}", &msg);
overseer
.send(FromOverseer::Communication { msg })
.timeout(TIMEOUT)
.await
.expect(&format!("{:?} is more than enough for sending messages.", TIMEOUT));
}
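
/// Receives the next message from the subsystem, panicking if none arrives within `TIMEOUT`.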
async fn overseer_recv(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
) -> AllMessages {
let msg = overseer_recv_with_timeout(overseer, TIMEOUT)
.await
.expect(&format!("{:?} is more than enough to receive messages", TIMEOUT));
log::trace!("Received message:\n{:?}", &msg);
msg
}
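
/// Receives the next message from the subsystem, or returns `None` if `timeout` expires first.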
async fn overseer_recv_with_timeout(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
timeout: Duration,
) -> Option<AllMessages> {
log::trace!("Waiting for message...");
overseer
.recv()
.timeout(timeout)
.await
}
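
/// Delivers an overseer signal to the subsystem, panicking on timeout.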
async fn overseer_signal(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
signal: OverseerSignal,
) {
overseer
.send(FromOverseer::Signal(signal))
.timeout(TIMEOUT)
.await
.expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT));
}
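
/// A failing runtime API call while processing a new leaf must not kill the
/// subsystem: it should still answer queries afterwards.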
#[test]
fn runtime_api_error_does_not_stop_the_subsystem() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
test_harness(PruningConfig::default(), store, |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let new_leaf = Hash::repeat_byte(0x01);
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![new_leaf.clone()],
deactivated: smallvec![],
}),
).await;
// runtime API call fails
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::CandidateEvents(tx),
)) => {
assert_eq!(relay_parent, new_leaf);
tx.send(Err(RuntimeApiError::from("oh no".to_string()))).unwrap();
}
);
// but that's fine, we're still alive
let (tx, rx) = oneshot::channel();
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
let validator_index = 5;
let query_chunk = AvailabilityStoreMessage::QueryChunk(
candidate_hash,
validator_index,
tx,
);
overseer_send(&mut virtual_overseer, query_chunk.into()).await;
assert!(rx.await.unwrap().is_none());
});
}
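
/// Storing a chunk and then querying it returns the very same chunk.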
#[test]
fn store_chunk_works() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
test_harness(PruningConfig::default(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let relay_parent = Hash::repeat_byte(32);
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
let validator_index = 5;
let chunk = ErasureChunk {
chunk: vec![1, 2, 3],
index: validator_index,
proof: vec![vec![3, 4, 5]],
};
let (tx, rx) = oneshot::channel();
let chunk_msg = AvailabilityStoreMessage::StoreChunk {
candidate_hash,
relay_parent,
validator_index,
chunk: chunk.clone(),
tx,
};
overseer_send(&mut virtual_overseer, chunk_msg.into()).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
hash,
tx,
)) => {
assert_eq!(hash, relay_parent);
tx.send(Ok(Some(4))).unwrap();
}
);
assert_eq!(rx.await.unwrap(), Ok(()));
let (tx, rx) = oneshot::channel();
let query_chunk = AvailabilityStoreMessage::QueryChunk(
candidate_hash,
validator_index,
tx,
);
overseer_send(&mut virtual_overseer, query_chunk.into()).await;
assert_eq!(rx.await.unwrap().unwrap(), chunk);
});
}
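
/// Storing available data makes both the data and its erasure chunks queryable.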
#[test]
fn store_block_works() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
let test_state = TestState::default();
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let candidate_hash = CandidateHash(Hash::from([1; 32]));
let validator_index = 5;
let n_validators = 10;
let pov = PoV {
block_data: BlockData(vec![4, 5, 6]),
};
let available_data = AvailableData {
pov,
validation_data: test_state.persisted_validation_data,
};
let (tx, rx) = oneshot::channel();
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
candidate_hash,
Some(validator_index),
n_validators,
available_data.clone(),
tx,
);
virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await;
assert_eq!(rx.await.unwrap(), Ok(()));
let pov = query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap();
assert_eq!(pov, available_data);
let chunk = query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap();
let chunks = erasure::obtain_chunks_v1(n_validators as usize, &available_data).unwrap();
let mut branches = erasure::branches(chunks.as_ref());
let branch = branches.nth(validator_index as usize).unwrap();
let expected_chunk = ErasureChunk {
chunk: branch.1.to_vec(),
index: validator_index,
proof: branch.0,
};
assert_eq!(chunk, expected_chunk);
});
}
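
/// Storing a PoV without a validator index still makes every erasure chunk queryable.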
#[test]
fn store_pov_and_query_chunk_works() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
let test_state = TestState::default();
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let candidate_hash = CandidateHash(Hash::from([1; 32]));
let n_validators = 10;
let pov = PoV {
block_data: BlockData(vec![4, 5, 6]),
};
let available_data = AvailableData {
pov,
validation_data: test_state.persisted_validation_data,
};
let no_metrics = Metrics(None);
let chunks_expected = get_chunks(&available_data, n_validators as usize, &no_metrics).unwrap();
let (tx, rx) = oneshot::channel();
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
candidate_hash,
None,
n_validators,
available_data,
tx,
);
virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await;
assert_eq!(rx.await.unwrap(), Ok(()));
for validator_index in 0..n_validators {
let chunk = query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap();
assert_eq!(chunk, chunks_expected[validator_index as usize]);
}
});
}
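
/// A chunk whose candidate is never included gets pruned once `keep_stored_block_for` elapses.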
#[test]
fn stored_but_not_included_chunk_is_pruned() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
let test_state = TestState::default();
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let candidate_hash = CandidateHash(Hash::repeat_byte(1));
let relay_parent = Hash::repeat_byte(2);
let validator_index = 5;
let chunk = ErasureChunk {
chunk: vec![1, 2, 3],
index: validator_index,
proof: vec![vec![3, 4, 5]],
};
let (tx, rx) = oneshot::channel();
let chunk_msg = AvailabilityStoreMessage::StoreChunk {
candidate_hash,
relay_parent,
validator_index,
chunk: chunk.clone(),
tx,
};
overseer_send(&mut virtual_overseer, chunk_msg.into()).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
hash,
tx,
)) => {
assert_eq!(hash, relay_parent);
tx.send(Ok(Some(4))).unwrap();
}
);
rx.await.unwrap().unwrap();
// At this point data should be in the store.
assert_eq!(
query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(),
chunk,
);
// Wait twice as long as a stored but not included block is kept for.
Delay::new(test_state.pruning_config.keep_stored_block_for * 2).await;
// The block was not included by this point, so it should have been pruned.
assert!(query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.is_none());
});
}
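
/// Available data whose candidate is never included gets pruned once `keep_stored_block_for` elapses.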
#[test]
fn stored_but_not_included_data_is_pruned() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
let test_state = TestState::default();
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let candidate_hash = CandidateHash(Hash::repeat_byte(1));
let n_validators = 10;
let pov = PoV {
block_data: BlockData(vec![4, 5, 6]),
};
let available_data = AvailableData {
pov,
validation_data: test_state.persisted_validation_data,
};
let (tx, rx) = oneshot::channel();
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
candidate_hash,
None,
n_validators,
available_data.clone(),
tx,
);
virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await;
rx.await.unwrap().unwrap();
// At this point data should be in the store.
assert_eq!(
query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(),
available_data,
);
// Wait twice as long as a stored but not included block is kept for.
Delay::new(test_state.pruning_config.keep_stored_block_for * 2).await;
// The block was not included by this point, so it should have been pruned.
assert!(query_available_data(&mut virtual_overseer, candidate_hash).await.is_none());
});
}
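
/// Once its candidate is included, stored data outlives `keep_stored_block_for`
/// and is only pruned `keep_finalized_block_for` after the block is finalized.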
#[test]
fn stored_data_kept_until_finalized() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
let test_state = TestState::default();
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let n_validators = 10;
let pov = PoV {
block_data: BlockData(vec![4, 5, 6]),
};
let pov_hash = pov.hash();
let candidate = TestCandidateBuilder {
pov_hash,
..Default::default()
}.build();
let candidate_hash = candidate.hash();
let available_data = AvailableData {
pov,
validation_data: test_state.persisted_validation_data,
};
let (tx, rx) = oneshot::channel();
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
candidate_hash,
None,
n_validators,
available_data.clone(),
tx,
);
virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await;
rx.await.unwrap().unwrap();
// At this point data should be in the store.
assert_eq!(
query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(),
available_data,
);
let new_leaf = Hash::repeat_byte(2);
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![new_leaf.clone()],
deactivated: smallvec![],
}),
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::CandidateEvents(tx),
)) => {
assert_eq!(relay_parent, new_leaf);
tx.send(Ok(vec![
CandidateEvent::CandidateIncluded(candidate, HeadData::default()),
])).unwrap();
}
);
Delay::new(test_state.pruning_config.keep_stored_block_for * 10).await;
// At this point data should _still_ be in the store.
assert_eq!(
query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(),
available_data,
);
overseer_signal(
&mut virtual_overseer,
OverseerSignal::BlockFinalized(new_leaf)
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
hash,
tx,
)) => {
assert_eq!(hash, new_leaf);
tx.send(Ok(Some(10))).unwrap();
}
);
// Wait for half of the time finalized data is kept for.
Delay::new(test_state.pruning_config.keep_finalized_block_for / 2).await;
// At this point data should _still_ be in the store.
assert_eq!(
query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(),
available_data,
);
// Wait until it should be gone.
Delay::new(test_state.pruning_config.keep_finalized_block_for).await;
// At this point data should be gone from the store.
assert!(
query_available_data(&mut virtual_overseer, candidate_hash).await.is_none(),
);
});
}
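
/// Like `stored_data_kept_until_finalized`, but for chunks: an included candidate's
/// chunk survives until `keep_finalized_chunk_for` elapses after finalization.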
#[test]
fn stored_chunk_kept_until_finalized() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
let test_state = TestState::default();
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let relay_parent = Hash::repeat_byte(2);
let validator_index = 5;
let candidate = TestCandidateBuilder {
..Default::default()
}.build();
let candidate_hash = candidate.hash();
let chunk = ErasureChunk {
chunk: vec![1, 2, 3],
index: validator_index,
proof: vec![vec![3, 4, 5]],
};
let (tx, rx) = oneshot::channel();
let chunk_msg = AvailabilityStoreMessage::StoreChunk {
candidate_hash,
relay_parent,
validator_index,
chunk: chunk.clone(),
tx,
};
overseer_send(&mut virtual_overseer, chunk_msg.into()).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
hash,
tx,
)) => {
assert_eq!(hash, relay_parent);
tx.send(Ok(Some(4))).unwrap();
}
);
rx.await.unwrap().unwrap();
// At this point data should be in the store.
assert_eq!(
query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(),
chunk,
);
let new_leaf = Hash::repeat_byte(2);
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![new_leaf.clone()],
deactivated: smallvec![],
}),
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::CandidateEvents(tx),
)) => {
assert_eq!(relay_parent, new_leaf);
tx.send(Ok(vec![
CandidateEvent::CandidateIncluded(candidate, HeadData::default()),
])).unwrap();
}
);
Delay::new(test_state.pruning_config.keep_stored_block_for * 10).await;
// At this point data should _still_ be in the store.
assert_eq!(
query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(),
chunk,
);
overseer_signal(
&mut virtual_overseer,
OverseerSignal::BlockFinalized(new_leaf)
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
hash,
tx,
)) => {
assert_eq!(hash, new_leaf);
tx.send(Ok(Some(10))).unwrap();
}
);
// Wait for half of the time finalized data is kept for.
Delay::new(test_state.pruning_config.keep_finalized_block_for / 2).await;
// At this point data should _still_ be in the store.
assert_eq!(
query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(),
chunk,
);
// Wait until it should be gone.
Delay::new(test_state.pruning_config.keep_finalized_chunk_for).await;
// At this point the chunk should be gone from the store.
assert!(
query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.is_none(),
);
});
}
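
/// Candidates included on two competing forks are both kept after one fork is
/// finalized, and both are pruned once the finalized retention period elapses.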
#[test]
fn forkfulness_works() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
let test_state = TestState::default();
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let n_validators = 10;
let pov_1 = PoV {
block_data: BlockData(vec![1, 2, 3]),
};
let pov_1_hash = pov_1.hash();
let pov_2 = PoV {
block_data: BlockData(vec![4, 5, 6]),
};
let pov_2_hash = pov_2.hash();
let candidate_1 = TestCandidateBuilder {
pov_hash: pov_1_hash,
..Default::default()
}.build();
let candidate_1_hash = candidate_1.hash();
let candidate_2 = TestCandidateBuilder {
pov_hash: pov_2_hash,
..Default::default()
}.build();
let candidate_2_hash = candidate_2.hash();
let available_data_1 = AvailableData {
pov: pov_1,
validation_data: test_state.persisted_validation_data.clone(),
};
let available_data_2 = AvailableData {
pov: pov_2,
validation_data: test_state.persisted_validation_data,
};
let (tx, rx) = oneshot::channel();
let msg = AvailabilityStoreMessage::StoreAvailableData(
candidate_1_hash,
None,
n_validators,
available_data_1.clone(),
tx,
);
virtual_overseer.send(FromOverseer::Communication{ msg }).await;
rx.await.unwrap().unwrap();
let (tx, rx) = oneshot::channel();
let msg = AvailabilityStoreMessage::StoreAvailableData(
candidate_2_hash,
None,
n_validators,
available_data_2.clone(),
tx,
);
virtual_overseer.send(FromOverseer::Communication{ msg }).await;
rx.await.unwrap().unwrap();
assert_eq!(
query_available_data(&mut virtual_overseer, candidate_1_hash).await.unwrap(),
available_data_1,
);
assert_eq!(
query_available_data(&mut virtual_overseer, candidate_2_hash).await.unwrap(),
available_data_2,
);
let new_leaf_1 = Hash::repeat_byte(2);
let new_leaf_2 = Hash::repeat_byte(3);
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![new_leaf_1.clone(), new_leaf_2.clone()],
deactivated: smallvec![],
}),
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
leaf,
RuntimeApiRequest::CandidateEvents(tx),
)) => {
assert_eq!(leaf, new_leaf_1);
tx.send(Ok(vec![
CandidateEvent::CandidateIncluded(candidate_1, HeadData::default()),
])).unwrap();
}
);
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
leaf,
RuntimeApiRequest::CandidateEvents(tx),
)) => {
assert_eq!(leaf, new_leaf_2);
tx.send(Ok(vec![
CandidateEvent::CandidateIncluded(candidate_2, HeadData::default()),
])).unwrap();
}
);
overseer_signal(
&mut virtual_overseer,
OverseerSignal::BlockFinalized(new_leaf_1)
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
hash,
tx,
)) => {
assert_eq!(hash, new_leaf_1);
tx.send(Ok(Some(5))).unwrap();
}
);
// Data of both candidates should be still present in the DB.
assert_eq!(
query_available_data(&mut virtual_overseer, candidate_1_hash).await.unwrap(),
available_data_1,
);
assert_eq!(
query_available_data(&mut virtual_overseer, candidate_2_hash).await.unwrap(),
available_data_2,
);
// Wait longer than finalized blocks are kept for.
Delay::new(test_state.pruning_config.keep_finalized_block_for + Duration::from_secs(1)).await;
// Data of both candidates should be gone now.
assert!(
query_available_data(&mut virtual_overseer, candidate_1_hash).await.is_none(),
);
assert!(
query_available_data(&mut virtual_overseer, candidate_2_hash).await.is_none(),
);
});
}
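
/// Convenience wrapper around `AvailabilityStoreMessage::QueryAvailableData`.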
async fn query_available_data(
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
candidate_hash: CandidateHash,
) -> Option<AvailableData> {
let (tx, rx) = oneshot::channel();
let query = AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx);
virtual_overseer.send(FromOverseer::Communication{ msg: query }).await;
rx.await.unwrap()
}
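
/// Convenience wrapper around `AvailabilityStoreMessage::QueryChunk`.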
async fn query_chunk(
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
candidate_hash: CandidateHash,
index: u32,
) -> Option<ErasureChunk> {
let (tx, rx) = oneshot::channel();
let query = AvailabilityStoreMessage::QueryChunk(candidate_hash, index, tx);
virtual_overseer.send(FromOverseer::Communication{ msg: query }).await;
rx.await.unwrap()
}