Unverified Commit 8d894185 authored by Andronik Ordian, committed by GitHub

tabify tests (#3220)

* tabify tests

* move mod tests; up
parent 0795cb57
Pipeline #141916 passed with stages in 42 minutes and 41 seconds
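The change is mechanical and repeats across every touched crate: test code is re-indented with tabs, and the #[cfg(test)] mod tests; declaration moves from the tail of the crate root to sit with the other module declarations near the top. A minimal sketch of the resulting layout follows; the module and constant names in it are illustrative placeholders, not taken from any one of the touched crates.

// lib.rs: sketch only; "error" and the LOG_TARGET value are placeholder names
mod error;

#[cfg(test)]
mod tests; // points at tests.rs (or tests/mod.rs); compiled only when running cargo test

const LOG_TARGET: &str = "parachain::example-subsystem";

// ... rest of the subsystem implementation ...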
@@ -49,6 +49,9 @@ use std::sync::Arc;
mod error;
#[cfg(test)]
mod tests;
const LOG_TARGET: &'static str = "parachain::collation-generation";
/// Collation Generation Subsystem
@@ -506,6 +509,3 @@ impl metrics::Metrics for Metrics {
Ok(Metrics(Some(metrics)))
}
}
#[cfg(test)]
mod tests;
@@ -43,6 +43,9 @@ use std::{pin::Pin, collections::BTreeMap, sync::Arc};
use thiserror::Error;
use futures_timer::Delay;
#[cfg(test)]
mod tests;
/// How long to wait before proposing.
const PRE_PROPOSE_TIMEOUT: std::time::Duration = core::time::Duration::from_millis(2000);
@@ -599,6 +602,3 @@ impl metrics::Metrics for Metrics {
/// The provisioning subsystem.
pub type ProvisioningSubsystem<Spawner> = JobSubsystem<ProvisioningJob, Spawner>;
#[cfg(test)]
mod tests;
@@ -44,6 +44,9 @@ use cache::{RequestResult, RequestResultCache};
mod cache;
#[cfg(test)]
mod tests;
const LOG_TARGET: &str = "parachain::runtime-api";
/// The maximum number of runtime API requests that can be executed in parallel. Further requests will be buffered.
@@ -411,6 +414,3 @@ impl metrics::Metrics for Metrics {
Ok(Metrics(Some(metrics)))
}
}
#[cfg(test)]
mod tests;
@@ -20,10 +20,6 @@
#![warn(missing_docs)]
#[cfg(test)]
mod tests;
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
use futures::{channel::oneshot, FutureExt as _};
use polkadot_primitives::v1::{
@@ -47,6 +43,9 @@ use polkadot_node_network_protocol::{
PeerId, View, v1 as protocol_v1, UnifiedReputationChange as Rep,
};
#[cfg(test)]
mod tests;
const LOG_TARGET: &str = "parachain::approval-distribution";
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("Peer sent an out-of-view assignment or approval");
@@ -51,6 +51,7 @@ use polkadot_node_network_protocol::{
};
use polkadot_node_subsystem_util::request_session_info;
use polkadot_erasure_coding::{branches, branch_hash, recovery_threshold, obtain_chunks_v1};
mod error;
#[cfg(test)]
mod tests;
@@ -64,6 +64,8 @@ use network::{Network, send_message};
mod multiplexer;
pub use multiplexer::RequestMultiplexer;
#[cfg(test)]
mod tests;
/// The maximum amount of heads a peer is allowed to have in their view at any time.
///
@@ -1131,9 +1133,3 @@ async fn dispatch_collation_events_to_all<I>(
ctx.send_messages(events.into_iter().flat_map(messages_for)).await
}
#[cfg(test)]
mod tests;
@@ -44,6 +44,9 @@ use polkadot_node_primitives::{SignedFullStatement, Statement, PoV};
use crate::error::{Fatal, NonFatal, log_error};
use super::{LOG_TARGET, Result};
#[cfg(test)]
mod tests;
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
#[derive(Clone, Default)]
@@ -866,6 +869,3 @@ pub(crate) async fn run(
}
}
}
#[cfg(test)]
mod tests;
@@ -50,6 +50,9 @@ use polkadot_subsystem::{
use super::{modify_reputation, Result, LOG_TARGET};
#[cfg(test)]
mod tests;
const COLLATION_FETCH_TIMEOUT: Duration = Duration::from_secs(2);
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
@@ -1244,6 +1247,3 @@ where
false
}
}
#[cfg(test)]
mod tests;
@@ -18,9 +18,6 @@
//! and issuing a connection request to the validators relevant to
//! the gossiping subsystems on every new session.
#[cfg(test)]
mod tests;
use std::time::{Duration, Instant};
use futures::{channel::oneshot, FutureExt as _};
use polkadot_node_subsystem::{
@@ -38,6 +35,9 @@ use polkadot_node_network_protocol::peer_set::PeerSet;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use sp_application_crypto::{Public, AppKey};
#[cfg(test)]
mod tests;
const LOG_TARGET: &str = "parachain::gossip-support";
// How much time should we wait since the last
// authority discovery resolution failure.
@@ -71,6 +71,9 @@ use requester::{RequesterMessage, fetch};
mod responder;
use responder::{ResponderMessage, respond};
#[cfg(test)]
mod tests;
const COST_UNEXPECTED_STATEMENT: Rep = Rep::CostMinor("Unexpected Statement");
const COST_FETCH_FAIL: Rep = Rep::CostMinor("Requesting `CommittedCandidateReceipt` from peer failed");
const COST_INVALID_SIGNATURE: Rep = Rep::CostMajor("Invalid Statement Signature");
@@ -2044,6 +2047,3 @@ impl metrics::Metrics for Metrics {
Ok(Metrics(Some(metrics)))
}
}
#[cfg(test)]
mod tests;
@@ -98,6 +98,9 @@ use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}, mete
use polkadot_node_primitives::SpawnNamed;
use polkadot_procmacro_overseer_subsystems_gen::AllSubsystemsGen;
#[cfg(test)]
mod tests;
// The capacity of bounded channels inside the overseer.
const CHANNEL_CAPACITY: usize = 1024;
// The capacity of signal channels to subsystems.
@@ -2196,6 +2199,3 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(
instance,
})
}
#[cfg(test)]
mod tests;
@@ -54,8 +54,6 @@ use thiserror::Error;
pub use metered_channel as metered;
pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
mod error_handling;
/// Error classification.
pub use error_handling::{Fault, unwrap_non_fatal};
@@ -72,6 +70,11 @@ pub mod reexports {
/// Convenient and efficient runtime info access.
pub mod runtime;
mod error_handling;
#[cfg(test)]
mod tests;
/// Duration a job will wait after sending a stop signal before hard-aborting.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
/// Capacity of channels to and from individual jobs
@@ -859,6 +862,3 @@ impl futures::Stream for Metronome
Poll::Pending
}
}
#[cfg(test)]
mod tests;
@@ -19,8 +19,8 @@ use executor::block_on;
use thiserror::Error;
use polkadot_node_jaeger as jaeger;
use polkadot_node_subsystem::{
	messages::{AllMessages, CollatorProtocolMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal,
	SpawnedSubsystem, ActivatedLeaf, LeafStatus,
};
use assert_matches::assert_matches;
use futures::{channel::mpsc, executor, StreamExt, future, Future, FutureExt, SinkExt};
@@ -37,7 +37,7 @@ use std::{pin::Pin, sync::{Arc, atomic::{AtomicUsize, Ordering}}, time::Duration
// job structs are constructed within JobTrait::run
// most will want to retain the sender and receiver, as well as whatever other data they like
struct FakeCollatorProtocolJob {
	receiver: mpsc::Receiver<CollatorProtocolMessage>,
}
// Error will mostly be a wrapper to make the try operator more convenient;
@@ -45,215 +45,215 @@ struct FakeCollatorProtocolJob {
// It must implement Debug for logging.
#[derive(Debug, Error)]
enum Error {
	#[error(transparent)]
	Sending(#[from]mpsc::SendError),
}
impl JobTrait for FakeCollatorProtocolJob {
	type ToJob = CollatorProtocolMessage;
	type Error = Error;
	type RunArgs = bool;
	type Metrics = ();

	const NAME: &'static str = "FakeCollatorProtocolJob";

	/// Run a job for the parent block indicated
	//
	// this function is in charge of creating and executing the job's main loop
	fn run<S: SubsystemSender>(
		_: Hash,
		_: Arc<jaeger::Span>,
		run_args: Self::RunArgs,
		_metrics: Self::Metrics,
		receiver: mpsc::Receiver<CollatorProtocolMessage>,
		mut sender: JobSender<S>,
	) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
		async move {
			let job = FakeCollatorProtocolJob { receiver };

			if run_args {
				sender.send_message(CollatorProtocolMessage::Invalid(
					Default::default(),
					Default::default(),
				).into()).await;
			}

			// it isn't necessary to break run_loop into its own function,
			// but it's convenient to separate the concerns in this way
			job.run_loop().await
		}
		.boxed()
	}
}
impl FakeCollatorProtocolJob {
	async fn run_loop(mut self) -> Result<(), Error> {
		loop {
			match self.receiver.next().await {
				Some(_csm) => {
					unimplemented!("we'd report the collator to the peer set manager here, but that's not implemented yet");
				}
				None => break,
			}
		}

		Ok(())
	}
}
// with the job defined, it's straightforward to get a subsystem implementation.
type FakeCollatorProtocolSubsystem<Spawner> =
	JobSubsystem<FakeCollatorProtocolJob, Spawner>;
// this type lets us pretend to be the overseer
type OverseerHandle = test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>;
fn test_harness<T: Future<Output = ()>>(
	run_args: bool,
	test: impl FnOnce(OverseerHandle) -> T,
) {
	let _ = env_logger::builder()
		.is_test(true)
		.filter(
			None,
			log::LevelFilter::Trace,
		)
		.try_init();

	let pool = sp_core::testing::TaskExecutor::new();

	let (context, overseer_handle) = make_subsystem_context(pool.clone());

	let subsystem = FakeCollatorProtocolSubsystem::new(
		pool,
		run_args,
		(),
	).run(context);

	let test_future = test(overseer_handle);

	futures::pin_mut!(subsystem, test_future);

	executor::block_on(async move {
		future::join(subsystem, test_future)
			.timeout(Duration::from_secs(2))
			.await
			.expect("test timed out instead of completing")
	});
}
#[test]
fn starting_and_stopping_job_works() {
	let relay_parent: Hash = [0; 32].into();

	test_harness(true, |mut overseer_handle| async move {
		overseer_handle
			.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
				ActiveLeavesUpdate::start_work(ActivatedLeaf {
					hash: relay_parent,
					number: 1,
					status: LeafStatus::Fresh,
					span: Arc::new(jaeger::Span::Disabled),
				}),
			)))
			.await;

		assert_matches!(
			overseer_handle.recv().await,
			AllMessages::CollatorProtocol(_)
		);

		overseer_handle
			.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
				ActiveLeavesUpdate::stop_work(relay_parent),
			)))
			.await;

		overseer_handle
			.send(FromOverseer::Signal(OverseerSignal::Conclude))
			.await;
	});
}
#[test]
fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
	let relay_parent = Hash::repeat_byte(0x01);

	test_harness(true, |mut overseer_handle| async move {
		overseer_handle
			.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
				ActiveLeavesUpdate::start_work(ActivatedLeaf {
					hash: relay_parent,
					number: 1,
					status: LeafStatus::Fresh,
					span: Arc::new(jaeger::Span::Disabled),
				}),
			)))
			.await;

		// send to a non running job
		overseer_handle
			.send(FromOverseer::Communication {
				msg: Default::default(),
			})
			.await;

		// the subsystem is still alive
		assert_matches!(
			overseer_handle.recv().await,
			AllMessages::CollatorProtocol(_)
		);

		overseer_handle
			.send(FromOverseer::Signal(OverseerSignal::Conclude))
			.await;
	});
}
#[test]
fn test_subsystem_impl_and_name_derivation() {
	let pool = sp_core::testing::TaskExecutor::new();
	let (context, _) = make_subsystem_context::<CollatorProtocolMessage, _>(pool.clone());

	let SpawnedSubsystem { name, .. } =
		FakeCollatorProtocolSubsystem::new(pool, false, ()).start(context);

	assert_eq!(name, "FakeCollatorProtocol");
}
#[test]
fn tick_tack_metronome() {
	let n = Arc::new(AtomicUsize::default());
	let (tick, mut block) = mpsc::unbounded();

	let metronome = {
		let n = n.clone();
		let stream = Metronome::new(Duration::from_millis(137_u64));
		stream.for_each(move |_res| {
			let _ = n.fetch_add(1, Ordering::Relaxed);
			let mut tick = tick.clone();
			async move {
				tick.send(()).await.expect("Test helper channel works. qed");
			}
		}).fuse()
	};

	let f2 = async move {
		block.next().await;
		assert_eq!(n.load(Ordering::Relaxed), 1_usize);
		block.next().await;
		assert_eq!(n.load(Ordering::Relaxed), 2_usize);
		block.next().await;
		assert_eq!(n.load(Ordering::Relaxed), 3_usize);
		block.next().await;
		assert_eq!(n.load(Ordering::Relaxed), 4_usize);
	}.fuse();

	futures::pin_mut!(f2);
	futures::pin_mut!(metronome);

	block_on(async move {
		// futures::join!(metronome, f2)
		futures::select!(
			_ = metronome => unreachable!("Metronome never stops. qed"),
			_ = f2 => (),
		)
	});