Commit 5fad9efc authored by asynchronous rob, committed by GitHub

add delay test (#107)
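
This change replaces the mutex-guarded `SharedContext`, whose round counter a detached thread bumped on a fixed schedule, with per-node state: a `node_count` field, an `Arc<AtomicUsize>` current-round counter, and a shared `tokio_timer::Timer`. Round timeouts now back off exponentially (`ROUND_DURATION` doubled once per round) and advance the counter via compare-and-swap when they fire. A new test, `consensus_completes_even_when_nodes_start_with_a_delay`, starts each node 75ms later than the previous one and checks that the surviving `node_count - max_faulty` nodes still reach agreement on the same digest.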

parent 471761f4
@@ -19,12 +19,17 @@
 use super::*;
 use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::time::Duration;

 use futures::prelude::*;
 use futures::sync::{oneshot, mpsc};
 use futures::future::FutureResult;
+use tokio_timer::{self, Timer};
+
+const ROUND_DURATION: Duration = Duration::from_millis(50);

 struct Network<T> {
 	endpoints: Vec<mpsc::UnboundedSender<T>>,
 	input: mpsc::UnboundedReceiver<(usize, T)>,
@@ -97,12 +102,6 @@ struct AuthorityId(usize);
 #[derive(Debug, PartialEq, Eq, Clone)]
 struct Signature(Message<Candidate, Digest>, AuthorityId);

-struct SharedContext {
-	node_count: usize,
-	current_round: usize,
-	awaiting_round_timeouts: HashMap<usize, Vec<oneshot::Sender<()>>>,
-}
-
 #[derive(Debug)]
 struct Error;
@@ -112,50 +111,12 @@ impl From<InputStreamConcluded> for Error {
 	}
 }

-impl SharedContext {
-	fn new(node_count: usize) -> Self {
-		SharedContext {
-			node_count,
-			current_round: 0,
-			awaiting_round_timeouts: HashMap::new()
-		}
-	}
-
-	fn round_timeout(&mut self, round: usize) -> Box<Future<Item=(),Error=Error>> {
-		let (tx, rx) = oneshot::channel();
-		if round < self.current_round {
-			tx.send(()).unwrap();
-		} else {
-			self.awaiting_round_timeouts
-				.entry(round)
-				.or_insert_with(Vec::new)
-				.push(tx);
-		}
-
-		Box::new(rx.map_err(|_| Error))
-	}
-
-	fn bump_round(&mut self) {
-		let awaiting_timeout = self.awaiting_round_timeouts
-			.remove(&self.current_round)
-			.unwrap_or_else(Vec::new);
-
-		for tx in awaiting_timeout {
-			let _ = tx.send(());
-		}
-
-		self.current_round += 1;
-	}
-
-	fn round_proposer(&self, round: usize) -> AuthorityId {
-		AuthorityId(round % self.node_count)
-	}
-}
-
 struct TestContext {
 	local_id: AuthorityId,
 	proposal: Mutex<usize>,
-	shared: Arc<Mutex<SharedContext>>,
+	node_count: usize,
+	current_round: Arc<AtomicUsize>,
+	timer: Timer,
 }

 impl Context for TestContext {
@@ -210,7 +171,7 @@ impl Context for TestContext {
 	}

 	fn round_proposer(&self, round: usize) -> AuthorityId {
-		self.shared.lock().unwrap().round_proposer(round)
+		AuthorityId(round % self.node_count)
 	}

 	fn proposal_valid(&self, proposal: &Candidate) -> FutureResult<bool, Error> {
@@ -218,7 +179,23 @@ impl Context for TestContext {
 	}

 	fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout {
-		self.shared.lock().unwrap().round_timeout(round)
+		if round < self.current_round.load(Ordering::SeqCst) {
+			Box::new(Ok(()).into_future())
+		} else {
+			let mut round_duration = ROUND_DURATION;
+			for _ in 0..round {
+				round_duration *= 2;
+			}
+
+			let current_round = self.current_round.clone();
+			let timeout = self.timer.sleep(round_duration)
+				.map(move |_| {
+					current_round.compare_and_swap(round, round + 1, Ordering::SeqCst);
+				})
+				.map_err(|_| Error);
+
+			Box::new(timeout)
+		}
 	}
 }
@@ -237,7 +214,7 @@ fn consensus_completes_with_minimum_good() {
 	let node_count = 10;
 	let max_faulty = 3;

-	let shared_context = Arc::new(Mutex::new(SharedContext::new(node_count)));
+	let timer = tokio_timer::wheel().tick_duration(ROUND_DURATION).build();

 	let (network, net_send, net_recv) = Network::new(node_count);
 	network.route_on_thread();
@@ -251,7 +228,9 @@ fn consensus_completes_with_minimum_good() {
 			let ctx = TestContext {
 				local_id: AuthorityId(i),
 				proposal: Mutex::new(i),
-				shared: shared_context.clone(),
+				current_round: Arc::new(AtomicUsize::new(0)),
+				timer: timer.clone(),
+				node_count,
 			};

 			agree(
@@ -264,15 +243,6 @@ fn consensus_completes_with_minimum_good() {
 		})
 		.collect::<Vec<_>>();

-	::std::thread::spawn(move || {
-		let mut timeout = ::std::time::Duration::from_millis(50);
-		loop {
-			::std::thread::sleep(timeout.clone());
-			shared_context.lock().unwrap().bump_round();
-			timeout *= 2;
-		}
-	});
-
 	let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error);
 	let results = ::futures::future::join_all(nodes)
 		.map(Some)
@@ -293,7 +263,7 @@ fn consensus_does_not_complete_without_enough_nodes() {
 	let node_count = 10;
 	let max_faulty = 3;

-	let shared_context = Arc::new(Mutex::new(SharedContext::new(node_count)));
+	let timer = tokio_timer::wheel().tick_duration(ROUND_DURATION).build();

 	let (network, net_send, net_recv) = Network::new(node_count);
 	network.route_on_thread();
@@ -307,7 +277,9 @@ fn consensus_does_not_complete_without_enough_nodes() {
 			let ctx = TestContext {
 				local_id: AuthorityId(i),
 				proposal: Mutex::new(i),
-				shared: shared_context.clone(),
+				current_round: Arc::new(AtomicUsize::new(0)),
+				timer: timer.clone(),
+				node_count,
 			};

 			agree(
@@ -348,9 +320,7 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
 			.collect()
 	}.check(7, |_, _, s| Some(s.1.clone())).unwrap();

-	let mut shared_context = SharedContext::new(node_count);
-	shared_context.current_round = locked_round + 1;
-	let shared_context = Arc::new(Mutex::new(shared_context));
+	let timer = tokio_timer::wheel().tick_duration(ROUND_DURATION).build();

 	let (network, net_send, net_recv) = Network::new(node_count);
 	network.route_on_thread();
@@ -363,9 +333,10 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
 			let ctx = TestContext {
 				local_id: AuthorityId(i),
 				proposal: Mutex::new(i),
-				shared: shared_context.clone(),
+				current_round: Arc::new(AtomicUsize::new(locked_round + 1)),
+				timer: timer.clone(),
+				node_count,
 			};

 			let mut agreement = agree(
 				ctx,
 				node_count,
@@ -396,15 +367,6 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
 		})
 		.collect::<Vec<_>>();

-	::std::thread::spawn(move || {
-		let mut timeout = ::std::time::Duration::from_millis(50);
-		loop {
-			::std::thread::sleep(timeout.clone());
-			shared_context.lock().unwrap().bump_round();
-			timeout *= 2;
-		}
-	});
-
 	let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error);
 	let results = ::futures::future::join_all(nodes)
 		.map(Some)
@@ -419,3 +381,57 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
 		assert_eq!(&result.justification.digest, &locked_digest);
 	}
 }
+
+#[test]
+fn consensus_completes_even_when_nodes_start_with_a_delay() {
+	let node_count = 10;
+	let max_faulty = 3;
+	let base_sleep = Duration::from_millis(75);
+
+	let timer = tokio_timer::wheel().tick_duration(ROUND_DURATION).build();
+
+	let (network, net_send, net_recv) = Network::new(node_count);
+	network.route_on_thread();
+
+	let nodes = net_send
+		.into_iter()
+		.zip(net_recv)
+		.take(node_count - max_faulty)
+		.enumerate()
+		.map(|(i, (tx, rx))| {
+			let ctx = TestContext {
+				local_id: AuthorityId(i),
+				proposal: Mutex::new(i),
+				current_round: Arc::new(AtomicUsize::new(0)),
+				timer: timer.clone(),
+				node_count,
+			};
+
+			let sleep_duration = base_sleep * i as u32;
+
+			timer.sleep(sleep_duration).map_err(|_| Error).and_then(move |_| {
+				agree(
+					ctx,
+					node_count,
+					max_faulty,
+					rx.map_err(|_| Error),
+					tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))),
+				)
+			})
+		})
+		.collect::<Vec<_>>();
+
+	let timeout = timeout_in(Duration::from_millis(750)).map_err(|_| Error);
+	let results = ::futures::future::join_all(nodes)
+		.map(Some)
+		.select(timeout.map(|_| None))
+		.wait()
+		.map(|(i, _)| i)
+		.map_err(|(e, _)| e)
+		.expect("to complete")
+		.expect("to not time out");
+
+	for result in &results {
+		assert_eq!(&result.justification.digest, &results[0].justification.digest);
+	}
+}
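
The core of the diff is the timeout scheme in `begin_round_timeout`: a timeout for a round that has already passed resolves immediately, otherwise it waits `ROUND_DURATION * 2^round` and then advances a shared atomic round counter, using compare-and-swap so that racing timers cannot advance the counter twice. What follows is a minimal, self-contained sketch of just that idea, ported to std-only threads instead of `tokio_timer` and futures 0.1; the function name and the `.min(20)` overflow guard are illustrative additions, not part of the commit.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

const ROUND_DURATION: Duration = Duration::from_millis(50);

// Sketch of the exponential round-timeout logic from the diff:
// a round already in the past resolves immediately; otherwise we
// sleep for ROUND_DURATION * 2^round and then advance the shared
// counter, but only if nobody advanced it past `round` already.
fn begin_round_timeout(current_round: Arc<AtomicUsize>, round: usize) -> thread::JoinHandle<()> {
	thread::spawn(move || {
		if round < current_round.load(Ordering::SeqCst) {
			return; // timeout for a past round: fire at once
		}
		// Exponential backoff: round 0 waits 50ms, round 1 waits 100ms, ...
		// (the .min(20) cap only keeps this sketch overflow-safe)
		let duration = ROUND_DURATION * (1u32 << round.min(20));
		thread::sleep(duration);
		// Advance round -> round + 1 only if it is still current,
		// mirroring the compare_and_swap in the diff.
		let _ = current_round.compare_exchange(
			round,
			round + 1,
			Ordering::SeqCst,
			Ordering::SeqCst,
		);
	})
}

fn main() {
	let current_round = Arc::new(AtomicUsize::new(0));
	// Two timers race to advance round 0; only one CAS succeeds.
	let a = begin_round_timeout(current_round.clone(), 0);
	let b = begin_round_timeout(current_round.clone(), 0);
	a.join().unwrap();
	b.join().unwrap();
	assert_eq!(current_round.load(Ordering::SeqCst), 1);
	println!("advanced to round {}", current_round.load(Ordering::SeqCst));
}

Note that the commit itself uses `AtomicUsize::compare_and_swap`, the standard API at the time; `compare_exchange` is its modern equivalent. The design removes the old cross-node coupling: each `TestContext` owns its round counter, so a delayed node backs off on its own schedule instead of being driven by one global bumping thread, which is exactly what the new delayed-start test exercises.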