From 7e2dba3e3afe8f4b54b6aa5f545910d1bd7f6345 Mon Sep 17 00:00:00 2001 From: Pierre Krieger <pierre.krieger1708@gmail.com> Date: Wed, 14 Aug 2019 19:21:04 +0200 Subject: [PATCH] Remove usage of loop_fn in the GRANDPA tests (#3397) --- substrate/core/finality-grandpa/src/tests.rs | 118 +++++++++++-------- 1 file changed, 69 insertions(+), 49 deletions(-) diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index bbaae1e9b7e..f634fd114ae 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -1102,6 +1102,8 @@ fn voter_persists_its_votes() { let client = net.peer(0).client().clone(); let net = Arc::new(Mutex::new(net)); + // channel between the voter and the main controller. + // sending a message on the `voter_tx` restarts the voter. let (voter_tx, voter_rx) = mpsc::unbounded::<()>(); let mut keystore_paths = Vec::new(); @@ -1110,61 +1112,79 @@ fn voter_persists_its_votes() { // channel. whenever a message is received the voter is restarted. when the // sender is dropped the voter is stopped. { - let net = net.clone(); - let client = client.clone(); - let (keystore, keystore_path) = create_keystore(peers[0]); keystore_paths.push(keystore_path); - let voter = future::loop_fn(voter_rx, move |rx| { - let (_block_import, _, _, _, link) = net.lock().make_block_import(client.clone()); - let link = link.lock().take().unwrap(); + struct ResettableVoter { + voter: Box<dyn Future<Item = (), Error = ()> + Send>, + voter_rx: mpsc::UnboundedReceiver<()>, + net: Arc<Mutex<GrandpaTestNet>>, + client: PeersClient, + keystore: KeyStorePtr, + } - let grandpa_params = GrandpaParams { - config: Config { - gossip_duration: TEST_GOSSIP_DURATION, - justification_period: 32, - keystore: Some(keystore.clone()), - name: Some(format!("peer#{}", 0)), - }, - link, - network: net.lock().peers[0].network_service().clone(), - inherent_data_providers: InherentDataProviders::new(), - on_exit: Exit, - telemetry_on_connect: None, - }; + impl Future for ResettableVoter { + type Item = (); + type Error = (); - let voter = run_grandpa_voter(grandpa_params) - .expect("all in order with client and network") - .then(move |r| { - // we need to keep the block_import alive since it owns the - // sender for the voter commands channel, if that gets dropped - // then the voter will stop - drop(_block_import); - r - }); - - voter.select2(rx.into_future()).then(|res| match res { - Ok(future::Either::A(x)) => { - panic!("voter stopped unexpectedly: {:?}", x); - }, - Ok(future::Either::B(((Some(()), rx), _))) => { - Ok(future::Loop::Continue(rx)) - }, - Ok(future::Either::B(((None, _), _))) => { - Ok(future::Loop::Break(())) - }, - Err(future::Either::A(err)) => { - panic!("unexpected error: {:?}", err); - }, - Err(future::Either::B(..)) => { - // voter_rx dropped, stop the voter. - Ok(future::Loop::Break(())) - }, - }) - }); + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + match self.voter.poll() { + Ok(Async::Ready(())) | Err(_) => panic!("error in the voter"), + Ok(Async::NotReady) => {}, + } - runtime.spawn(voter); + match self.voter_rx.poll() { + Err(_) | Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => {} + Ok(Async::Ready(Some(()))) => { + let (_block_import, _, _, _, link) = + self.net.lock().make_block_import(self.client.clone()); + let link = link.lock().take().unwrap(); + + let grandpa_params = GrandpaParams { + config: Config { + gossip_duration: TEST_GOSSIP_DURATION, + justification_period: 32, + keystore: Some(self.keystore.clone()), + name: Some(format!("peer#{}", 0)), + }, + link, + network: self.net.lock().peers[0].network_service().clone(), + inherent_data_providers: InherentDataProviders::new(), + on_exit: Exit, + telemetry_on_connect: None, + }; + + let voter = run_grandpa_voter(grandpa_params) + .expect("all in order with client and network") + .then(move |r| { + // we need to keep the block_import alive since it owns the + // sender for the voter commands channel, if that gets dropped + // then the voter will stop + drop(_block_import); + r + }); + + self.voter = Box::new(voter); + // notify current task in order to poll the voter + futures::task::current().notify(); + } + }; + + Ok(Async::NotReady) + } + } + + // we create a "dummy" voter by setting it to `empty` and triggering the `tx`. + // this way, the `ResettableVoter` will reset its `voter` field to a value ASAP. + voter_tx.unbounded_send(()).unwrap(); + runtime.spawn(ResettableVoter { + voter: Box::new(futures::future::empty()), + voter_rx, + net: net.clone(), + client: client.clone(), + keystore, + }); } let (exit_tx, exit_rx) = futures::sync::oneshot::channel::<()>(); -- GitLab