From 5fad9efc0a056f63141f693b4fc23226c914631d Mon Sep 17 00:00:00 2001
From: Robert Habermeier <rphmeier@gmail.com>
Date: Tue, 3 Apr 2018 13:25:21 +0200
Subject: [PATCH] add delayed-start consensus test (#107)

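Replace the SharedContext shared by all test nodes (a mutex-guarded
round counter bumped from a background thread) with per-node
AtomicUsize round counters driven by tokio_timer round timeouts, so
each node's timeouts back off exponentially on their own instead of
being advanced globally. Using the timer, add a test verifying that
consensus still completes when nodes begin agreement with staggered
start delays.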
---
 substrate/substrate/bft/src/generic/tests.rs | 168 ++++++++++---------
 1 file changed, 92 insertions(+), 76 deletions(-)

diff --git a/substrate/substrate/bft/src/generic/tests.rs b/substrate/substrate/bft/src/generic/tests.rs
index 349bec693f9..00d8ccf9a54 100644
--- a/substrate/substrate/bft/src/generic/tests.rs
+++ b/substrate/substrate/bft/src/generic/tests.rs
@@ -19,12 +19,17 @@
 use super::*;
 
 use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::time::Duration;
 
 use futures::prelude::*;
 use futures::sync::{oneshot, mpsc};
 use futures::future::FutureResult;
 
+use tokio_timer::{self, Timer};
+
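+// Base duration of round 0; later rounds time out after exponentially longer periods.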
+const ROUND_DURATION: Duration = Duration::from_millis(50);
+
 struct Network<T> {
 	endpoints: Vec<mpsc::UnboundedSender<T>>,
 	input: mpsc::UnboundedReceiver<(usize, T)>,
@@ -97,12 +102,6 @@ struct AuthorityId(usize);
 #[derive(Debug, PartialEq, Eq, Clone)]
 struct Signature(Message<Candidate, Digest>, AuthorityId);
 
-struct SharedContext {
-	node_count: usize,
-	current_round: usize,
-	awaiting_round_timeouts: HashMap<usize, Vec<oneshot::Sender<()>>>,
-}
-
 #[derive(Debug)]
 struct Error;
 
@@ -112,50 +111,12 @@ impl From<InputStreamConcluded> for Error {
 	}
 }
 
-impl SharedContext {
-	fn new(node_count: usize) -> Self {
-		SharedContext {
-			node_count,
-			current_round: 0,
-			awaiting_round_timeouts: HashMap::new()
-		}
-	}
-
-	fn round_timeout(&mut self, round: usize) -> Box<Future<Item=(),Error=Error>> {
-		let (tx, rx) = oneshot::channel();
-		if round < self.current_round {
-			tx.send(()).unwrap();
-		} else {
-			self.awaiting_round_timeouts
-				.entry(round)
-				.or_insert_with(Vec::new)
-				.push(tx);
-		}
-
-		Box::new(rx.map_err(|_| Error))
-	}
-
-	fn bump_round(&mut self) {
-		let awaiting_timeout = self.awaiting_round_timeouts
-			.remove(&self.current_round)
-			.unwrap_or_else(Vec::new);
-
-		for tx in awaiting_timeout {
-			let _ = tx.send(());
-		}
-
-		self.current_round += 1;
-	}
-
-	fn round_proposer(&self, round: usize) -> AuthorityId {
-		AuthorityId(round % self.node_count)
-	}
-}
-
 struct TestContext {
 	local_id: AuthorityId,
 	proposal: Mutex<usize>,
-	shared: Arc<Mutex<SharedContext>>,
+	node_count: usize,
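+	// This node's view of the current round, advanced as round timeouts expire.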
+	current_round: Arc<AtomicUsize>,
+	timer: Timer,
 }
 
 impl Context for TestContext {
@@ -210,7 +171,7 @@ impl Context for TestContext {
 	}
 
 	fn round_proposer(&self, round: usize) -> AuthorityId {
-		self.shared.lock().unwrap().round_proposer(round)
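+		// Proposer duty rotates round-robin through the authority set.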
+		AuthorityId(round % self.node_count)
 	}
 
 	fn proposal_valid(&self, proposal: &Candidate) -> FutureResult<bool, Error> {
@@ -218,7 +179,23 @@ impl Context for TestContext {
 	}
 
 	fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout {
-		self.shared.lock().unwrap().round_timeout(round)
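+		// A timeout requested for an already-elapsed round resolves immediately.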
+		if round < self.current_round.load(Ordering::SeqCst) {
+			Box::new(Ok(()).into_future())
+		} else {
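+			// Double the base duration once per round: ROUND_DURATION * 2^round.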
+			let mut round_duration = ROUND_DURATION;
+			for _ in 0..round {
+				round_duration *= 2;
+			}
+
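+			// On expiry, advance the round counter unless it has already moved past this round.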
+			let current_round = self.current_round.clone();
+			let timeout = self.timer.sleep(round_duration)
+				.map(move |_| {
+					current_round.compare_and_swap(round, round + 1, Ordering::SeqCst);
+				})
+				.map_err(|_| Error);
+
+			Box::new(timeout)
+		}
 	}
 }
 
@@ -237,7 +214,7 @@ fn consensus_completes_with_minimum_good() {
 	let node_count = 10;
 	let max_faulty = 3;
 
-	let shared_context = Arc::new(Mutex::new(SharedContext::new(node_count)));
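+	// A single timer wheel serves all nodes; its tick matches the base round duration.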
+	let timer = tokio_timer::wheel().tick_duration(ROUND_DURATION).build();
 
 	let (network, net_send, net_recv) = Network::new(node_count);
 	network.route_on_thread();
@@ -251,7 +228,9 @@ fn consensus_completes_with_minimum_good() {
 			let ctx = TestContext {
 				local_id: AuthorityId(i),
 				proposal: Mutex::new(i),
-				shared: shared_context.clone(),
+				current_round: Arc::new(AtomicUsize::new(0)),
+				timer: timer.clone(),
+				node_count,
 			};
 
 			agree(
@@ -264,15 +243,6 @@ fn consensus_completes_with_minimum_good() {
 		})
 		.collect::<Vec<_>>();
 
-	::std::thread::spawn(move || {
-		let mut timeout = ::std::time::Duration::from_millis(50);
-		loop {
-			::std::thread::sleep(timeout.clone());
-			shared_context.lock().unwrap().bump_round();
-			timeout *= 2;
-		}
-	});
-
 	let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error);
 	let results = ::futures::future::join_all(nodes)
 		.map(Some)
@@ -293,7 +263,7 @@ fn consensus_does_not_complete_without_enough_nodes() {
 	let node_count = 10;
 	let max_faulty = 3;
 
-	let shared_context = Arc::new(Mutex::new(SharedContext::new(node_count)));
+	let timer = tokio_timer::wheel().tick_duration(ROUND_DURATION).build();
 
 	let (network, net_send, net_recv) = Network::new(node_count);
 	network.route_on_thread();
@@ -307,7 +277,9 @@ fn consensus_does_not_complete_without_enough_nodes() {
 			let ctx = TestContext {
 				local_id: AuthorityId(i),
 				proposal: Mutex::new(i),
-				shared: shared_context.clone(),
+				current_round: Arc::new(AtomicUsize::new(0)),
+				timer: timer.clone(),
+				node_count,
 			};
 
 			agree(
@@ -348,9 +320,7 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
 			.collect()
 	}.check(7, |_, _, s| Some(s.1.clone())).unwrap();
 
-	let mut shared_context = SharedContext::new(node_count);
-	shared_context.current_round = locked_round + 1;
-	let shared_context = Arc::new(Mutex::new(shared_context));
+	let timer = tokio_timer::wheel().tick_duration(ROUND_DURATION).build();
 
 	let (network, net_send, net_recv) = Network::new(node_count);
 	network.route_on_thread();
@@ -363,9 +333,10 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
 			let ctx = TestContext {
 				local_id: AuthorityId(i),
 				proposal: Mutex::new(i),
-				shared: shared_context.clone(),
+				current_round: Arc::new(AtomicUsize::new(locked_round + 1)),
+				timer: timer.clone(),
+				node_count,
 			};
-
 			let mut agreement = agree(
 				ctx,
 				node_count,
@@ -396,15 +367,6 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
 		})
 		.collect::<Vec<_>>();
 
-	::std::thread::spawn(move || {
-		let mut timeout = ::std::time::Duration::from_millis(50);
-		loop {
-			::std::thread::sleep(timeout.clone());
-			shared_context.lock().unwrap().bump_round();
-			timeout *= 2;
-		}
-	});
-
 	let timeout = timeout_in(Duration::from_millis(500)).map_err(|_| Error);
 	let results = ::futures::future::join_all(nodes)
 		.map(Some)
@@ -419,3 +381,57 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
 		assert_eq!(&result.justification.digest, &locked_digest);
 	}
 }
+
+#[test]
+fn consensus_completes_even_when_nodes_start_with_a_delay() {
+	let node_count = 10;
+	let max_faulty = 3;
+	let base_sleep = Duration::from_millis(75);
+
+	let timer = tokio_timer::wheel().tick_duration(ROUND_DURATION).build();
+
+	let (network, net_send, net_recv) = Network::new(node_count);
+	network.route_on_thread();
+
+	let nodes = net_send
+		.into_iter()
+		.zip(net_recv)
+		.take(node_count - max_faulty)
+		.enumerate()
+		.map(|(i, (tx, rx))| {
+			let ctx = TestContext {
+				local_id: AuthorityId(i),
+				proposal: Mutex::new(i),
+				current_round: Arc::new(AtomicUsize::new(0)),
+				timer: timer.clone(),
+				node_count,
+			};
+
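+			// Stagger startup: node i begins agreement only after sleeping i * base_sleep.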
+			let sleep_duration = base_sleep * i as u32;
+
+			timer.sleep(sleep_duration).map_err(|_| Error).and_then(move |_| {
+				agree(
+					ctx,
+					node_count,
+					max_faulty,
+					rx.map_err(|_| Error),
+					tx.sink_map_err(|_| Error).with(move |t| Ok((i, t))),
+				)
+			})
+		})
+		.collect::<Vec<_>>();
+
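+	// A longer deadline than the other tests (750ms vs. 500ms) absorbs the delayed starts.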
+	let timeout = timeout_in(Duration::from_millis(750)).map_err(|_| Error);
+	let results = ::futures::future::join_all(nodes)
+		.map(Some)
+		.select(timeout.map(|_| None))
+		.wait()
+		.map(|(i, _)| i)
+		.map_err(|(e, _)| e)
+		.expect("to complete")
+		.expect("to not time out");
+
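+	// Every node that completed must have delivered the same agreed digest.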
+	for result in &results {
+		assert_eq!(&result.justification.digest, &results[0].justification.digest);
+	}
+}
-- 
GitLab