Unverified Commit dfae8639 authored by Robert Klotzner, committed by GitHub

Whole subsystem test for new availability-distribution (#2552)



* WIP: Whole subsystem test.

* New tests compile.

* Avoid needless runtime queries for non-validator nodes.

* Make tx and rx publicly accessible in virtual overseer.

This simplifies mocking in some cases, as tx can be cloned, but rx
cannot (see the sketch following the commit metadata below).

* Whole subsystem test working.

* Update node/network/availability-distribution/src/session_cache.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* Update node/network/availability-distribution/src/session_cache.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* Document better what `None` return value means.

* Get rid of BitVec dependency.

* Update Cargo.lock

* Hopefully fixed implementers guide build.

Co-authored-by: Andronik Ordian <write@reusable.software>
parent 652aad1b
Pipeline #126716 passed with stages in 25 minutes and 33 seconds
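On the tx/rx point in the commit message above: with `futures::channel::mpsc`, only the sending half implements `Clone`, so a mock that needs several producers can clone `tx`, while the single `rx` stays with the consumer. A minimal sketch using plain `futures` channels (not the overseer's actual types):

```rust
use futures::{SinkExt, StreamExt, channel::mpsc, executor};

fn main() {
	let (mut tx, mut rx) = mpsc::channel::<u32>(16);
	let mut tx2 = tx.clone(); // fine: `Sender` is `Clone`
	// let rx2 = rx.clone(); // does not compile: `Receiver` is not `Clone`

	executor::block_on(async {
		tx.send(1).await.unwrap();
		tx2.send(2).await.unwrap();
		drop((tx, tx2)); // close the channel so the stream below terminates
		while let Some(x) = rx.next().await {
			println!("received {}", x);
		}
	});
}
```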
@@ -208,7 +208,7 @@ generate-impl-guide:
  <<: *rules-test
  <<: *docker-env
  image:
-    name: michaelfbryan/mdbook-docker-image:latest
+    name: michaelfbryan/mdbook-docker-image:v0.4.4
    entrypoint: [""]
  script:
    - mdbook build roadmap/implementers-guide
@@ -5153,6 +5153,7 @@ version = "0.1.0"
dependencies = [
"assert_matches",
"futures 0.3.12",
"futures-timer 3.0.2",
"lru",
"maplit",
"parity-scale-codec",
@@ -5166,6 +5167,7 @@ dependencies = [
"rand 0.8.3",
"sc-keystore",
"sc-network",
"smallvec 1.6.1",
"sp-application-crypto",
"sp-core",
"sp-keyring",
@@ -29,5 +29,7 @@ sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
+futures-timer = "3.0.2"
assert_matches = "1.4.0"
maplit = "1.0"
+smallvec = "1.6.1"
@@ -57,6 +57,10 @@ pub enum Error {
	/// We tried accessing a session that was not cached.
	#[error("Session is not cached.")]
	NoSuchCachedSession,
+	/// We tried reporting bad validators, although we are not a validator ourselves.
+	#[error("Not a validator.")]
+	NotAValidator,
	/// Requester stream exhausted.
	#[error("Erasure chunk requester stream exhausted")]
@@ -43,6 +43,9 @@ mod metrics;
/// Prometheus `Metrics` for availability distribution.
pub use metrics::Metrics;

+#[cfg(test)]
+mod tests;

const LOG_TARGET: &'static str = "availability_distribution";
@@ -15,7 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::sync::Arc;
use parity_scale_codec::Encode;
@@ -23,15 +23,15 @@ use futures::channel::{mpsc, oneshot};
use futures::{executor, Future, FutureExt, StreamExt, select};
use futures::task::{Poll, Context, noop_waker};

-use polkadot_erasure_coding::{obtain_chunks_v1 as obtain_chunks, branches};
use sc_network as network;
use sp_keyring::Sr25519Keyring;
-use polkadot_primitives::v1::{AvailableData, BlockData, CandidateHash, HeadData, PersistedValidationData, PoV, ValidatorIndex};
+use polkadot_primitives::v1::{BlockData, CandidateHash, PoV, ValidatorIndex};
use polkadot_node_network_protocol::request_response::v1;
use polkadot_subsystem::messages::AllMessages;

use crate::metrics::Metrics;
+use crate::tests::mock::get_valid_chunk_data;

use super::*;
#[test]
@@ -74,7 +74,10 @@ fn task_does_not_accept_invalid_chunk() {
#[test]
fn task_stores_valid_chunk() {
	let (mut task, rx) = get_test_running_task();
-	let (root_hash, chunk) = get_valid_chunk_data();
+	let pov = PoV {
+		block_data: BlockData(vec![45, 46, 47]),
+	};
+	let (root_hash, chunk) = get_valid_chunk_data(pov);
	task.erasure_root = root_hash;
	task.request.index = chunk.index;
@@ -107,7 +110,10 @@ fn task_stores_valid_chunk() {
#[test]
fn task_does_not_accept_wrongly_indexed_chunk() {
	let (mut task, rx) = get_test_running_task();
-	let (root_hash, chunk) = get_valid_chunk_data();
+	let pov = PoV {
+		block_data: BlockData(vec![45, 46, 47]),
+	};
+	let (root_hash, chunk) = get_valid_chunk_data(pov);
	task.erasure_root = root_hash;
	task.request.index = ValidatorIndex(chunk.index.0 + 1);
@@ -137,7 +143,10 @@ fn task_does_not_accept_wrongly_indexed_chunk() {
#[test]
fn task_stores_valid_chunk_if_there_is_one() {
	let (mut task, rx) = get_test_running_task();
-	let (root_hash, chunk) = get_valid_chunk_data();
+	let pov = PoV {
+		block_data: BlockData(vec![45, 46, 47]),
+	};
+	let (root_hash, chunk) = get_valid_chunk_data(pov);
	task.erasure_root = root_hash;
	task.request.index = chunk.index;
@@ -287,29 +296,3 @@ fn get_test_running_task() -> (RunningTask, mpsc::Receiver<FromFetchTask>) {
	)
}
-fn get_valid_chunk_data() -> (Hash, ErasureChunk) {
-	let fake_validator_count = 10;
-	let persisted = PersistedValidationData {
-		parent_head: HeadData(vec![7, 8, 9]),
-		relay_parent_number: Default::default(),
-		max_pov_size: 1024,
-		relay_parent_storage_root: Default::default(),
-	};
-	let pov_block = PoV {
-		block_data: BlockData(vec![45, 46, 47]),
-	};
-	let available_data = AvailableData {
-		validation_data: persisted, pov: Arc::new(pov_block),
-	};
-	let chunks = obtain_chunks(fake_validator_count, &available_data).unwrap();
-	let branches = branches(chunks.as_ref());
-	let root = branches.root();
-	let chunk = branches.enumerate()
-		.map(|(index, (proof, chunk))| ErasureChunk {
-			chunk: chunk.to_vec(),
-			index: ValidatorIndex(index as _),
-			proof,
-		})
-		.next().expect("There really should be 10 chunks.");
-	(root, chunk)
-}
@@ -54,7 +54,10 @@ pub struct SessionCache {
	/// to get any existing cache entry, before fetching new information, as we should not mess up
	/// the order of validators in `SessionInfo::validator_groups`. (We want live TCP connections
	/// wherever possible.)
-	session_info_cache: LruCache<SessionIndex, SessionInfo>,
+	///
+	/// We store `None` in case we are not a validator, so we won't do needless fetches for
+	/// non-validator nodes.
+	session_info_cache: LruCache<SessionIndex, Option<SessionInfo>>,

	/// Key store for determining whether we are a validator and what `ValidatorIndex` we have.
	keystore: SyncCryptoStorePtr,
@@ -134,19 +137,31 @@ impl SessionCache {
			}
		};

-		if let Some(info) = self.session_info_cache.get(&session_index) {
-			return Ok(Some(with_info(info)));
+		if let Some(o_info) = self.session_info_cache.get(&session_index) {
+			tracing::trace!(target: LOG_TARGET, session_index, "Got session from lru");
+			if let Some(info) = o_info {
+				return Ok(Some(with_info(info)));
+			} else {
+				// Info was cached - we are not a validator: return early:
+				return Ok(None)
+			}
		}

		if let Some(info) = self
			.query_info_from_runtime(ctx, parent, session_index)
			.await?
		{
+			tracing::trace!(target: LOG_TARGET, session_index, "Calling `with_info`");
			let r = with_info(&info);
-			self.session_info_cache.put(session_index, info);
-			return Ok(Some(r));
+			tracing::trace!(target: LOG_TARGET, session_index, "Storing session info in lru!");
+			self.session_info_cache.put(session_index, Some(info));
+			Ok(Some(r))
+		} else {
+			// Avoid needless fetches if we are not a validator:
+			self.session_info_cache.put(session_index, None);
+			tracing::trace!(target: LOG_TARGET, session_index, "No session info found!");
+			Ok(None)
		}
-		Ok(None)
	}
/// Variant of `report_bad` that never fails, but just logs errors.
@@ -172,7 +187,9 @@ impl SessionCache {
		let session = self
			.session_info_cache
			.get_mut(&report.session_index)
-			.ok_or(Error::NoSuchCachedSession)?;
+			.ok_or(Error::NoSuchCachedSession)?
+			.as_mut()
+			.ok_or(Error::NotAValidator)?;
		let group = session
			.validator_groups
			.get_mut(report.group_index.0 as usize)
@@ -194,6 +211,8 @@
	/// We need to pass in the relay parent for our call to `request_session_info_ctx`. We
	/// shouldn't actually need it: I suppose it is used for internal caching based on relay
	/// parents, which we don't use here. It should not do any harm though.
+	///
+	/// Returns: `None` if not a validator.
async fn query_info_from_runtime<Context>(
&self,
ctx: &mut Context,
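The `session_info_cache` change above caches `None` for sessions in which this node is not a validator, so repeated lookups skip the runtime query entirely. A minimal, self-contained sketch of that negative-caching pattern with the `lru` crate, using illustrative names rather than the subsystem's real types:

```rust
use lru::LruCache;

struct Cache {
	// `None` is a cached negative result: "not a validator in this session".
	sessions: LruCache<u32, Option<String>>,
}

impl Cache {
	fn new() -> Self {
		Cache { sessions: LruCache::unbounded() }
	}

	fn get_or_fetch(&mut self, index: u32) -> Option<String> {
		if let Some(cached) = self.sessions.get(&index) {
			// Hit - covers both the positive and the negative case, so
			// non-validators never pay for the expensive query twice.
			return cached.clone();
		}
		let fetched = Self::query_runtime(index);
		self.sessions.put(index, fetched.clone());
		fetched
	}

	// Stand-in for the expensive runtime query; `None` = not a validator.
	fn query_runtime(index: u32) -> Option<String> {
		if index % 2 == 0 { Some(format!("info for session {}", index)) } else { None }
	}
}
```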
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Helper functions and tools to generate mock data useful for testing this subsystem.
use std::sync::Arc;

use sp_keyring::Sr25519Keyring;

use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_primitives::v1::{
	AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, CandidateHash,
	CommittedCandidateReceipt, ErasureChunk, GroupIndex, Hash, HeadData, Id as ParaId,
	OccupiedCore, PersistedValidationData, PoV, SessionInfo, ValidatorIndex,
};
/// Create dummy session info with two validator groups.
pub fn make_session_info() -> SessionInfo {
	let validators = vec![
		Sr25519Keyring::Ferdie, // <- this node, role: validator
		Sr25519Keyring::Alice,
		Sr25519Keyring::Bob,
		Sr25519Keyring::Charlie,
		Sr25519Keyring::Dave,
		Sr25519Keyring::Eve,
		Sr25519Keyring::One,
	];

	let validator_groups: Vec<Vec<ValidatorIndex>> = [vec![5, 0, 3], vec![1, 6, 2, 4]]
		.iter().map(|g| g.into_iter().map(|v| ValidatorIndex(*v)).collect()).collect();

	SessionInfo {
		discovery_keys: validators.iter().map(|k| k.public().into()).collect(),
		// Not used:
		n_cores: validator_groups.len() as u32,
		validator_groups,
		// Not used values:
		validators: validators.iter().map(|k| k.public().into()).collect(),
		assignment_keys: Vec::new(),
		zeroth_delay_tranche_width: 0,
		relay_vrf_modulo_samples: 0,
		n_delay_tranches: 0,
		no_show_slots: 0,
		needed_approvals: 0,
	}
}
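A quick illustration of the helper above (a hypothetical sanity check, not part of the commit): the indices in `validator_groups` point into `validators`, so Ferdie (index 0, "this node") lands in the first group:

```rust
#[test]
fn make_session_info_puts_ferdie_into_group_zero() {
	let info = make_session_info();
	assert_eq!(info.validator_groups.len(), 2);
	// Group 0 is [5, 0, 3]; index 0 is Ferdie, "this node".
	assert!(info.validator_groups[0].contains(&ValidatorIndex(0)));
}
```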
/// Builder for constructing occupied cores.
///
/// Takes all the values we care about and fills the rest with dummy values on `build`.
pub struct OccupiedCoreBuilder {
	pub group_responsible: GroupIndex,
	pub para_id: ParaId,
	pub relay_parent: Hash,
}

impl OccupiedCoreBuilder {
	pub fn build(self) -> (OccupiedCore, (CandidateHash, ErasureChunk)) {
		let pov = PoV {
			block_data: BlockData(vec![45, 46, 47]),
		};
		let pov_hash = pov.hash();
		let (erasure_root, chunk) = get_valid_chunk_data(pov.clone());
		let candidate_receipt = TestCandidateBuilder {
			para_id: self.para_id,
			pov_hash,
			relay_parent: self.relay_parent,
			erasure_root,
			..Default::default()
		}.build();
		let core = OccupiedCore {
			next_up_on_available: None,
			occupied_since: 0,
			time_out_at: 0,
			next_up_on_time_out: None,
			availability: Default::default(),
			group_responsible: self.group_responsible,
			candidate_hash: candidate_receipt.hash(),
			candidate_descriptor: candidate_receipt.descriptor().clone(),
		};
		(core, (candidate_receipt.hash(), chunk))
	}
}
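A hypothetical invocation of the builder, mirroring what `TestState::default` does further below: choose the responsible group, para, and relay parent, and let `build` fill in dummy values for everything else:

```rust
fn example_occupied_core() -> (OccupiedCore, (CandidateHash, ErasureChunk)) {
	OccupiedCoreBuilder {
		group_responsible: GroupIndex(0),
		para_id: ParaId::from(1),
		relay_parent: Hash::repeat_byte(1),
	}.build()
}
```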
#[derive(Default)]
pub struct TestCandidateBuilder {
	para_id: ParaId,
	head_data: HeadData,
	pov_hash: Hash,
	relay_parent: Hash,
	erasure_root: Hash,
}

impl TestCandidateBuilder {
	pub fn build(self) -> CommittedCandidateReceipt {
		CommittedCandidateReceipt {
			descriptor: CandidateDescriptor {
				para_id: self.para_id,
				pov_hash: self.pov_hash,
				relay_parent: self.relay_parent,
				erasure_root: self.erasure_root,
				..Default::default()
			},
			commitments: CandidateCommitments {
				head_data: self.head_data,
				..Default::default()
			},
		}
	}
}
pub fn get_valid_chunk_data(pov: PoV) -> (Hash, ErasureChunk) {
	let fake_validator_count = 10;
	let persisted = PersistedValidationData {
		parent_head: HeadData(vec![7, 8, 9]),
		relay_parent_number: Default::default(),
		max_pov_size: 1024,
		relay_parent_storage_root: Default::default(),
	};
	let available_data = AvailableData {
		validation_data: persisted, pov: Arc::new(pov),
	};
	let chunks = obtain_chunks(fake_validator_count, &available_data).unwrap();
	let branches = branches(chunks.as_ref());
	let root = branches.root();
	let chunk = branches.enumerate()
		.map(|(index, (proof, chunk))| ErasureChunk {
			chunk: chunk.to_vec(),
			index: ValidatorIndex(index as _),
			proof,
		})
		.next().expect("There really should be 10 chunks.");
	(root, chunk)
}
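Because the helper takes the first element of `branches(...).enumerate()`, the returned chunk always carries validator index 0; a caller could rely on that (hypothetical test, not in the commit):

```rust
#[test]
fn get_valid_chunk_data_returns_first_chunk() {
	let pov = PoV { block_data: BlockData(vec![1, 2, 3]) };
	let (_root, chunk) = get_valid_chunk_data(pov);
	assert_eq!(chunk.index, ValidatorIndex(0));
}
```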
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use futures::{executor, future, Future};

use sp_keystore::SyncCryptoStorePtr;

use polkadot_subsystem_testhelpers as test_helpers;

use super::*;

mod state;
/// State for test harnesses.
use state::{TestState, TestHarness};

/// Mock data useful for testing.
pub(crate) mod mock;

fn test_harness<T: Future<Output = ()>>(
	keystore: SyncCryptoStorePtr,
	test_fx: impl FnOnce(TestHarness) -> T,
) {
	sp_tracing::try_init_simple();

	let pool = sp_core::testing::TaskExecutor::new();
	let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());

	let subsystem = AvailabilityDistributionSubsystem::new(keystore, Default::default());
	{
		let subsystem = subsystem.run(context);
		let test_fut = test_fx(TestHarness { virtual_overseer, pool });

		futures::pin_mut!(test_fut);
		futures::pin_mut!(subsystem);

		executor::block_on(future::select(test_fut, subsystem));
	}
}
/// Simple basic check that the subsystem works as expected.
///
/// Exceptional cases are tested as unit tests in `fetch_task`.
#[test]
fn check_basic() {
	let state = TestState::default();

	test_harness(state.keystore.clone(), move |harness| {
		state.run(harness)
	});
}
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{collections::{HashMap, HashSet}, sync::Arc, time::Duration};

use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers::TestSubsystemContextHandle;
use smallvec::smallvec;

use futures::{FutureExt, channel::oneshot, SinkExt, channel::mpsc, StreamExt};
use futures_timer::Delay;

use sc_keystore::LocalKeystore;
use sp_application_crypto::AppKey;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_keyring::Sr25519Keyring;
use sp_core::{traits::SpawnNamed, testing::TaskExecutor};
use sc_network as network;
use sc_network::config as netconfig;

use polkadot_subsystem::{
	ActiveLeavesUpdate, FromOverseer, OverseerSignal,
	messages::{
		AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage,
		NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
	},
};
use polkadot_primitives::v1::{
	CandidateHash, CoreState, ErasureChunk, GroupIndex, Hash, Id as ParaId,
	ScheduledCore, SessionInfo, ValidatorId, ValidatorIndex,
};
use polkadot_node_network_protocol::{
	jaeger,
	request_response::{IncomingRequest, OutgoingRequest, Requests, v1},
};

use polkadot_subsystem_testhelpers as test_helpers;
use test_helpers::SingleItemSink;

use super::mock::{make_session_info, OccupiedCoreBuilder};
use crate::LOG_TARGET;
pub struct TestHarness {
	pub virtual_overseer: test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
	pub pool: TaskExecutor,
}
/// TestState for mocking execution of this subsystem.
///
/// The `Default` instance provides data which makes the subsystem succeed by providing a couple
/// of valid occupied cores. You can tune the data before calling `TestState::run`; e.g. if you
/// make some chunks invalid, the test will still pass as long as you also remove those chunks
/// from `valid_chunks`.
#[derive(Clone)]
pub struct TestState {
	/// Simulated relay chain heads:
	pub relay_chain: Vec<Hash>,
	/// All chunks the subsystem may fetch, keyed by candidate and validator index.
	pub chunks: HashMap<(CandidateHash, ValidatorIndex), ErasureChunk>,
	/// All chunks that are valid and should be accepted.
	pub valid_chunks: HashSet<(CandidateHash, ValidatorIndex)>,
	pub session_info: SessionInfo,
	/// Cores per relay chain block.
	pub cores: HashMap<Hash, Vec<CoreState>>,
	pub keystore: SyncCryptoStorePtr,
}
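Per the doc comment above, the default data can be tuned before calling `run`. A hypothetical tweak (not part of the commit) that corrupts one chunk and drops it from the expected set, so the harness no longer waits for it:

```rust
fn with_one_corrupted_chunk(mut state: TestState) -> TestState {
	if let Some((key, chunk)) = state.chunks.iter_mut().next().map(|(k, v)| (k.clone(), v)) {
		chunk.chunk.push(0xde); // invalidate the erasure chunk
		state.valid_chunks.remove(&key); // the subsystem must no longer store it
	}
	state
}
```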
impl Default for TestState {
	fn default() -> Self {
		let relay_chain: Vec<_> = (1u8..10).map(Hash::repeat_byte).collect();
		let chain_a = ParaId::from(1);
		let chain_b = ParaId::from(2);
		let chain_ids = vec![chain_a, chain_b];

		let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
		let session_info = make_session_info();

		SyncCryptoStore::sr25519_generate_new(
			&*keystore,
			ValidatorId::ID,
			Some(&Sr25519Keyring::Ferdie.to_seed()),
		)
		.expect("Insert key into keystore");

		let (cores, chunks) = {
			let mut cores = HashMap::new();
			let mut chunks = HashMap::new();

			cores.insert(relay_chain[0],
				vec![
					CoreState::Scheduled(ScheduledCore {
						para_id: chain_ids[0],
						collator: None,
					}),
					CoreState::Scheduled(ScheduledCore {
						para_id: chain_ids[1],
						collator: None,
					}),
				]
			);

			let heads = {
				let mut advanced = relay_chain.iter();
				advanced.next();
				relay_chain.iter().zip(advanced)
			};
			for (relay_parent, relay_child) in heads {
				let (p_cores, p_chunks): (Vec<_>, Vec<_>) = chain_ids.iter().enumerate()
					.map(|(i, para_id)| {
						let (core, chunk) = OccupiedCoreBuilder {
							group_responsible: GroupIndex(i as _),
							para_id: *para_id,
							relay_parent: relay_parent.clone(),
						}.build();
						(CoreState::Occupied(core), chunk)
					})
					.unzip();
				cores.insert(relay_child.clone(), p_cores);
				// Skip chunks for our own group (won't get fetched):
				let mut chunks_other_groups = p_chunks.into_iter();
				chunks_other_groups.next();
				// Each remaining entry is a `(CandidateHash, ErasureChunk)` pair:
				for (candidate_hash, chunk) in chunks_other_groups {
					chunks.insert((candidate_hash, chunk.index), chunk);
				}
			}
			(cores, chunks)
		};
		Self {
			relay_chain,
			valid_chunks: chunks.keys().cloned().collect(),
			chunks,
			session_info,
			cores,
			keystore,
		}
	}
}
impl TestState {
	/// Run, but fail after some timeout.
	pub async fn run(self, harness: TestHarness) {
		// Make sure the test won't run forever.
		let f = self.run_inner(harness.pool, harness.virtual_overseer)
			.timeout(Duration::from_secs(10));
		assert!(f.await.is_some(), "Test ran into timeout");