Unverified Commit c5df4fa3 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

tests: use future::join instead of future::select (#2813)

* tests/av-store: use future::join instead of future::select

* tests/backing: use future::join instead of future::select

* tests/provisioner: use future::join instead of future::select

* tests/av-dist: use future::join instead of future::select

* tests/av-recovery: use future::join instead of future::select

* tests/bridge: use future::join instead of future::select

* tests/collator-protocol: use future::join instead of future::select

* tests/stmt-dist: use future::join instead of future::select

* fix tests
parent fadde846
Pipeline #132774 failed with stages
in 28 minutes and 31 seconds
......@@ -49,9 +49,7 @@ const TEST_CONFIG: Config = Config {
col_meta: columns::META,
};
struct TestHarness {
virtual_overseer: test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
}
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>;
#[derive(Default)]
struct TestCandidateBuilder {
......@@ -140,10 +138,10 @@ impl Default for TestState {
}
fn test_harness<T: Future<Output=()>>(
fn test_harness<T: Future<Output=VirtualOverseer>>(
state: TestState,
store: Arc<dyn KeyValueDB>,
test: impl FnOnce(TestHarness) -> T,
test: impl FnOnce(VirtualOverseer) -> T,
) {
let _ = env_logger::builder()
.is_test(true)
......@@ -170,20 +168,24 @@ fn test_harness<T: Future<Output=()>>(
let subsystem = run(subsystem, context);
let test_fut = test(TestHarness {
virtual_overseer,
});
let test_fut = test(virtual_overseer);
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
executor::block_on(future::select(test_fut, subsystem));
executor::block_on(future::join(async move {
let mut overseer = test_fut.await;
overseer_signal(
&mut overseer,
OverseerSignal::Conclude,
).await;
}, subsystem));
}
const TIMEOUT: Duration = Duration::from_millis(100);
async fn overseer_send(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
overseer: &mut VirtualOverseer,
msg: AvailabilityStoreMessage,
) {
tracing::trace!(meg = ?msg, "sending message");
......@@ -195,7 +197,7 @@ async fn overseer_send(
}
async fn overseer_recv(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
overseer: &mut VirtualOverseer,
) -> AllMessages {
let msg = overseer_recv_with_timeout(overseer, TIMEOUT)
.await
......@@ -207,7 +209,7 @@ async fn overseer_recv(
}
async fn overseer_recv_with_timeout(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
overseer: &mut VirtualOverseer,
timeout: Duration,
) -> Option<AllMessages> {
tracing::trace!("waiting for message...");
......@@ -218,7 +220,7 @@ async fn overseer_recv_with_timeout(
}
async fn overseer_signal(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
overseer: &mut VirtualOverseer,
signal: OverseerSignal,
) {
overseer
......@@ -247,8 +249,7 @@ fn candidate_included(receipt: CandidateReceipt) -> CandidateEvent {
fn runtime_api_error_does_not_stop_the_subsystem() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
test_harness(TestState::default(), store, |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(TestState::default(), store, |mut virtual_overseer| async move {
let new_leaf = Hash::repeat_byte(0x01);
overseer_signal(
......@@ -288,15 +289,14 @@ fn runtime_api_error_does_not_stop_the_subsystem() {
overseer_send(&mut virtual_overseer, query_chunk.into()).await;
assert!(rx.await.unwrap().is_none());
virtual_overseer
});
}
#[test]
fn store_chunk_works() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
test_harness(TestState::default(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(TestState::default(), store.clone(), |mut virtual_overseer| async move {
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
let validator_index = ValidatorIndex(5);
let n_validators = 10;
......@@ -338,6 +338,7 @@ fn store_chunk_works() {
overseer_send(&mut virtual_overseer, query_chunk.into()).await;
assert_eq!(rx.await.unwrap().unwrap(), chunk);
virtual_overseer
});
}
......@@ -345,8 +346,7 @@ fn store_chunk_works() {
#[test]
fn store_chunk_does_nothing_if_no_entry_already() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
test_harness(TestState::default(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(TestState::default(), store.clone(), |mut virtual_overseer| async move {
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
let validator_index = ValidatorIndex(5);
......@@ -377,14 +377,14 @@ fn store_chunk_does_nothing_if_no_entry_already() {
overseer_send(&mut virtual_overseer, query_chunk.into()).await;
assert!(rx.await.unwrap().is_none());
virtual_overseer
});
}
#[test]
fn query_chunk_checks_meta() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
test_harness(TestState::default(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(TestState::default(), store.clone(), |mut virtual_overseer| async move {
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
let validator_index = ValidatorIndex(5);
let n_validators = 10;
......@@ -422,6 +422,7 @@ fn query_chunk_checks_meta() {
overseer_send(&mut virtual_overseer, query_chunk.into()).await;
assert!(!rx.await.unwrap());
virtual_overseer
});
}
......@@ -429,8 +430,7 @@ fn query_chunk_checks_meta() {
fn store_block_works() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
let test_state = TestState::default();
test_harness(test_state.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
let candidate_hash = CandidateHash(Hash::repeat_byte(1));
let validator_index = ValidatorIndex(5);
let n_validators = 10;
......@@ -474,6 +474,7 @@ fn store_block_works() {
};
assert_eq!(chunk, expected_chunk);
virtual_overseer
});
}
......@@ -482,8 +483,7 @@ fn store_pov_and_query_chunk_works() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
let test_state = TestState::default();
test_harness(test_state.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
let candidate_hash = CandidateHash(Hash::repeat_byte(1));
let n_validators = 10;
......@@ -516,6 +516,7 @@ fn store_pov_and_query_chunk_works() {
assert_eq!(chunk.chunk, chunks_expected[i as usize]);
}
virtual_overseer
});
}
......@@ -524,9 +525,7 @@ fn query_all_chunks_works() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
let test_state = TestState::default();
test_harness(test_state.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
// all chunks for hash 1.
// 1 chunk for hash 2.
// 0 chunks for hash 3.
......@@ -608,6 +607,7 @@ fn query_all_chunks_works() {
virtual_overseer.send(FromOverseer::Communication { msg }).await;
assert_eq!(rx.await.unwrap().len(), 0);
}
virtual_overseer
});
}
......@@ -616,8 +616,7 @@ fn stored_but_not_included_data_is_pruned() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
let test_state = TestState::default();
test_harness(test_state.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
let candidate_hash = CandidateHash(Hash::repeat_byte(1));
let n_validators = 10;
......@@ -655,6 +654,7 @@ fn stored_but_not_included_data_is_pruned() {
// The block was not included by this point so it should be pruned now.
assert!(query_available_data(&mut virtual_overseer, candidate_hash).await.is_none());
virtual_overseer
});
}
......@@ -663,8 +663,7 @@ fn stored_data_kept_until_finalized() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
let test_state = TestState::default();
test_harness(test_state.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
let n_validators = 10;
let pov = PoV {
......@@ -760,6 +759,7 @@ fn stored_data_kept_until_finalized() {
assert!(
has_all_chunks(&mut virtual_overseer, candidate_hash, n_validators, false).await
);
virtual_overseer
});
}
......@@ -768,8 +768,7 @@ fn forkfullness_works() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
let test_state = TestState::default();
test_harness(test_state.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
let n_validators = 10;
let block_number_1 = 5;
let block_number_2 = 5;
......@@ -930,11 +929,12 @@ fn forkfullness_works() {
assert!(
has_all_chunks(&mut virtual_overseer, candidate_2_hash, n_validators, false).await,
);
virtual_overseer
});
}
async fn query_available_data(
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
virtual_overseer: &mut VirtualOverseer,
candidate_hash: CandidateHash,
) -> Option<AvailableData> {
let (tx, rx) = oneshot::channel();
......@@ -946,7 +946,7 @@ async fn query_available_data(
}
async fn query_chunk(
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
virtual_overseer: &mut VirtualOverseer,
candidate_hash: CandidateHash,
index: ValidatorIndex,
) -> Option<ErasureChunk> {
......@@ -959,7 +959,7 @@ async fn query_chunk(
}
async fn has_all_chunks(
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
virtual_overseer: &mut VirtualOverseer,
candidate_hash: CandidateHash,
n_validators: u32,
expect_present: bool,
......@@ -973,7 +973,7 @@ async fn has_all_chunks(
}
async fn import_leaf(
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
virtual_overseer: &mut VirtualOverseer,
parent_hash: Hash,
block_number: BlockNumber,
events: Vec<CandidateEvent>,
......
......@@ -1353,6 +1353,7 @@ mod tests {
ActiveLeavesUpdate, FromOverseer, OverseerSignal, ActivatedLeaf,
};
use polkadot_node_primitives::{InvalidCandidate, BlockData};
use polkadot_node_subsystem_test_helpers as test_helpers;
use sp_keyring::Sr25519Keyring;
use sp_application_crypto::AppKey;
use sp_keystore::{CryptoStore, SyncCryptoStore};
......@@ -1466,14 +1467,16 @@ mod tests {
}
}
struct TestHarness {
virtual_overseer: polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle<CandidateBackingMessage>,
}
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<CandidateBackingMessage>;
fn test_harness<T: Future<Output=()>>(keystore: SyncCryptoStorePtr, test: impl FnOnce(TestHarness) -> T) {
fn test_harness<T: Future<Output=VirtualOverseer>>(
keystore: SyncCryptoStorePtr,
test: impl FnOnce(VirtualOverseer) -> T,
) {
let pool = sp_core::testing::TaskExecutor::new();
let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool.clone());
let (context, virtual_overseer) =
test_helpers::make_subsystem_context(pool.clone());
let subsystem = CandidateBackingSubsystem::new(
pool.clone(),
......@@ -1481,13 +1484,16 @@ mod tests {
Metrics(None),
).run(context);
let test_fut = test(TestHarness {
virtual_overseer,
});
let test_fut = test(virtual_overseer);
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
futures::executor::block_on(future::select(test_fut, subsystem));
futures::executor::block_on(future::join(async move {
let mut virtual_overseer = test_fut.await;
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::Conclude,
)).await;
}, subsystem));
}
fn make_erasure_root(test: &TestState, pov: PoV) -> Hash {
......@@ -1529,7 +1535,7 @@ mod tests {
// Tests that the subsystem performs actions that are requied on startup.
async fn test_startup(
virtual_overseer: &mut polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle<CandidateBackingMessage>,
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
) {
// Start work on some new parent.
......@@ -1587,9 +1593,7 @@ mod tests {
#[test]
fn backing_second_works() {
let test_state = TestState::default();
test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
test_startup(&mut virtual_overseer, &test_state).await;
let pov = PoV {
......@@ -1674,6 +1678,7 @@ mod tests {
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent)))
).await;
virtual_overseer
});
}
......@@ -1681,9 +1686,7 @@ mod tests {
#[test]
fn backing_works() {
let test_state = TestState::default();
test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
test_startup(&mut virtual_overseer, &test_state).await;
let pov = PoV {
......@@ -1820,15 +1823,14 @@ mod tests {
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent)))
).await;
virtual_overseer
});
}
#[test]
fn backing_works_while_validation_ongoing() {
let test_state = TestState::default();
test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
test_startup(&mut virtual_overseer, &test_state).await;
let pov = PoV {
......@@ -1983,6 +1985,7 @@ mod tests {
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent)))
).await;
virtual_overseer
});
}
......@@ -1991,9 +1994,7 @@ mod tests {
#[test]
fn backing_misbehavior_works() {
let test_state = TestState::default();
test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
test_startup(&mut virtual_overseer, &test_state).await;
let pov = PoV {
......@@ -2137,6 +2138,7 @@ mod tests {
).expect("signature must be valid");
}
);
virtual_overseer
});
}
......@@ -2145,9 +2147,7 @@ mod tests {
#[test]
fn backing_dont_second_invalid() {
let test_state = TestState::default();
test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
test_startup(&mut virtual_overseer, &test_state).await;
let pov_block_a = PoV {
......@@ -2268,6 +2268,7 @@ mod tests {
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent)))
).await;
virtual_overseer
});
}
......@@ -2276,9 +2277,7 @@ mod tests {
#[test]
fn backing_second_after_first_fails_works() {
let test_state = TestState::default();
test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
test_startup(&mut virtual_overseer, &test_state).await;
let pov = PoV {
......@@ -2392,6 +2391,7 @@ mod tests {
assert_eq!(&*pov, &pov_to_second);
}
);
virtual_overseer
});
}
......@@ -2400,9 +2400,7 @@ mod tests {
#[test]
fn backing_works_after_failed_validation() {
let test_state = TestState::default();
test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
test_startup(&mut virtual_overseer, &test_state).await;
let pov = PoV {
......@@ -2478,6 +2476,7 @@ mod tests {
virtual_overseer.send(FromOverseer::Communication{ msg }).await;
assert_eq!(rx.await.unwrap().len(), 0);
virtual_overseer
});
}
......@@ -2491,9 +2490,7 @@ mod tests {
collator: Some(Sr25519Keyring::Bob.public().into()),
});
test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
test_startup(&mut virtual_overseer, &test_state).await;
let pov = PoV {
......@@ -2531,6 +2528,7 @@ mod tests {
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent)))
).await;
virtual_overseer
});
}
......@@ -2542,9 +2540,7 @@ mod tests {
collator: Some(Sr25519Keyring::Bob.public().into()),
});
test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
test_startup(&mut virtual_overseer, &test_state).await;
let pov = PoV {
......@@ -2587,6 +2583,7 @@ mod tests {
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent)))
).await;
virtual_overseer
});
}
......@@ -2672,9 +2669,7 @@ mod tests {
fn retry_works() {
// sp_tracing::try_init_simple();
let test_state = TestState::default();
test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
test_startup(&mut virtual_overseer, &test_state).await;
let pov = PoV {
......@@ -2817,6 +2812,7 @@ mod tests {
)
) if pov == pov && &c == candidate.descriptor()
);
virtual_overseer
});
}
}
......@@ -219,7 +219,7 @@ mod select_candidates {
futures::pin_mut!(overseer, test);
let _ = futures::executor::block_on(future::select(overseer, test));
let _ = futures::executor::block_on(future::join(overseer, test));
}
// For test purposes, we always return this set of availability cores:
......
......@@ -27,7 +27,7 @@ use super::*;
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<ApprovalDistributionMessage>;
fn test_harness<T: Future<Output = ()>>(
fn test_harness<T: Future<Output = VirtualOverseer>>(
mut state: State,
test_fn: impl FnOnce(VirtualOverseer) -> T,
) -> State {
......@@ -51,7 +51,14 @@ fn test_harness<T: Future<Output = ()>>(
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
executor::block_on(future::select(test_fut, subsystem));
executor::block_on(future::join(async move {
let mut overseer = test_fut.await;
overseer
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.timeout(TIMEOUT)
.await
.expect("Conclude send timeout");
}, subsystem));
}
state
......@@ -262,6 +269,7 @@ fn try_import_the_same_assignment() {
.is_none(),
"no message should be sent",
);
virtual_overseer
});
}
......@@ -342,6 +350,7 @@ fn spam_attack_results_in_negative_reputation_change() {
expect_reputation_change(overseer, peer, COST_UNEXPECTED_MESSAGE).await;
expect_reputation_change(overseer, peer, BENEFIT_VALID_MESSAGE).await;
}
virtual_overseer
});
}
......@@ -423,6 +432,7 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() {
// now we should
expect_reputation_change(overseer, peer, COST_DUPLICATE_MESSAGE).await;
virtual_overseer
});
}
......@@ -509,6 +519,7 @@ fn import_approval_happy_path() {
assert_eq!(approvals.len(), 1);
}
);
virtual_overseer
});
}
......@@ -588,6 +599,7 @@ fn import_approval_bad() {
);
expect_reputation_change(overseer, &peer_b, COST_INVALID_MESSAGE).await;
virtual_overseer
});
}
......@@ -626,6 +638,7 @@ fn update_our_view() {
let msg = ApprovalDistributionMessage::NewBlocks(vec![meta_a, meta_b, meta_c]);
overseer_send(overseer, msg).await;
virtual_overseer
});
assert!(state.blocks_by_number.get(&1).is_some());
......@@ -639,6 +652,7 @@ fn update_our_view() {
let overseer = &mut virtual_overseer;
// finalize a block
overseer_signal_block_finalized(overseer, 2).await;
virtual_overseer
});
assert!(state.blocks_by_number.get(&1).is_none());
......@@ -652,6 +666,7 @@ fn update_our_view() {
let overseer = &mut virtual_overseer;
// finalize a very high block
overseer_signal_block_finalized(overseer, 4_000_000_000).await;
virtual_overseer
});
assert!(state.blocks_by_number.get(&3).is_none());
......@@ -726,6 +741,7 @@ fn update_peer_view() {
assert_eq!(assignments.len(), 1);
}
);
virtual_overseer
});