From c792dd358d2ac263865841fecf0a9d0705b974e6 Mon Sep 17 00:00:00 2001 From: Pierre Krieger <pierre.krieger1708@gmail.com> Date: Fri, 2 Aug 2019 19:52:55 +0200 Subject: [PATCH] Switch RPCs to stable futures (#3287) --- substrate/core/rpc/src/helpers.rs | 5 +++-- substrate/core/rpc/src/system/mod.rs | 8 ++++---- substrate/core/rpc/src/system/tests.rs | 6 +++--- substrate/core/service/src/components.rs | 3 ++- substrate/core/service/src/lib.rs | 8 ++++++-- 5 files changed, 18 insertions(+), 12 deletions(-) diff --git a/substrate/core/rpc/src/helpers.rs b/substrate/core/rpc/src/helpers.rs index ccfde6afb5c..2c69ead76ca 100644 --- a/substrate/core/rpc/src/helpers.rs +++ b/substrate/core/rpc/src/helpers.rs @@ -14,11 +14,12 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see <http://www.gnu.org/licenses/>. -use futures::{prelude::*, sync::oneshot}; +use futures::prelude::*; +use futures03::{channel::oneshot, compat::Compat}; /// Wraps around `oneshot::Receiver` and adjusts the error type to produce an internal error if the /// sender gets dropped. -pub struct Receiver<T>(pub oneshot::Receiver<T>); +pub struct Receiver<T>(pub Compat<oneshot::Receiver<T>>); impl<T> Future for Receiver<T> { type Item = T; diff --git a/substrate/core/rpc/src/system/mod.rs b/substrate/core/rpc/src/system/mod.rs index c8cf97c6582..59ed73b588a 100644 --- a/substrate/core/rpc/src/system/mod.rs +++ b/substrate/core/rpc/src/system/mod.rs @@ -23,7 +23,7 @@ pub mod helpers; mod tests; use crate::helpers::Receiver; -use futures::sync::{mpsc, oneshot}; +use futures03::{channel::{mpsc, oneshot}, compat::Compat}; use jsonrpc_derive::rpc; use network; use sr_primitives::traits::{self, Header as HeaderT}; @@ -124,18 +124,18 @@ impl<B: traits::Block> SystemApi<B::Hash, <B::Header as HeaderT>::Number> for Sy fn system_health(&self) -> Receiver<Health> { let (tx, rx) = oneshot::channel(); let _ = self.send_back.unbounded_send(Request::Health(tx)); - Receiver(rx) + Receiver(Compat::new(rx)) } fn system_peers(&self) -> Receiver<Vec<PeerInfo<B::Hash, <B::Header as HeaderT>::Number>>> { let (tx, rx) = oneshot::channel(); let _ = self.send_back.unbounded_send(Request::Peers(tx)); - Receiver(rx) + Receiver(Compat::new(rx)) } fn system_network_state(&self) -> Receiver<network::NetworkState> { let (tx, rx) = oneshot::channel(); let _ = self.send_back.unbounded_send(Request::NetworkState(tx)); - Receiver(rx) + Receiver(Compat::new(rx)) } } diff --git a/substrate/core/rpc/src/system/tests.rs b/substrate/core/rpc/src/system/tests.rs index 2dc4139da30..70e8b4b95b6 100644 --- a/substrate/core/rpc/src/system/tests.rs +++ b/substrate/core/rpc/src/system/tests.rs @@ -20,7 +20,7 @@ use network::{self, PeerId}; use network::config::Roles; use test_client::runtime::Block; use assert_matches::assert_matches; -use futures::{prelude::*, sync::mpsc}; +use futures03::{prelude::*, channel::mpsc}; use std::thread; struct Status { @@ -46,7 +46,7 @@ fn api<T: Into<Option<Status>>>(sync: T) -> System<Block> { let should_have_peers = !status.is_dev; let (tx, rx) = mpsc::unbounded(); thread::spawn(move || { - tokio::run(rx.for_each(move |request| { + futures03::executor::block_on(rx.for_each(move |request| { match request { Request::Health(sender) => { let _ = sender.send(Health { @@ -82,7 +82,7 @@ fn api<T: Into<Option<Status>>>(sync: T) -> System<Block> { } }; - Ok(()) + future::ready(()) })) }); System::new(SystemInfo { diff --git a/substrate/core/service/src/components.rs b/substrate/core/service/src/components.rs index dff6161f165..570e894c396 100644 --- a/substrate/core/service/src/components.rs +++ b/substrate/core/service/src/components.rs @@ -32,7 +32,8 @@ use sr_primitives::{ use crate::config::Configuration; use primitives::{Blake2Hasher, H256, Pair}; use rpc::{self, apis::system::SystemInfo}; -use futures::{prelude::*, future::Executor, sync::mpsc}; +use futures::{prelude::*, future::Executor}; +use futures03::channel::mpsc; // Type aliases. // These exist mainly to avoid typing `<F as Factory>::Foo` all over the code. diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 9f5de4de1ba..cd904156c52 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -407,7 +407,7 @@ impl<Components: components::Components> Service<Components> { let _ = to_spawn_tx.unbounded_send(Box::new(tel_task)); // RPC - let (system_rpc_tx, system_rpc_rx) = mpsc::unbounded(); + let (system_rpc_tx, system_rpc_rx) = futures03::channel::mpsc::unbounded(); let gen_handler = || { let system_info = rpc::apis::system::SystemInfo { chain_name: config.chain_spec.name().into(), @@ -635,9 +635,13 @@ fn build_network_future< mut network: network::NetworkWorker<ComponentBlock<Components>, S, H>, client: Arc<ComponentClient<Components>>, status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<(NetworkStatus<ComponentBlock<Components>>, NetworkState)>>>>, - mut rpc_rx: mpsc::UnboundedReceiver<rpc::apis::system::Request<ComponentBlock<Components>>>, + rpc_rx: futures03::channel::mpsc::UnboundedReceiver<rpc::apis::system::Request<ComponentBlock<Components>>>, should_have_peers: bool, ) -> impl Future<Item = (), Error = ()> { + // Compatibility shim while we're transitionning to stable Futures. + // See https://github.com/paritytech/substrate/issues/3099 + let mut rpc_rx = futures03::compat::Compat::new(rpc_rx.map(|v| Ok::<_, ()>(v))); + // Interval at which we send status updates on the status stream. const STATUS_INTERVAL: Duration = Duration::from_millis(5000); let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL); -- GitLab