diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 2e0be7daf90c7b75429ab89c7225e134bdf609c7..e69ee8e6e79a79afcfb36704d8365dd71980d96e 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -1232,14 +1232,16 @@ dependencies = [ [[package]] name = "finality-grandpa" -version = "0.10.3" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "num-traits 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -5476,7 +5478,7 @@ name = "sc-finality-grandpa" version = "0.8.0" dependencies = [ "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", - "finality-grandpa 0.10.3 (registry+https://github.com/rust-lang/crates.io-index)", + "finality-grandpa 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "fork-tree 2.0.0", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5484,6 +5486,7 @@ dependencies = [ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-project 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "sc-client 0.8.0", "sc-client-api 2.0.0", @@ -8406,7 +8409,7 @@ dependencies = [ "checksum fallible-iterator 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" "checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa" "checksum file-per-thread-logger 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "8505b75b31ef7285168dd237c4a7db3c1f3e0927e7d314e670bc98e854272fe9" -"checksum finality-grandpa 0.10.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2d9ad6bb0e42865b2d79fc9c8a08f22c39127310ed3334f2a1119ca25ed69dfb" +"checksum finality-grandpa 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "52c48f8a628193ba18639b2f727c32132d75f167a4b32f44b252ea8b937f154c" "checksum fixed-hash 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "72fe7539e2c5692c6989f2f9c0457e42f1e5768f96b85c87d273574670ae459f" "checksum fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33" "checksum flate2 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)" = "6bd6d6f4752952feb71363cffc9ebac9411b75b87c6ab6058c40c8900cf43c0f" diff --git a/substrate/bin/node-template/src/service.rs b/substrate/bin/node-template/src/service.rs index 458656d836d47181245d77720d68f042ce2210e8..8a5c660b23c7a04e08ba21117bc26ad286966717 100644 --- a/substrate/bin/node-template/src/service.rs +++ 
b/substrate/bin/node-template/src/service.rs @@ -161,7 +161,7 @@ pub fn new_full<C: Send + Default + 'static>(config: Configuration<C, GenesisCon service.network(), service.on_exit(), service.spawn_task_handle(), - )?.compat().map(drop)); + )?); }, (true, false) => { // start the full GRANDPA voter @@ -178,7 +178,7 @@ pub fn new_full<C: Send + Default + 'static>(config: Configuration<C, GenesisCon // the GRANDPA voter task is considered infallible, i.e. // if it fails we take down the service with it. - service.spawn_essential_task(grandpa::run_grandpa_voter(voter_config)?.compat().map(drop)); + service.spawn_essential_task(grandpa::run_grandpa_voter(voter_config)?); }, (_, true) => { grandpa::setup_disabled_grandpa( diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index 75cb6ac4c48c54eafb05cdca8e7941c3a15185b4..2aaca315b9058f6c4fb94ea4f6397c17ff122383 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -110,10 +110,7 @@ macro_rules! new_full_start { /// concrete types instead. macro_rules! new_full { ($config:expr, $with_startup_data: expr) => {{ - use futures::{ - prelude::*, - compat::Future01CompatExt - }; + use futures::prelude::*; use sc_network::Event; let ( @@ -220,7 +217,7 @@ macro_rules! new_full { service.network(), service.on_exit(), service.spawn_task_handle(), - )?.compat().map(drop)); + )?); }, (true, false) => { // start the full GRANDPA voter @@ -237,7 +234,7 @@ macro_rules! new_full { // the GRANDPA voter task is considered infallible, i.e. // if it fails we take down the service with it. service.spawn_essential_task( - grandpa::run_grandpa_voter(grandpa_config)?.compat().map(drop) + grandpa::run_grandpa_voter(grandpa_config)? ); }, (_, true) => { diff --git a/substrate/client/finality-grandpa/Cargo.toml b/substrate/client/finality-grandpa/Cargo.toml index 2c0a10857c7f1524369464d78cf2a36a21db8300..a26b2e7a41cb9f1240eb67e9f0b86809262298ab 100644 --- a/substrate/client/finality-grandpa/Cargo.toml +++ b/substrate/client/finality-grandpa/Cargo.toml @@ -6,8 +6,7 @@ edition = "2018" [dependencies] fork-tree = { version = "2.0.0", path = "../../utils/fork-tree" } -futures = "0.1.29" -futures03 = { package = "futures", version = "0.3.1", features = ["compat"] } +futures = "0.3.1" futures-timer = "2.0.2" log = "0.4.8" parking_lot = "0.9.0" @@ -28,10 +27,11 @@ sc-network = { version = "0.8", path = "../network" } sc-network-gossip = { version = "0.8", path = "../network-gossip" } sp-finality-tracker = { version = "2.0.0", path = "../../primitives/finality-tracker" } sp-finality-grandpa = { version = "2.0.0", path = "../../primitives/finality-grandpa" } -finality-grandpa = { version = "0.10.3", features = ["derive-codec"] } +finality-grandpa = { version = "0.11.0", features = ["derive-codec"] } +pin-project = "0.4.6" [dev-dependencies] -finality-grandpa = { version = "0.10.3", features = ["derive-codec", "test-helpers"] } +finality-grandpa = { version = "0.11.0", features = ["derive-codec", "test-helpers"] } sc-network = { version = "0.8", path = "../network" } sc-network-test = { version = "0.8.0", path = "../network/test" } sp-keyring = { version = "2.0.0", path = "../../primitives/keyring" } @@ -42,3 +42,4 @@ env_logger = "0.7.0" tokio = "0.1.22" tempfile = "3.1.0" sp-api = { version = "2.0.0", path = "../../primitives/api" } +futures01 = { package = "futures", version = "0.1.29" } diff --git a/substrate/client/finality-grandpa/src/communication/gossip.rs 
b/substrate/client/finality-grandpa/src/communication/gossip.rs index 7b21c1d0797d315ac49acd951d223d2c48b49617..7fef47867f0df1f722abfcd445045d6ff83a9fb7 100644 --- a/substrate/client/finality-grandpa/src/communication/gossip.rs +++ b/substrate/client/finality-grandpa/src/communication/gossip.rs @@ -90,8 +90,7 @@ use sp_finality_grandpa::AuthorityId; use sc_telemetry::{telemetry, CONSENSUS_DEBUG}; use log::{trace, debug}; -use futures::prelude::*; -use futures03::channel::mpsc; +use futures::channel::mpsc; use rand::seq::SliceRandom; use crate::{environment, CatchUp, CompactCommit, SignedMessage}; diff --git a/substrate/client/finality-grandpa/src/communication/mod.rs b/substrate/client/finality-grandpa/src/communication/mod.rs index 64637c0ed8f96926607399d540f9f841ed7b9f2f..42f26aa77e2cad083f694447b342f388158fd48b 100644 --- a/substrate/client/finality-grandpa/src/communication/mod.rs +++ b/substrate/client/finality-grandpa/src/communication/mod.rs @@ -27,17 +27,10 @@ //! In the future, there will be a fallback for allowing sending the same message //! under certain conditions that are used to un-stick the protocol. -use futures::prelude::*; -use futures03::{ - channel::mpsc as mpsc03, - compat::Compat, - future::{self as future03, Future as Future03}, - sink::Sink as Sink03, - stream::{Stream as Stream03, StreamExt}, -}; +use futures::{prelude::*, channel::mpsc}; use log::{debug, trace}; use parking_lot::Mutex; -use std::{pin::Pin, sync::Arc, task::{Context, Poll as Poll03}}; +use std::{pin::Pin, sync::Arc, task::{Context, Poll}}; use finality_grandpa::Message::{Prevote, Precommit, PrimaryPropose}; use finality_grandpa::{voter, voter_set::VoterSet}; @@ -49,8 +42,8 @@ use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, Numb use sc_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO}; use crate::{ - CatchUp, Commit, CommunicationIn, CommunicationOut, CompactCommit, Error, - Message, SignedMessage, + CatchUp, Commit, CommunicationIn, CommunicationOutH, + CompactCommit, Error, Message, SignedMessage, }; use crate::environment::HasVoted; use gossip::{ @@ -171,7 +164,7 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> { // thus one has to wrap gossip_validator_report_stream with an `Arc` `Mutex`. Given that it is // just an `UnboundedReceiver`, one could also switch to a multi-producer-*multi*-consumer // channel implementation. - gossip_validator_report_stream: Arc<Mutex<mpsc03::UnboundedReceiver<PeerReport>>>, + gossip_validator_report_stream: Arc<Mutex<mpsc::UnboundedReceiver<PeerReport>>>, } impl<B: BlockT, N: Network<B>> Unpin for NetworkBridge<B, N> {} @@ -185,7 +178,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> { service: N, config: crate::Config, set_state: crate::environment::SharedVoterSetState<B>, - executor: &impl futures03::task::Spawn, + executor: &impl futures::task::Spawn, ) -> Self { let (validator, report_stream) = GossipValidator::new( config, @@ -277,7 +270,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> { local_key: Option<AuthorityPair>, has_voted: HasVoted<B>, ) -> ( - impl Stream03<Item=SignedMessage<B>> + Unpin, + impl Stream<Item = SignedMessage<B>> + Unpin, OutgoingMessages<B>, ) { self.note_round( @@ -303,13 +296,13 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> { match decoded { Err(ref e) => { debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e); - return future03::ready(None); + return future::ready(None); } Ok(GossipMessage::Vote(msg)) => { // check signature. 
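Reviewer note on the `future::ready(None)` returns in the hunks around here: futures 0.3's `StreamExt::filter_map` expects the closure to return a `Future<Output = Option<_>>`, not a bare `Option` as in 0.1, so every branch gets lifted with `future::ready`. A minimal self-contained sketch of that contract, with a toy stream rather than the gossip types:

```rust
use futures::{future, stream, StreamExt};

fn main() {
    // futures 0.3 `filter_map` closures return `Future<Output = Option<_>>`,
    // so plain `Option` values are lifted with `future::ready`.
    let evens = futures::executor::block_on(
        stream::iter(1u32..=6)
            .filter_map(|n| future::ready(if n % 2 == 0 { Some(n) } else { None }))
            .collect::<Vec<_>>(),
    );
    assert_eq!(evens, vec![2, 4, 6]);
}
```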
if !voters.contains_key(&msg.message.id) { debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id); - return future03::ready(None); + return future::ready(None); } if voters.len() <= TELEMETRY_VOTERS_LIMIT { @@ -338,16 +331,16 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> { }; } - future03::ready(Some(msg.message)) + future::ready(Some(msg.message)) } _ => { debug!(target: "afg", "Skipping unknown message type"); - return future03::ready(None); + return future::ready(None); } } }); - let (tx, out_rx) = mpsc03::channel(0); + let (tx, out_rx) = mpsc::channel(0); let outgoing = OutgoingMessages::<B> { round: round.0, set_id: set_id.0, @@ -360,7 +353,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> { // Combine incoming votes from external GRANDPA nodes with outgoing // votes from our own GRANDPA voter to have a single // vote-import-pipeline. - let incoming = futures03::stream::select(incoming, out_rx); + let incoming = stream::select(incoming, out_rx); (incoming, outgoing) } @@ -372,8 +365,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> { voters: Arc<VoterSet<AuthorityId>>, is_voter: bool, ) -> ( - impl Stream<Item = CommunicationIn<B>, Error = Error>, - impl Sink<SinkItem = CommunicationOut<B>, SinkError = Error>, + impl Stream<Item = CommunicationIn<B>>, + impl Sink<CommunicationOutH<B, B::Hash>, Error = Error> + Unpin, ) { self.validator.note_set( set_id, @@ -401,7 +394,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> { let outgoing = outgoing.with(|out| { let voter::CommunicationOut::Commit(round, commit) = out; - Ok((round, commit)) + future::ok((round, commit)) }); (incoming, outgoing) @@ -423,35 +416,35 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> { } } -impl<B: BlockT, N: Network<B>> Future03 for NetworkBridge<B, N> { +impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> { type Output = Result<(), Error>; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Self::Output> { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { loop { match self.neighbor_packet_worker.lock().poll_next_unpin(cx) { - Poll03::Ready(Some((to, packet))) => { + Poll::Ready(Some((to, packet))) => { self.gossip_engine.send_message(to, packet.encode()); }, - Poll03::Ready(None) => return Poll03::Ready( + Poll::Ready(None) => return Poll::Ready( Err(Error::Network("Neighbor packet worker stream closed.".into())) ), - Poll03::Pending => break, + Poll::Pending => break, } } loop { match self.gossip_validator_report_stream.lock().poll_next_unpin(cx) { - Poll03::Ready(Some(PeerReport { who, cost_benefit })) => { + Poll::Ready(Some(PeerReport { who, cost_benefit })) => { self.gossip_engine.report(who, cost_benefit); }, - Poll03::Ready(None) => return Poll03::Ready( + Poll::Ready(None) => return Poll::Ready( Err(Error::Network("Gossip validator report stream closed.".into())) ), - Poll03::Pending => break, + Poll::Pending => break, } } - Poll03::Pending + Poll::Pending } } @@ -461,7 +454,7 @@ fn incoming_global<B: BlockT>( voters: Arc<VoterSet<AuthorityId>>, gossip_validator: Arc<GossipValidator<B>>, neighbor_sender: periodic::NeighborPacketSender<B>, -) -> impl Stream<Item = CommunicationIn<B>, Error = Error> { +) -> impl Stream<Item = CommunicationIn<B>> { let process_commit = move | msg: FullCommitMessage<B>, mut notification: sc_network_gossip::TopicNotification, @@ -564,29 +557,27 @@ fn incoming_global<B: BlockT>( Some(voter::CommunicationIn::CatchUp(msg.message, cb)) }; - 
Compat::new(gossip_engine.messages_for(topic) - .map(|m| Ok::<_, ()>(m))) + gossip_engine.messages_for(topic) .filter_map(|notification| { // this could be optimized by decoding piecewise. let decoded = GossipMessage::<B>::decode(&mut ¬ification.message[..]); if let Err(ref e) = decoded { trace!(target: "afg", "Skipping malformed commit message {:?}: {}", notification, e); } - decoded.map(move |d| (notification, d)).ok() + future::ready(decoded.map(move |d| (notification, d)).ok()) }) .filter_map(move |(notification, msg)| { - match msg { + future::ready(match msg { GossipMessage::Commit(msg) => process_commit(msg, notification, &mut gossip_engine, &gossip_validator, &*voters), GossipMessage::CatchUp(msg) => process_catch_up(msg, notification, &mut gossip_engine, &gossip_validator, &*voters), _ => { debug!(target: "afg", "Skipping unknown message type"); - return None; + None } - } + }) }) - .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) } impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> { @@ -687,19 +678,19 @@ pub(crate) struct OutgoingMessages<Block: BlockT> { round: RoundNumber, set_id: SetIdNumber, locals: Option<(AuthorityPair, AuthorityId)>, - sender: mpsc03::Sender<SignedMessage<Block>>, + sender: mpsc::Sender<SignedMessage<Block>>, network: GossipEngine<Block>, has_voted: HasVoted<Block>, } impl<B: BlockT> Unpin for OutgoingMessages<B> {} -impl<Block: BlockT> Sink03<Message<Block>> for OutgoingMessages<Block> +impl<Block: BlockT> Sink<Message<Block>> for OutgoingMessages<Block> { type Error = Error; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> { - Sink03::poll_ready(Pin::new(&mut self.sender), cx) + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { + Sink::poll_ready(Pin::new(&mut self.sender), cx) .map(|elem| { elem.map_err(|e| { Error::Network(format!("Failed to poll_ready channel sender: {:?}", e)) })}) @@ -769,12 +760,12 @@ impl<Block: BlockT> Sink03<Message<Block>> for OutgoingMessages<Block> Ok(()) } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll03<Result<(), Self::Error>> { - Poll03::Ready(Ok(())) + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> { - Sink03::poll_close(Pin::new(&mut self.sender), cx) + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { + Sink::poll_close(Pin::new(&mut self.sender), cx) .map(|elem| { elem.map_err(|e| { Error::Network(format!("Failed to poll_close channel sender: {:?}", e)) })}) @@ -985,13 +976,16 @@ impl<Block: BlockT> CommitsOut<Block> { } } -impl<Block: BlockT> Sink for CommitsOut<Block> { - type SinkItem = (RoundNumber, Commit<Block>); - type SinkError = Error; +impl<Block: BlockT> Sink<(RoundNumber, Commit<Block>)> for CommitsOut<Block> { + type Error = Error; + + fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } - fn start_send(&mut self, input: (RoundNumber, Commit<Block>)) -> StartSend<Self::SinkItem, Error> { + fn start_send(self: Pin<&mut Self>, input: (RoundNumber, Commit<Block>)) -> Result<(), Self::Error> { if !self.is_voter { - return Ok(AsyncSink::Ready); + return Ok(()); } let (round, commit) = input; @@ -1027,9 +1021,14 @@ impl<Block: BlockT> Sink for CommitsOut<Block> { ); 
self.network.gossip_message(topic, message.encode(), false); - Ok(AsyncSink::Ready) + Ok(()) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) } - fn close(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } - fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } + fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } } diff --git a/substrate/client/finality-grandpa/src/communication/periodic.rs b/substrate/client/finality-grandpa/src/communication/periodic.rs index d5c8c1e0b856c0666f3b994f71f50ffe7dd4e222..463589969012ff7c3920a0562e065cef391ffc85 100644 --- a/substrate/client/finality-grandpa/src/communication/periodic.rs +++ b/substrate/client/finality-grandpa/src/communication/periodic.rs @@ -17,7 +17,7 @@ //! Periodic rebroadcast of neighbor packets. use futures_timer::Delay; -use futures03::{channel::mpsc, future::{FutureExt as _}, prelude::*, ready, stream::Stream}; +use futures::{channel::mpsc, future::{FutureExt as _}, prelude::*, ready, stream::Stream}; use log::debug; use std::{pin::Pin, task::{Context, Poll}, time::{Instant, Duration}}; diff --git a/substrate/client/finality-grandpa/src/communication/tests.rs b/substrate/client/finality-grandpa/src/communication/tests.rs index c104af033926067711aad368d26cdd5ab8a3eeb1..a5435bdfdce5582df98c712bca64606fd563ebdc 100644 --- a/substrate/client/finality-grandpa/src/communication/tests.rs +++ b/substrate/client/finality-grandpa/src/communication/tests.rs @@ -16,12 +16,11 @@ //! Tests for the communication portion of the GRANDPA crate. -use futures::sync::mpsc; +use futures::channel::mpsc; use futures::prelude::*; use sc_network::{Event as NetworkEvent, PeerId, config::Roles}; use sc_network_test::{Block, Hash}; use sc_network_gossip::Validator; -use tokio::runtime::current_thread; use std::sync::Arc; use sp_keyring::Ed25519Keyring; use parity_scale_codec::Encode; @@ -44,11 +43,19 @@ struct TestNetwork { sender: mpsc::UnboundedSender<Event>, } -impl sc_network_gossip::Network<Block> for TestNetwork { - fn event_stream(&self) -> Box<dyn futures::Stream<Item = NetworkEvent, Error = ()> + Send> { +impl TestNetwork { + fn event_stream_03(&self) -> Pin<Box<dyn futures::Stream<Item = NetworkEvent> + Send>> { let (tx, rx) = mpsc::unbounded(); let _ = self.sender.unbounded_send(Event::EventStream(tx)); - Box::new(rx) + Box::pin(rx) + } +} + +impl sc_network_gossip::Network<Block> for TestNetwork { + fn event_stream(&self) -> Box<dyn futures01::Stream<Item = NetworkEvent, Error = ()> + Send> { + Box::new( + self.event_stream_03().map(Ok::<_, ()>).compat() + ) } fn report_peer(&self, who: sc_network::PeerId, cost_benefit: sc_network::ReputationChange) { @@ -101,17 +108,17 @@ struct Tester { } impl Tester { - fn filter_network_events<F>(self, mut pred: F) -> impl Future<Item=Self,Error=()> + fn filter_network_events<F>(self, mut pred: F) -> impl Future<Output = Self> where F: FnMut(Event) -> bool { let mut s = Some(self); - futures::future::poll_fn(move || loop { - match s.as_mut().unwrap().events.poll().expect("concluded early") { - Async::Ready(None) => panic!("concluded early"), - Async::Ready(Some(item)) => if pred(item) { - return Ok(Async::Ready(s.take().unwrap())) + futures::future::poll_fn(move |cx| loop { + match Stream::poll_next(Pin::new(&mut s.as_mut().unwrap().events), cx) { + Poll::Ready(None) => panic!("concluded early"), + Poll::Ready(Some(item)) => if pred(item) { + return 
Poll::Ready(s.take().unwrap()) }, - Async::NotReady => return Ok(Async::NotReady), + Poll::Pending => return Poll::Pending, } }) } @@ -149,8 +156,8 @@ fn voter_set_state() -> SharedVoterSetState<Block> { } // needs to run in a tokio runtime. -fn make_test_network(executor: &impl futures03::task::Spawn) -> ( - impl Future<Item=Tester,Error=()>, +fn make_test_network(executor: &impl futures::task::Spawn) -> ( + impl Future<Output = Tester>, TestNetwork, ) { let (tx, rx) = mpsc::unbounded(); @@ -159,7 +166,7 @@ fn make_test_network(executor: &impl futures03::task::Spawn) -> ( #[derive(Clone)] struct Exit; - impl futures03::Future for Exit { + impl futures::Future for Exit { type Output = (); fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<()> { @@ -175,7 +182,7 @@ fn make_test_network(executor: &impl futures03::task::Spawn) -> ( ); ( - futures::future::ok(Tester { + futures::future::ready(Tester { gossip_validator: bridge.validator.clone(), net_handle: bridge, events: rx, @@ -245,14 +252,14 @@ fn good_commit_leads_to_relay() { let id = sc_network::PeerId::random(); let global_topic = super::global_topic::<Block>(set_id); - let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let test = make_test_network(&threads_pool).0 - .and_then(move |tester| { + .then(move |tester| { // register a peer. tester.gossip_validator.new_peer(&mut NoopContext, &id, sc_network::config::Roles::FULL); - Ok((tester, id)) + future::ready((tester, id)) }) - .and_then(move |(tester, id)| { + .then(move |(tester, id)| { // start round, dispatch commit, and wait for broadcast. let (commits_in, _) = tester.net_handle.global_communication(SetId(1), voter_set, false); @@ -305,12 +312,11 @@ fn good_commit_leads_to_relay() { }, _ => panic!("commit expected"), } - }) - .map_err(|_| panic!("could not process commit")); + }); // once the message is sent and commit is "handled" we should have // a repropagation event coming from the network. - send_message.join(handle_commit).and_then(move |(tester, ())| { + future::join(send_message, handle_commit).then(move |(tester, ())| { tester.filter_network_events(move |event| match event { Event::WriteNotification(_, data) => { data == encoded_commit @@ -318,11 +324,10 @@ fn good_commit_leads_to_relay() { _ => false, }) }) - .map_err(|_| panic!("could not watch for gossip message")) .map(|_| ()) }); - current_thread::Runtime::new().unwrap().block_on(test).unwrap(); + futures::executor::block_on(test); } #[test] @@ -371,14 +376,14 @@ fn bad_commit_leads_to_report() { let id = sc_network::PeerId::random(); let global_topic = super::global_topic::<Block>(set_id); - let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let test = make_test_network(&threads_pool).0 - .and_then(move |tester| { + .map(move |tester| { // register a peer. tester.gossip_validator.new_peer(&mut NoopContext, &id, sc_network::config::Roles::FULL); - Ok((tester, id)) + (tester, id) }) - .and_then(move |(tester, id)| { + .then(move |(tester, id)| { // start round, dispatch commit, and wait for broadcast. 
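The `filter_network_events` rewrite just above shows the general `poll_fn` migration: the 0.3 closure receives a `&mut Context` and returns a bare `Poll`, so `Ok(Async::NotReady)` becomes `Poll::Pending` and manual re-scheduling goes through the waker. A small sketch of the same shape (toy logic, not the test's types):

```rust
use std::task::Poll;
use futures::future;

fn main() {
    let mut polled_before = false;
    let answer = futures::executor::block_on(future::poll_fn(move |cx| {
        if polled_before {
            Poll::Ready(42u32)
        } else {
            polled_before = true;
            // we returned Pending without registering a real event
            // source, so wake ourselves to get polled again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }));
    assert_eq!(answer, 42);
}
```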
let (commits_in, _) = tester.net_handle.global_communication(SetId(1), voter_set, false); @@ -422,12 +427,11 @@ fn bad_commit_leads_to_report() { }, _ => panic!("commit expected"), } - }) - .map_err(|_| panic!("could not process commit")); + }); // once the message is sent and commit is "handled" we should have // a report event coming from the network. - send_message.join(handle_commit).and_then(move |(tester, ())| { + future::join(send_message, handle_commit).then(move |(tester, ())| { tester.filter_network_events(move |event| match event { Event::Report(who, cost_benefit) => { who == id && cost_benefit == super::cost::INVALID_COMMIT @@ -435,26 +439,25 @@ fn bad_commit_leads_to_report() { _ => false, }) }) - .map_err(|_| panic!("could not watch for peer report")) .map(|_| ()) }); - current_thread::Runtime::new().unwrap().block_on(test).unwrap(); + futures::executor::block_on(test); } #[test] fn peer_with_higher_view_leads_to_catch_up_request() { let id = sc_network::PeerId::random(); - let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let (tester, mut net) = make_test_network(&threads_pool); let test = tester - .and_then(move |tester| { + .map(move |tester| { // register a peer with authority role. tester.gossip_validator.new_peer(&mut NoopContext, &id, sc_network::config::Roles::AUTHORITY); - Ok((tester, id)) + ((tester, id)) }) - .and_then(move |(tester, id)| { + .then(move |(tester, id)| { // send neighbor message at round 10 and height 50 let result = tester.gossip_validator.validate( &mut net, @@ -494,9 +497,8 @@ fn peer_with_higher_view_leads_to_catch_up_request() { }, _ => false, }) - .map_err(|_| panic!("could not watch for peer send message")) .map(|_| ()) }); - current_thread::Runtime::new().unwrap().block_on(test).unwrap(); + futures::executor::block_on(test); } diff --git a/substrate/client/finality-grandpa/src/environment.rs b/substrate/client/finality-grandpa/src/environment.rs index c5c6291dc0bc6911a5fbf0e30dbfafb8025ba151..fd88113776c9c02c9feca3292e096e3958d50693 100644 --- a/substrate/client/finality-grandpa/src/environment.rs +++ b/substrate/client/finality-grandpa/src/environment.rs @@ -16,17 +16,13 @@ use std::collections::BTreeMap; use std::iter::FromIterator; +use std::pin::Pin; use std::sync::Arc; use std::time::Duration; use log::{debug, warn, info}; use parity_scale_codec::{Decode, Encode}; use futures::prelude::*; -use futures03::{ - compat::{Compat, CompatSink}, - future::{FutureExt as _, TryFutureExt as _}, - stream::StreamExt as _, -}; use futures_timer::Delay; use parking_lot::RwLock; use sp_blockchain::{HeaderBackend, Error as ClientError}; @@ -568,19 +564,18 @@ where NumberFor<Block>: BlockNumberOps, Client<B, E, Block, RA>: AuxStore, { - type Timer = Box<dyn Future<Item = (), Error = Self::Error> + Send>; + type Timer = Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>; type Id = AuthorityId; type Signature = AuthoritySignature; // regular round message streams - type In = Box<dyn Stream< - Item = ::finality_grandpa::SignedMessage<Block::Hash, NumberFor<Block>, Self::Signature, Self::Id>, + type In = Pin<Box<dyn Stream< + Item = Result<::finality_grandpa::SignedMessage<Block::Hash, NumberFor<Block>, Self::Signature, Self::Id>, Self::Error> + > + Send>>; + type Out = Pin<Box<dyn Sink< + ::finality_grandpa::Message<Block::Hash, NumberFor<Block>>, Error = Self::Error, - > + Send>; - type Out = Box<dyn Sink< - SinkItem = ::finality_grandpa::Message<Block::Hash, 
NumberFor<Block>>, - SinkError = Self::Error, - > + Send>; + > + Send>>; type Error = CommandOrError<Block::Hash, NumberFor<Block>>; @@ -612,12 +607,9 @@ where has_voted, ); - let incoming = Compat::new(incoming.map(|item| Ok::<_, Error>(item))); - let outgoing = CompatSink::new(outgoing); - // schedule incoming messages from the network to be held until // corresponding blocks are imported. - let incoming = Box::new(UntilVoteTargetImported::new( + let incoming = Box::pin(UntilVoteTargetImported::new( self.client.import_notification_stream(), self.network.clone(), self.client.clone(), @@ -626,12 +618,12 @@ where ).map_err(Into::into)); // schedule network message cleanup when sink drops. - let outgoing = Box::new(outgoing.sink_map_err(Into::into)); + let outgoing = Box::pin(outgoing.sink_err_into()); voter::RoundData { voter_id: local_key.map(|pair| pair.public()), - prevote_timer: Box::new(prevote_timer.map(Ok).compat()), - precommit_timer: Box::new(precommit_timer.map(Ok).compat()), + prevote_timer: Box::pin(prevote_timer.map(Ok)), + precommit_timer: Box::pin(precommit_timer.map(Ok)), incoming, outgoing, } @@ -905,7 +897,7 @@ where //random between 0-1 seconds. let delay: u64 = thread_rng().gen_range(0, 1000); - Box::new(Delay::new(Duration::from_millis(delay)).map(Ok).compat()) + Box::pin(Delay::new(Duration::from_millis(delay)).map(Ok)) } fn prevote_equivocation( diff --git a/substrate/client/finality-grandpa/src/import.rs b/substrate/client/finality-grandpa/src/import.rs index ad1b2b1a87fb6ad1b2995116baffdf9a0befb142..64fc62bfe7a8bd4d46c94fd2c0904e5b862ad989 100644 --- a/substrate/client/finality-grandpa/src/import.rs +++ b/substrate/client/finality-grandpa/src/import.rs @@ -18,7 +18,7 @@ use std::{sync::Arc, collections::HashMap}; use log::{debug, trace, info}; use parity_scale_codec::Encode; -use futures::sync::mpsc; +use futures::channel::mpsc; use parking_lot::RwLockWriteGuard; use sp_blockchain::{HeaderBackend, BlockStatus, well_known_cache_keys}; diff --git a/substrate/client/finality-grandpa/src/lib.rs b/substrate/client/finality-grandpa/src/lib.rs index 071214961f9a14917b1d0e8e421da3a6ec843baa..a86a97c0da7cc4a8376f23eb3b62ec318dc43360 100644 --- a/substrate/client/finality-grandpa/src/lib.rs +++ b/substrate/client/finality-grandpa/src/lib.rs @@ -53,9 +53,9 @@ //! included in the newly-finalized chain. 
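The `Environment` changes above are the boxed-trait-object half of the migration: `Box<dyn Future<Item = _, Error = _>>` becomes `Pin<Box<dyn Future<Output = Result<_, _>>>>`, built with `Box::pin`. A minimal sketch of that pattern, assuming futures 0.3 and futures-timer 2.x as in this diff (the `BoxedTimer` alias and `round_timer` helper are illustrative, not the crate's API):

```rust
use std::{pin::Pin, time::Duration};
use futures::prelude::*;
use futures_timer::Delay;

// Trait objects for fallible futures are now written as
// Pin<Box<dyn Future<Output = Result<..>>>> instead of
// Box<dyn Future<Item = .., Error = ..>>.
type BoxedTimer<E> = Pin<Box<dyn Future<Output = Result<(), E>> + Send>>;

fn round_timer<E: Send + 'static>(after: Duration) -> BoxedTimer<E> {
    // futures-timer's `Delay` resolves with `()`; `map(Ok)` adapts it
    // to the `Result` output the boxed type expects.
    Box::pin(Delay::new(after).map(Ok))
}

fn main() {
    futures::executor::block_on(round_timer::<()>(Duration::from_millis(10))).unwrap();
}
```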
use futures::prelude::*; -use futures03::{StreamExt, future::ready}; -use log::{debug, error, info}; -use futures::sync::mpsc; +use futures::StreamExt; +use log::{debug, info}; +use futures::channel::mpsc; use sc_client_api::{BlockchainEvents, CallExecutor, backend::{AuxStore, Backend}, ExecutionStrategy}; use sp_blockchain::{HeaderBackend, Error as ClientError}; use sc_client::Client; @@ -66,7 +66,7 @@ use sc_keystore::KeyStorePtr; use sp_inherents::InherentDataProviders; use sp_consensus::SelectChain; use sp_core::Pair; -use sc_telemetry::{telemetry, CONSENSUS_INFO, CONSENSUS_DEBUG, CONSENSUS_WARN}; +use sc_telemetry::{telemetry, CONSENSUS_INFO, CONSENSUS_DEBUG}; use serde_json; use sp_finality_tracker; @@ -77,6 +77,8 @@ use finality_grandpa::{voter, BlockNumberOps, voter_set::VoterSet}; use std::{fmt, io}; use std::sync::Arc; use std::time::Duration; +use std::pin::Pin; +use std::task::{Poll, Context}; mod authorities; mod aux_schema; @@ -456,13 +458,12 @@ fn global_communication<Block: BlockT, B, E, N, RA>( keystore: &Option<KeyStorePtr>, ) -> ( impl Stream< - Item = CommunicationInH<Block, Block::Hash>, - Error = CommandOrError<Block::Hash, NumberFor<Block>>, + Item = Result<CommunicationInH<Block, Block::Hash>, CommandOrError<Block::Hash, NumberFor<Block>>>, >, impl Sink< - SinkItem = CommunicationOutH<Block, Block::Hash>, - SinkError = CommandOrError<Block::Hash, NumberFor<Block>>, - >, + CommunicationOutH<Block, Block::Hash>, + Error = CommandOrError<Block::Hash, NumberFor<Block>>, + > + Unpin, ) where B: Backend<Block>, E: CallExecutor<Block> + Send + Sync, @@ -536,7 +537,7 @@ pub struct GrandpaParams<B, E, Block: BlockT, N, RA, SC, VR, X, Sp> { /// Handle to a future that will resolve on exit. pub on_exit: X, /// If supplied, can be used to hook on telemetry connection established events. - pub telemetry_on_connect: Option<futures03::channel::mpsc::UnboundedReceiver<()>>, + pub telemetry_on_connect: Option<futures::channel::mpsc::UnboundedReceiver<()>>, /// A voting rule used to potentially restrict target votes. pub voting_rule: VR, /// How to spawn background tasks. @@ -547,7 +548,7 @@ pub struct GrandpaParams<B, E, Block: BlockT, N, RA, SC, VR, X, Sp> { /// block import worker that has already been instantiated with `block_import`. 
pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>( grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X, Sp>, -) -> sp_blockchain::Result<impl Future<Item=(),Error=()> + Send + 'static> where +) -> sp_blockchain::Result<impl Future<Output = ()> + Unpin + Send + 'static> where Block::Hash: Ord, B: Backend<Block> + 'static, E: CallExecutor<Block> + Send + Sync + 'static, @@ -557,9 +558,9 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>( NumberFor<Block>: BlockNumberOps, DigestFor<Block>: Encode, RA: Send + Sync + 'static, - X: futures03::Future<Output=()> + Clone + Send + Unpin + 'static, + X: futures::Future<Output=()> + Clone + Send + Unpin + 'static, Client<B, E, Block, RA>: AuxStore, - Sp: futures03::task::Spawn + 'static, + Sp: futures::task::Spawn + 'static, { let GrandpaParams { config, @@ -608,13 +609,11 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>( .expect("authorities is always at least an empty vector; elements are always of type string") } ); - ready(()) - }) - .unit_error() - .compat(); - futures::future::Either::A(events) + future::ready(()) + }); + future::Either::Left(events) } else { - futures::future::Either::B(futures::future::empty()) + future::Either::Right(future::pending()) }; let voter_work = VoterWork::new( @@ -628,28 +627,22 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>( ); let voter_work = voter_work - .map(|_| ()) - .map_err(|e| { - error!("GRANDPA Voter failed: {:?}", e); - telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e); - }); + .map(|_| ()); // Make sure that `telemetry_task` doesn't accidentally finish and kill grandpa. let telemetry_task = telemetry_task - .then(|_| futures::future::empty::<(), ()>()); + .then(|_| future::pending::<()>()); - use futures03::{FutureExt, TryFutureExt}; - - Ok(voter_work.select(on_exit.map(Ok).compat()).select2(telemetry_task).then(|_| Ok(()))) + Ok(future::select(future::select(voter_work, on_exit), telemetry_task).map(drop)) } /// Future that powers the voter. #[must_use] struct VoterWork<B, E, Block: BlockT, N: NetworkT<Block>, RA, SC, VR> { - voter: Box<dyn Future<Item = (), Error = CommandOrError<Block::Hash, NumberFor<Block>>> + Send>, + voter: Pin<Box<dyn Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> + Send>>, env: Arc<Environment<B, E, Block, N, RA, SC, VR>>, voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>, - network: futures03::compat::Compat<NetworkBridge<Block, N>>, + network: NetworkBridge<Block, N>, } impl<B, E, Block, N, RA, SC, VR> VoterWork<B, E, Block, N, RA, SC, VR> @@ -691,10 +684,10 @@ where let mut work = VoterWork { // `voter` is set to a temporary value and replaced below when // calling `rebuild_voter`. - voter: Box::new(futures::empty()) as Box<_>, + voter: Box::pin(future::pending()), env, voter_commands_rx, - network: futures03::future::TryFutureExt::compat(network), + network, }; work.rebuild_voter(); work @@ -757,10 +750,10 @@ where last_finalized, ); - self.voter = Box::new(voter); + self.voter = Box::pin(voter); }, VoterSetState::Paused { .. 
} => - self.voter = Box::new(futures::empty()), + self.voter = Box::pin(future::pending()), }; } @@ -841,61 +834,47 @@ where VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static, Client<B, E, Block, RA>: AuxStore, { - type Item = (); - type Error = Error; + type Output = Result<(), Error>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - match self.voter.poll() { - Ok(Async::NotReady) => {} - Ok(Async::Ready(())) => { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + match Future::poll(Pin::new(&mut self.voter), cx) { + Poll::Pending => {} + Poll::Ready(Ok(())) => { // voters don't conclude naturally - return Err(Error::Safety("GRANDPA voter has concluded.".into())) + return Poll::Ready(Err(Error::Safety("GRANDPA voter has concluded.".into()))) } - Err(CommandOrError::Error(e)) => { + Poll::Ready(Err(CommandOrError::Error(e))) => { // return inner observer error - return Err(e) + return Poll::Ready(Err(e)) } - Err(CommandOrError::VoterCommand(command)) => { + Poll::Ready(Err(CommandOrError::VoterCommand(command))) => { // some command issued internally self.handle_voter_command(command)?; - futures::task::current().notify(); + cx.waker().wake_by_ref(); } } - match self.voter_commands_rx.poll() { - Ok(Async::NotReady) => {} - Err(_) => { - // the `voter_commands_rx` stream should not fail. - return Ok(Async::Ready(())) - } - Ok(Async::Ready(None)) => { + match Stream::poll_next(Pin::new(&mut self.voter_commands_rx), cx) { + Poll::Pending => {} + Poll::Ready(None) => { // the `voter_commands_rx` stream should never conclude since it's never closed. - return Ok(Async::Ready(())) + return Poll::Ready(Ok(())) } - Ok(Async::Ready(Some(command))) => { + Poll::Ready(Some(command)) => { // some command issued externally self.handle_voter_command(command)?; - futures::task::current().notify(); + cx.waker().wake_by_ref(); } } - match self.network.poll() { - Ok(Async::NotReady) => {}, - Ok(Async::Ready(())) => { - // the network bridge future should never conclude. 
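The `VoterWork` poll above (and `ObserverWork` below) is the canonical manual-poll migration: `&mut self` becomes `Pin<&mut Self>` plus a `Context`, the error moves into the output type, and `futures::task::current().notify()` becomes `cx.waker().wake_by_ref()`. A reduced sketch of the pattern with toy types (all fields `Unpin`, so `Pin::into_inner` is safe, as in the diff):

```rust
use std::{pin::Pin, task::{Context, Poll}};
use futures::{channel::mpsc, prelude::*};

struct Worker {
    inner: Pin<Box<dyn Future<Output = ()> + Send>>,
    commands: mpsc::UnboundedReceiver<()>,
}

impl Future for Worker {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // every field is Unpin, so we can take a plain &mut Self.
        let this = Pin::into_inner(self);

        if let Poll::Ready(()) = this.inner.as_mut().poll(cx) {
            return Poll::Ready(());
        }

        if let Poll::Ready(Some(())) = Pin::new(&mut this.commands).poll_next(cx) {
            // a command swapped in a new inner future that has never
            // been polled; wake ourselves so it gets polled promptly.
            this.inner = Box::pin(future::pending());
            cx.waker().wake_by_ref();
        }

        Poll::Pending
    }
}
```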
- return Ok(Async::Ready(())) - } - e @ Err(_) => return e, - }; - - Ok(Async::NotReady) + Future::poll(Pin::new(&mut self.network), cx) } } #[deprecated(since = "1.1.0", note = "Please switch to run_grandpa_voter.")] pub fn run_grandpa<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>( grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X, Sp>, -) -> sp_blockchain::Result<impl Future<Item=(),Error=()> + Send + 'static> where +) -> sp_blockchain::Result<impl Future<Output=()> + Send + 'static> where Block::Hash: Ord, B: Backend<Block> + 'static, E: CallExecutor<Block> + Send + Sync + 'static, @@ -905,9 +884,9 @@ pub fn run_grandpa<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>( DigestFor<Block>: Encode, RA: Send + Sync + 'static, VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static, - X: futures03::Future<Output=()> + Clone + Send + Unpin + 'static, + X: futures::Future<Output=()> + Clone + Send + Unpin + 'static, Client<B, E, Block, RA>: AuxStore, - Sp: futures03::task::Spawn + 'static, + Sp: futures::task::Spawn + 'static, { run_grandpa_voter(grandpa_params) } diff --git a/substrate/client/finality-grandpa/src/observer.rs b/substrate/client/finality-grandpa/src/observer.rs index 989a1e1655e8d1109724aa0fb013864bdeec5a99..418bd570c0b86e7bc54c6da716cee24f309cebdd 100644 --- a/substrate/client/finality-grandpa/src/observer.rs +++ b/substrate/client/finality-grandpa/src/observer.rs @@ -14,10 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see <http://www.gnu.org/licenses/>. +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; -use futures::prelude::*; -use futures::{future, sync::mpsc}; +use futures::{prelude::*, channel::mpsc}; use finality_grandpa::{ BlockNumberOps, Error as GrandpaError, voter, voter_set::VoterSet @@ -64,14 +65,13 @@ fn grandpa_observer<B, E, Block: BlockT, RA, S, F>( last_finalized_number: NumberFor<Block>, commits: S, note_round: F, -) -> impl Future<Item=(), Error=CommandOrError<Block::Hash, NumberFor<Block>>> where +) -> impl Future<Output=Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> where NumberFor<Block>: BlockNumberOps, B: Backend<Block>, E: CallExecutor<Block> + Send + Sync + 'static, RA: Send + Sync, S: Stream< - Item = CommunicationIn<Block>, - Error = CommandOrError<Block::Hash, NumberFor<Block>>, + Item = Result<CommunicationIn<Block>, CommandOrError<Block::Hash, NumberFor<Block>>>, >, F: Fn(u64), { @@ -80,7 +80,7 @@ fn grandpa_observer<B, E, Block: BlockT, RA, S, F>( let client = client.clone(); let voters = voters.clone(); - let observer = commits.fold(last_finalized_number, move |last_finalized_number, global| { + let observer = commits.try_fold(last_finalized_number, move |last_finalized_number, global| { let (round, commit, callback) = match global { voter::CommunicationIn::Commit(round, commit, callback) => { let commit = finality_grandpa::Commit::from(commit); @@ -143,7 +143,7 @@ fn grandpa_observer<B, E, Block: BlockT, RA, S, F>( } }); - observer.map(|_| ()) + observer.map_ok(|_| ()) } /// Run a GRANDPA observer as a task, the observer will finalize blocks only by @@ -154,16 +154,16 @@ pub fn run_grandpa_observer<B, E, Block: BlockT, N, RA, SC, Sp>( config: Config, link: LinkHalf<B, E, Block, RA, SC>, network: N, - on_exit: impl futures03::Future<Output=()> + Clone + Send + Unpin + 'static, + on_exit: impl futures::Future<Output=()> + Clone + Send + Unpin + 'static, executor: Sp, -) -> sp_blockchain::Result<impl Future<Item=(), Error=()> + Send + 
'static> where +) -> sp_blockchain::Result<impl Future<Output = ()> + Unpin + Send + 'static> where B: Backend<Block> + 'static, E: CallExecutor<Block> + Send + Sync + 'static, N: NetworkT<Block> + Send + Clone + 'static, SC: SelectChain<Block> + 'static, NumberFor<Block>: BlockNumberOps, RA: Send + Sync + 'static, - Sp: futures03::task::Spawn + 'static, + Sp: futures::task::Spawn + 'static, Client<B, E, Block, RA>: AuxStore, { let LinkHalf { @@ -189,20 +189,18 @@ pub fn run_grandpa_observer<B, E, Block: BlockT, N, RA, SC, Sp>( ); let observer_work = observer_work - .map(|_| ()) + .map_ok(|_| ()) .map_err(|e| { warn!("GRANDPA Observer failed: {:?}", e); }); - use futures03::{FutureExt, TryFutureExt}; - - Ok(observer_work.select(on_exit.map(Ok).compat()).map(|_| ()).map_err(|_| ())) + Ok(future::select(observer_work, on_exit).map(drop)) } /// Future that powers the observer. #[must_use] struct ObserverWork<B: BlockT, N: NetworkT<B>, E, Backend, RA> { - observer: Box<dyn Future<Item = (), Error = CommandOrError<B::Hash, NumberFor<B>>> + Send>, + observer: Pin<Box<dyn Future<Output = Result<(), CommandOrError<B::Hash, NumberFor<B>>>> + Send>>, client: Arc<Client<Backend, E, B, RA>>, network: NetworkBridge<B, N>, persistent_data: PersistentData<B>, @@ -231,7 +229,7 @@ where let mut work = ObserverWork { // `observer` is set to a temporary value and replaced below when // calling `rebuild_observer`. - observer: Box::new(futures::empty()) as Box<_>, + observer: Box::pin(future::pending()) as Pin<Box<_>>, client, network, persistent_data, @@ -286,7 +284,7 @@ where note_round, ); - self.observer = Box::new(observer); + self.observer = Box::pin(observer); } fn handle_voter_command( @@ -336,44 +334,41 @@ where Bk: Backend<B> + 'static, Client<Bk, E, B, RA>: AuxStore, { - type Item = (); - type Error = Error; + type Output = Result<(), Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let this = Pin::into_inner(self); - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - match self.observer.poll() { - Ok(Async::NotReady) => {} - Ok(Async::Ready(())) => { + match Future::poll(Pin::new(&mut this.observer), cx) { + Poll::Pending => {} + Poll::Ready(Ok(())) => { // observer commit stream doesn't conclude naturally; this could reasonably be an error. - return Ok(Async::Ready(())) + return Poll::Ready(Ok(())) } - Err(CommandOrError::Error(e)) => { + Poll::Ready(Err(CommandOrError::Error(e))) => { // return inner observer error - return Err(e) + return Poll::Ready(Err(e)) } - Err(CommandOrError::VoterCommand(command)) => { + Poll::Ready(Err(CommandOrError::VoterCommand(command))) => { // some command issued internally - self.handle_voter_command(command)?; - futures::task::current().notify(); + this.handle_voter_command(command)?; + cx.waker().wake_by_ref(); } } - match self.voter_commands_rx.poll() { - Ok(Async::NotReady) => {} - Err(_) => { - // the `voter_commands_rx` stream should not fail. - return Ok(Async::Ready(())) - } - Ok(Async::Ready(None)) => { + match Stream::poll_next(Pin::new(&mut this.voter_commands_rx), cx) { + Poll::Pending => {} + Poll::Ready(None) => { // the `voter_commands_rx` stream should never conclude since it's never closed. 
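Both `run_grandpa_voter` and `run_grandpa_observer` now end with `future::select(...).map(drop)` in place of 0.1's `select`/`select2` methods, and `Either::A`/`Either::B` are renamed `Left`/`Right`. A sketch of the 0.3 combinator, assuming `Unpin` inputs as the diff does (toy futures, not the voter tasks):

```rust
use futures::future::{self, Either};

fn main() {
    let work = future::ready("done");
    let on_exit = future::pending::<()>();

    // `select` requires Unpin futures and resolves to whichever side
    // finishes first, handing back the other side for further driving.
    match futures::executor::block_on(future::select(work, on_exit)) {
        Either::Left((msg, _on_exit)) => assert_eq!(msg, "done"),
        Either::Right(((), _work)) => unreachable!("on_exit never fires"),
    }
}
```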
- return Ok(Async::Ready(())) + return Poll::Ready(Ok(())) } - Ok(Async::Ready(Some(command))) => { + Poll::Ready(Some(command)) => { // some command issued externally - self.handle_voter_command(command)?; - futures::task::current().notify(); + this.handle_voter_command(command)?; + cx.waker().wake_by_ref(); } } - Ok(Async::NotReady) + Poll::Pending } } diff --git a/substrate/client/finality-grandpa/src/tests.rs b/substrate/client/finality-grandpa/src/tests.rs index fdfb2fd808ec5d82aa97a28f104945abba69ffa6..0d7cf0541f355c1255d26684530f63e51c6fc85e 100644 --- a/substrate/client/finality-grandpa/src/tests.rs +++ b/substrate/client/finality-grandpa/src/tests.rs @@ -25,7 +25,6 @@ use sc_network_test::{ use sc_network::config::{ProtocolConfig, Roles, BoxFinalityProofRequestBuilder}; use parking_lot::Mutex; use futures_timer::Delay; -use futures03::TryStreamExt as _; use tokio::runtime::current_thread; use sp_keyring::Ed25519Keyring; use sc_client::LongestChain; @@ -48,6 +47,8 @@ use sp_runtime::generic::{BlockId, DigestItem}; use sp_core::{H256, NativeOrEncoded, ExecutionContext, crypto::Public}; use sp_finality_grandpa::{GRANDPA_ENGINE_ID, AuthorityList, GrandpaApi}; use sp_state_machine::{InMemoryBackend, prove_read, read_proof_check}; +use futures01::Async; +use futures::compat::Future01CompatExt; use authorities::AuthoritySet; use finality_proof::{ @@ -196,7 +197,7 @@ impl TestNetFactory for GrandpaTestNet { #[derive(Clone)] struct Exit; -impl futures03::Future for Exit { +impl futures::Future for Exit { type Output = (); fn poll(self: Pin<&mut Self>, _: &mut task::Context) -> task::Poll<()> { @@ -371,17 +372,28 @@ fn create_keystore(authority: Ed25519Keyring) -> (KeyStorePtr, tempfile::TempDir (keystore, keystore_path) } +fn block_until_complete(future: impl Future + Unpin, net: &Arc<Mutex<GrandpaTestNet>>, runtime: &mut current_thread::Runtime) { + let drive_to_completion = futures01::future::poll_fn(|| { + net.lock().poll(); Ok::<Async<()>, ()>(Async::NotReady) + }); + runtime.block_on( + future::select(future, drive_to_completion.compat()) + .map(|_| Ok::<(), ()>(())) + .compat() + ).unwrap(); +} + // run the voters to completion. provide a closure to be invoked after // the voters are spawned but before blocking on them. fn run_to_completion_with<F>( runtime: &mut current_thread::Runtime, - threads_pool: &futures03::executor::ThreadPool, + threads_pool: &futures::executor::ThreadPool, blocks: u64, net: Arc<Mutex<GrandpaTestNet>>, peers: &[Ed25519Keyring], with: F, ) -> u64 where - F: FnOnce(current_thread::Handle) -> Option<Box<dyn Future<Item=(), Error=()>>> + F: FnOnce(current_thread::Handle) -> Option<Pin<Box<dyn Future<Output = ()>>>> { use parking_lot::RwLock; @@ -411,17 +423,16 @@ fn run_to_completion_with<F>( }; wait_for.push( - Box::new( + Box::pin( client.finality_notification_stream() - .map(|v| Ok::<_, ()>(v)).compat() .take_while(move |n| { let mut highest_finalized = highest_finalized.write(); if *n.header.number() > *highest_finalized { *highest_finalized = *n.header.number(); } - Ok(n.header.number() < &blocks) + future::ready(n.header.number() < &blocks) }) - .collect() + .collect::<Vec<_>>() .map(|_| ()) ) ); @@ -449,24 +460,20 @@ fn run_to_completion_with<F>( assert_send(&voter); - runtime.spawn(voter); + runtime.spawn(voter.unit_error().compat()); } // wait for all finalized on each. 
- let wait_for = ::futures::future::join_all(wait_for) - .map(|_| ()) - .map_err(|_| ()); - - let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) }); - let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); + let wait_for = ::futures::future::join_all(wait_for); + block_until_complete(wait_for, &net, runtime); let highest_finalized = *highest_finalized.read(); highest_finalized } fn run_to_completion( runtime: &mut current_thread::Runtime, - threads_pool: &futures03::executor::ThreadPool, + threads_pool: &futures::executor::ThreadPool, blocks: u64, net: Arc<Mutex<GrandpaTestNet>>, peers: &[Ed25519Keyring] @@ -496,7 +503,7 @@ fn add_forced_change( fn finalize_3_voters_no_observers() { let _ = env_logger::try_init(); let mut runtime = current_thread::Runtime::new().unwrap(); - let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie]; let voters = make_ids(peers); @@ -522,7 +529,7 @@ fn finalize_3_voters_no_observers() { #[test] fn finalize_3_voters_1_full_observer() { let mut runtime = current_thread::Runtime::new().unwrap(); - let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie]; let voters = make_ids(peers); @@ -555,9 +562,8 @@ fn finalize_3_voters_1_full_observer() { }; finality_notifications.push( client.finality_notification_stream() - .map(|v| Ok::<_, ()>(v)).compat() - .take_while(|n| Ok(n.header.number() < &20)) - .for_each(move |_| Ok(())) + .take_while(|n| future::ready(n.header.number() < &20)) + .for_each(move |_| future::ready(())) ); let keystore = if let Some(local_key) = local_key { @@ -590,16 +596,14 @@ fn finalize_3_voters_1_full_observer() { } for voter in voters { - runtime.spawn(voter); + runtime.spawn(voter.unit_error().compat()); } // wait for all finalized on each. 
let wait_for = futures::future::join_all(finality_notifications) - .map(|_| ()) - .map_err(|_| ()); + .map(|_| ()); - let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) }); - let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); + block_until_complete(wait_for, &net, &mut runtime); } #[test] @@ -631,7 +635,7 @@ fn transition_3_voters_twice_1_full_observer() { let net = Arc::new(Mutex::new(GrandpaTestNet::new(api, 8))); let mut runtime = current_thread::Runtime::new().unwrap(); - let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); net.lock().peer(0).push_blocks(1, false); net.lock().block_until_sync(&mut runtime); @@ -654,8 +658,7 @@ fn transition_3_voters_twice_1_full_observer() { // wait for blocks to be finalized before generating new ones let block_production = client.finality_notification_stream() - .map(|v| Ok::<_, ()>(v)).compat() - .take_while(|n| Ok(n.header.number() < &30)) + .take_while(|n| future::ready(n.header.number() < &30)) .for_each(move |n| { match n.header.number() { 1 => { @@ -692,10 +695,10 @@ fn transition_3_voters_twice_1_full_observer() { _ => {}, } - Ok(()) + future::ready(()) }); - runtime.spawn(block_production); + runtime.spawn(block_production.unit_error().compat()); } let mut finality_notifications = Vec::new(); @@ -725,9 +728,8 @@ fn transition_3_voters_twice_1_full_observer() { finality_notifications.push( client.finality_notification_stream() - .map(|v| Ok::<_, ()>(v)).compat() - .take_while(|n| Ok(n.header.number() < &30)) - .for_each(move |_| Ok(())) + .take_while(|n| future::ready(n.header.number() < &30)) + .for_each(move |_| future::ready(())) .map(move |()| { let full_client = client.as_full().expect("only full clients are used in test"); let set: AuthoritySet<Hash, BlockNumber> = crate::aux_schema::load_authorities(&*full_client).unwrap(); @@ -756,22 +758,19 @@ fn transition_3_voters_twice_1_full_observer() { }; let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network"); - runtime.spawn(voter); + runtime.spawn(voter.unit_error().compat()); } // wait for all finalized on each. 
- let wait_for = ::futures::future::join_all(finality_notifications) - .map(|_| ()) - .map_err(|_| ()); + let wait_for = ::futures::future::join_all(finality_notifications); - let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) }); - let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); + block_until_complete(wait_for, &net, &mut runtime); } #[test] fn justification_is_emitted_when_consensus_data_changes() { let mut runtime = current_thread::Runtime::new().unwrap(); - let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie]; let mut net = GrandpaTestNet::new(TestApi::new(make_ids(peers)), 3); @@ -790,7 +789,7 @@ fn justification_is_emitted_when_consensus_data_changes() { #[test] fn justification_is_generated_periodically() { let mut runtime = current_thread::Runtime::new().unwrap(); - let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie]; let voters = make_ids(peers); @@ -830,7 +829,7 @@ fn consensus_changes_works() { #[test] fn sync_justifications_on_change_blocks() { let mut runtime = current_thread::Runtime::new().unwrap(); - let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let peers_a = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie]; let peers_b = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob]; let voters = make_ids(peers_b); @@ -871,21 +870,21 @@ fn sync_justifications_on_change_blocks() { } // the last peer should get the justification by syncing from other peers - runtime.block_on(futures::future::poll_fn(move || -> std::result::Result<_, ()> { + futures::executor::block_on(futures::future::poll_fn(move |_| { if net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() { net.lock().poll(); - Ok(Async::NotReady) + Poll::Pending } else { - Ok(Async::Ready(())) + Poll::Ready(()) } - })).unwrap() + })) } #[test] fn finalizes_multiple_pending_changes_in_order() { let _ = env_logger::try_init(); let mut runtime = current_thread::Runtime::new().unwrap(); - let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let peers_a = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie]; let peers_b = &[Ed25519Keyring::Dave, Ed25519Keyring::Eve, Ed25519Keyring::Ferdie]; @@ -946,7 +945,7 @@ fn finalizes_multiple_pending_changes_in_order() { fn force_change_to_new_set() { let _ = env_logger::try_init(); let mut runtime = current_thread::Runtime::new().unwrap(); - let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); // two of these guys are offline. 
let genesis_authorities = &[ Ed25519Keyring::Alice, @@ -1123,11 +1122,11 @@ fn voter_persists_its_votes() { use std::iter::FromIterator; use std::sync::atomic::{AtomicUsize, Ordering}; use futures::future; - use futures::sync::mpsc; + use futures::channel::mpsc; let _ = env_logger::try_init(); let mut runtime = current_thread::Runtime::new().unwrap(); - let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); // we have two authorities but we'll only be running the voter for alice // we are going to be listening for the prevotes it casts @@ -1161,56 +1160,56 @@ fn voter_persists_its_votes() { keystore_paths.push(keystore_path); struct ResettableVoter { - voter: Box<dyn Future<Item = (), Error = ()> + Send>, + voter: Pin<Box<dyn Future<Output = ()> + Send + Unpin>>, voter_rx: mpsc::UnboundedReceiver<()>, net: Arc<Mutex<GrandpaTestNet>>, client: PeersClient, keystore: KeyStorePtr, - threads_pool: futures03::executor::ThreadPool, + threads_pool: futures::executor::ThreadPool, } impl Future for ResettableVoter { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - match self.voter.poll() { - Ok(Async::Ready(())) | Err(_) => panic!("error in the voter"), - Ok(Async::NotReady) => {}, + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let this = Pin::into_inner(self); + + if let Poll::Ready(()) = Pin::new(&mut this.voter).poll(cx) { + panic!("error in the voter"); } - match self.voter_rx.poll() { - Err(_) | Ok(Async::Ready(None)) => return Ok(Async::Ready(())), - Ok(Async::NotReady) => {} - Ok(Async::Ready(Some(()))) => { + match Pin::new(&mut this.voter_rx).poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(Some(())) => { let (_block_import, _, _, _, link) = - self.net.lock() + this.net.lock() .make_block_import::< TransactionFor<substrate_test_runtime_client::Backend, Block> - >(self.client.clone()); + >(this.client.clone()); let link = link.lock().take().unwrap(); let grandpa_params = GrandpaParams { config: Config { gossip_duration: TEST_GOSSIP_DURATION, justification_period: 32, - keystore: Some(self.keystore.clone()), + keystore: Some(this.keystore.clone()), name: Some(format!("peer#{}", 0)), is_authority: true, observer_enabled: true, }, link, - network: self.net.lock().peers[0].network_service().clone(), + network: this.net.lock().peers[0].network_service().clone(), inherent_data_providers: InherentDataProviders::new(), on_exit: Exit, telemetry_on_connect: None, voting_rule: VotingRulesBuilder::default().build(), - executor: self.threads_pool.clone(), + executor: this.threads_pool.clone(), }; let voter = run_grandpa_voter(grandpa_params) .expect("all in order with client and network") - .then(move |r| { + .map(move |r| { // we need to keep the block_import alive since it owns the // sender for the voter commands channel, if that gets dropped // then the voter will stop @@ -1218,30 +1217,30 @@ fn voter_persists_its_votes() { r }); - self.voter = Box::new(voter); + this.voter = Box::pin(voter); // notify current task in order to poll the voter - futures::task::current().notify(); + cx.waker().wake_by_ref(); } }; - Ok(Async::NotReady) + Poll::Pending } } - // we create a "dummy" voter by setting it to `empty` and triggering the `tx`. + // we create a "dummy" voter by setting it to `pending` and triggering the `tx`. 
@@ -1284,122 +1283,107 @@ fn voter_persists_its_votes() {
 			HasVoted::No,
 		);
 
-		let round_rx = futures03::compat::Compat::new(round_rx.map(|item| Ok::<_, Error>(item)));
-		let round_tx = futures03::compat::CompatSink::new(round_tx);
-
 		let round_tx = Arc::new(Mutex::new(round_tx));
 		let exit_tx = Arc::new(Mutex::new(Some(exit_tx)));
 
 		let net = net.clone();
-		let state = AtomicUsize::new(0);
+		let state = Arc::new(AtomicUsize::new(0));
 
 		runtime.spawn(round_rx.for_each(move |signed| {
-			if state.compare_and_swap(0, 1, Ordering::SeqCst) == 0 {
-				// the first message we receive should be a prevote from alice.
-				let prevote = match signed.message {
-					finality_grandpa::Message::Prevote(prevote) => prevote,
-					_ => panic!("voter should prevote."),
-				};
-
-				// its chain has 20 blocks and the voter targets 3/4 of the
-				// unfinalized chain, so the vote should be for block 15
-				assert!(prevote.target_number == 15);
-
-				// we push 20 more blocks to alice's chain
-				net.lock().peer(0).push_blocks(20, false);
-
-				let net2 = net.clone();
-				let net = net.clone();
-				let voter_tx = voter_tx.clone();
-				let round_tx = round_tx.clone();
-
-				let interval = futures03::stream::unfold(Delay::new(Duration::from_millis(200)), |delay|
-					Box::pin(async move {
-						delay.await;
-						Some(((), Delay::new(Duration::from_millis(200))))
-					})).map(Ok::<_, ()>).compat();
-
-				future::Either::A(interval
-					.take_while(move |_| {
-						Ok(net2.lock().peer(1).client().info().best_number != 40)
-					})
-					.for_each(|_| Ok(()))
-					.and_then(move |_| {
-						let block_30_hash =
-							net.lock().peer(0).client().as_full().unwrap().hash(30).unwrap().unwrap();
-
-						// we restart alice's voter
-						voter_tx.unbounded_send(()).unwrap();
-
-						// and we push our own prevote for block 30
-						let prevote = finality_grandpa::Prevote {
-							target_number: 30,
-							target_hash: block_30_hash,
-						};
-
-						// One should either be calling `Sink::send` or `Sink::start_send` followed
-						// by `Sink::poll_complete` to make sure items are being flushed. Given that
-						// we send in a loop including a delay until items are received, this can be
-						// ignored for the sake of reduced complexity.
-						if !round_tx.lock()
-							.start_send(finality_grandpa::Message::Prevote(prevote))
-							.unwrap()
-							.is_ready() {
-							panic!("expected sink to be ready to write to.");
-						}
-
-						Ok(())
-					}).map_err(|_| panic!()))
-
-			} else if state.compare_and_swap(1, 2, Ordering::SeqCst) == 1 {
-				// the next message we receive should be our own prevote
-				let prevote = match signed.message {
-					finality_grandpa::Message::Prevote(prevote) => prevote,
-					_ => panic!("We should receive our own prevote."),
-				};
-
-				// targeting block 30
-				assert!(prevote.target_number == 30);
-
-				// after alice restarts it should send its previous prevote
-				// therefore we won't ever receive it again since it will be a
-				// known message on the gossip layer
-
-				future::Either::B(future::ok(()))
-
-			} else if state.compare_and_swap(2, 3, Ordering::SeqCst) == 2 {
-				// we then receive a precommit from alice for block 15
-				// even though we casted a prevote for block 30
-				let precommit = match signed.message {
-					finality_grandpa::Message::Precommit(precommit) => precommit,
-					_ => panic!("voter should precommit."),
-				};
-
-				assert!(precommit.target_number == 15);
-
-				// signal exit
-				exit_tx.clone().lock().take().unwrap().send(()).unwrap();
-
-				future::Either::B(future::ok(()))
-
-			} else {
-				panic!()
+			let net2 = net.clone();
+			let net = net.clone();
+			let voter_tx = voter_tx.clone();
+			let round_tx = round_tx.clone();
+			let state = state.clone();
+			let exit_tx = exit_tx.clone();
+
+			async move {
+				if state.compare_and_swap(0, 1, Ordering::SeqCst) == 0 {
+					// the first message we receive should be a prevote from alice.
+					let prevote = match signed.message {
+						finality_grandpa::Message::Prevote(prevote) => prevote,
+						_ => panic!("voter should prevote."),
+					};
+
+					// its chain has 20 blocks and the voter targets 3/4 of the
+					// unfinalized chain, so the vote should be for block 15
+					assert!(prevote.target_number == 15);
+
+					// we push 20 more blocks to alice's chain
+					net.lock().peer(0).push_blocks(20, false);
+
+					let interval = futures::stream::unfold(Delay::new(Duration::from_millis(200)), |delay|
+						Box::pin(async move {
+							delay.await;
+							Some(((), Delay::new(Duration::from_millis(200))))
+						})
+					);
+
+					interval
+						.take_while(move |_| {
+							future::ready(net2.lock().peer(1).client().info().best_number != 40)
+						})
+						.for_each(|_| future::ready(()))
+						.await;
+
+					let block_30_hash =
+						net.lock().peer(0).client().as_full().unwrap().hash(30).unwrap().unwrap();
+
+					// we restart alice's voter
+					voter_tx.unbounded_send(()).unwrap();
+
+					// and we push our own prevote for block 30
+					let prevote = finality_grandpa::Prevote {
+						target_number: 30,
+						target_hash: block_30_hash,
+					};
+
+					// One should either be calling `Sink::send` or `Sink::start_send` followed
+					// by `Sink::poll_complete` to make sure items are being flushed. Given that
+					// we send in a loop including a delay until items are received, this can be
+					// ignored for the sake of reduced complexity.
+					Pin::new(&mut *round_tx.lock()).start_send(finality_grandpa::Message::Prevote(prevote)).unwrap();
+				} else if state.compare_and_swap(1, 2, Ordering::SeqCst) == 1 {
+					// the next message we receive should be our own prevote
+					let prevote = match signed.message {
+						finality_grandpa::Message::Prevote(prevote) => prevote,
+						_ => panic!("We should receive our own prevote."),
+					};
+
+					// targeting block 30
+					assert!(prevote.target_number == 30);
+
+					// after alice restarts it should send its previous prevote
+					// therefore we won't ever receive it again since it will be a
+					// known message on the gossip layer
+
+				} else if state.compare_and_swap(2, 3, Ordering::SeqCst) == 2 {
+					// we then receive a precommit from alice for block 15
+					// even though we casted a prevote for block 30
+					let precommit = match signed.message {
+						finality_grandpa::Message::Precommit(precommit) => precommit,
+						_ => panic!("voter should precommit."),
+					};
+
+					assert!(precommit.target_number == 15);
+
+					// signal exit
+					exit_tx.clone().lock().take().unwrap().send(()).unwrap();
+				} else {
+					panic!()
+				}
+			}
-
-		}).map_err(|_| ()));
+		}).map(Ok).boxed().compat());
 	}
 
-	let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
-	let exit = exit_rx.into_future().map(|_| ()).map_err(|_| ());
-
-	runtime.block_on(drive_to_completion.select(exit).map(|_| ()).map_err(|_| ())).unwrap();
+	block_until_complete(exit_rx.into_future(), &net, &mut runtime);
 }
 
 #[test]
 fn finalize_3_voters_1_light_observer() {
 	let _ = env_logger::try_init();
 	let mut runtime = current_thread::Runtime::new().unwrap();
-	let threads_pool = futures03::executor::ThreadPool::new().unwrap();
+	let threads_pool = futures::executor::ThreadPool::new().unwrap();
 	let authorities = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
 	let voters = make_ids(authorities);
@@ -1416,9 +1400,8 @@ fn finalize_3_voters_1_light_observer() {
 	let link = net.lock().peer(3).data.lock().take().expect("link initialized on startup; qed");
 
 	let finality_notifications = net.lock().peer(3).client().finality_notification_stream()
-		.map(|v| Ok::<_, ()>(v)).compat()
-		.take_while(|n| Ok(n.header.number() < &20))
-		.collect();
+		.take_while(|n| future::ready(n.header.number() < &20))
+		.collect::<Vec<_>>();
 
 	run_to_completion_with(&mut runtime, &threads_pool, 20, net.clone(), authorities, |executor| {
 		executor.spawn(
@@ -1435,10 +1418,10 @@ fn finalize_3_voters_1_light_observer() {
 				net.lock().peers[3].network_service().clone(),
 				Exit,
 				threads_pool.clone(),
-			).unwrap()
+			).unwrap().unit_error().compat()
 		).unwrap();
 
-		Some(Box::new(finality_notifications.map(|_| ())))
+		Some(Box::pin(finality_notifications.map(|_| ())))
 	});
 }
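These tests still execute on a tokio 0.1 `current_thread::Runtime`, so every std future crosses back over the compat bridge, either via `unit_error().compat()` or, as in the closure above, `.map(Ok).boxed().compat()`, turning a `Future<Output = ()>` into the fallible futures 0.1 `Future<Item = (), Error = ()>` the old executor expects. A standalone sketch of that bridge, assuming `futures` 0.3 with its `compat` feature plus a tokio 0.1 dependency as in this Cargo.toml:

```rust
use futures::future::{FutureExt, TryFutureExt};

fn main() {
    // A plain futures-0.3 (std) future.
    let fut03 = async {
        println!("driven by a futures 0.1 executor");
    };

    // `unit_error()` wraps the output in `Result<(), ()>`; `boxed()` makes it
    // `Unpin`, which `compat()` requires; `compat()` then yields a futures 0.1
    // `Future<Item = (), Error = ()>`.
    let fut01 = fut03.unit_error().boxed().compat();

    // tokio 0.1 can now drive it, just like `runtime.spawn(...)` above.
    let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
    runtime.block_on(fut01).unwrap();
}
```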
@@ -1446,7 +1429,7 @@ fn finalize_3_voters_1_light_observer() {
 fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() {
 	let _ = ::env_logger::try_init();
 	let mut runtime = current_thread::Runtime::new().unwrap();
-	let threads_pool = futures03::executor::ThreadPool::new().unwrap();
+	let threads_pool = futures::executor::ThreadPool::new().unwrap();
 
 	let peers = &[Ed25519Keyring::Alice];
 	let mut net = GrandpaTestNet::new(TestApi::new(make_ids(peers)), 1);
@@ -1460,14 +1443,15 @@ fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() {
 	net.lock().block_until_sync(&mut runtime);
 
 	// check that the block#1 is finalized on light client
-	runtime.block_on(futures::future::poll_fn(move || -> std::result::Result<_, ()> {
+	let mut runtime = current_thread::Runtime::new().unwrap();
+	let _ = runtime.block_on(futures::future::poll_fn(move |_| {
 		if net.lock().peer(1).client().info().finalized_number == 1 {
-			Ok(Async::Ready(()))
+			Poll::Ready(())
 		} else {
 			net.lock().poll();
-			Ok(Async::NotReady)
+			Poll::Pending
 		}
-	})).unwrap()
+	}).unit_error().compat());
 }
 
 #[test]
@@ -1477,7 +1461,7 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ
 	let _ = ::env_logger::try_init();
 	let mut runtime = current_thread::Runtime::new().unwrap();
-	let threads_pool = futures03::executor::ThreadPool::new().unwrap();
+	let threads_pool = futures::executor::ThreadPool::new().unwrap();
 
 	// two of these guys are offline.
 	let genesis_authorities = if FORCE_CHANGE {
@@ -1542,7 +1526,7 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ
 fn voter_catches_up_to_latest_round_when_behind() {
 	let _ = env_logger::try_init();
 	let mut runtime = current_thread::Runtime::new().unwrap();
-	let threads_pool = futures03::executor::ThreadPool::new().unwrap();
+	let threads_pool = futures::executor::ThreadPool::new().unwrap();
 
 	let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob];
 	let voters = make_ids(peers);
@@ -1554,7 +1538,7 @@ fn voter_catches_up_to_latest_round_when_behind() {
 	let net = Arc::new(Mutex::new(net));
 	let mut finality_notifications = Vec::new();
 
-	let voter = |keystore, peer_id, link, net: Arc<Mutex<GrandpaTestNet>>| -> Box<dyn Future<Item=(), Error=()> + Send> {
+	let voter = |keystore, peer_id, link, net: Arc<Mutex<GrandpaTestNet>>| -> Pin<Box<dyn Future<Output = ()> + Send>> {
 		let grandpa_params = GrandpaParams {
 			config: Config {
 				gossip_duration: TEST_GOSSIP_DURATION,
@@ -1573,7 +1557,7 @@ fn voter_catches_up_to_latest_round_when_behind() {
 			executor: threads_pool.clone(),
 		};
 
-		Box::new(run_grandpa_voter(grandpa_params).expect("all in order with client and network"))
+		Box::pin(run_grandpa_voter(grandpa_params).expect("all in order with client and network"))
 	};
 
 	let mut keystore_paths = Vec::new();
@@ -1591,9 +1575,8 @@ fn voter_catches_up_to_latest_round_when_behind() {
 
 		finality_notifications.push(
 			client.finality_notification_stream()
-				.map(|v| Ok::<_, ()>(v)).compat()
-				.take_while(|n| Ok(n.header.number() < &50))
-				.for_each(move |_| Ok(()))
+				.take_while(|n| future::ready(n.header.number() < &50))
+				.for_each(move |_| future::ready(()))
 		);
 
 		let (keystore, keystore_path) = create_keystore(*key);
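The notification-stream hunks show the 0.3 combinator convention: `take_while` and `for_each` now expect the closure to return a future, so plain values are lifted with `future::ready`, and the old `map(Ok).compat()` round-trip disappears entirely. A self-contained sketch of the same pipeline over a plain iterator stream:

```rust
use futures::{future, stream, StreamExt};

fn main() {
    // futures 0.3: the predicate returns a future, hence `future::ready(..)`
    // where 0.1 took a bare `Ok(..)` value.
    let below_fifty = futures::executor::block_on(
        stream::iter(1u64..)
            .take_while(|n| future::ready(*n < 50))
            .collect::<Vec<_>>(),
    );
    assert_eq!(below_fifty.len(), 49);
}
```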
@@ -1601,14 +1584,13 @@ fn voter_catches_up_to_latest_round_when_behind() {
 		let voter = voter(Some(keystore), peer_id, link, net.clone());
 
-		runtime.spawn(voter);
+		runtime.spawn(voter.unit_error().compat());
 	}
 
 	// wait for them to finalize block 50. since they'll vote on 3/4 of the
 	// unfinalized chain it will take at least 4 rounds to do it.
 	let wait_for_finality = ::futures::future::join_all(finality_notifications)
-		.map(|_| ())
-		.map_err(|_| ());
+		.map(|_| ());
 
 	// spawn a new voter, it should be behind by at least 4 rounds and should be
 	// able to catch up to the latest round
@@ -1616,7 +1598,7 @@ fn voter_catches_up_to_latest_round_when_behind() {
 		let net = net.clone();
 		let runtime = runtime.handle();
 
-		wait_for_finality.and_then(move |_| {
+		wait_for_finality.then(move |_| {
 			let peer_id = 2;
 			let link = {
 				let net = net.lock();
@@ -1628,20 +1610,20 @@ fn voter_catches_up_to_latest_round_when_behind() {
 
 			let voter = voter(None, peer_id, link, net);
 
-			runtime.spawn(voter).unwrap();
+			runtime.spawn(voter.unit_error().compat()).unwrap();
 
 			let start_time = std::time::Instant::now();
 			let timeout = Duration::from_secs(5 * 60);
-			let wait_for_catch_up = futures::future::poll_fn(move || {
+			let wait_for_catch_up = futures::future::poll_fn(move |_| {
 				// The voter will start at round 1 and since everyone else is
 				// already at a later round the only way to get to round 4 (or
 				// later) is by issuing a catch up request.
 				if set_state.read().last_completed_round().number >= 4 {
-					Ok(Async::Ready(()))
+					Poll::Ready(())
 				} else if start_time.elapsed() > timeout {
 					panic!("Timed out while waiting for catch up to happen")
 				} else {
-					Ok(Async::NotReady)
+					Poll::Pending
 				}
 			});
 
@@ -1649,8 +1631,14 @@ fn voter_catches_up_to_latest_round_when_behind() {
 		})
 	};
 
-	let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
-	let _ = runtime.block_on(test.select(drive_to_completion).map_err(|_| ())).unwrap();
+	let drive_to_completion = futures01::future::poll_fn(|| {
+		net.lock().poll(); Ok::<Async<()>, ()>(Async::NotReady)
+	});
+	runtime.block_on(
+		future::select(test, drive_to_completion.compat())
+			.map(|_| Ok::<(), ()>(()))
+			.compat()
+	).unwrap();
 }
 
 #[test]
@@ -1658,7 +1646,7 @@ fn grandpa_environment_respects_voting_rules() {
 	use finality_grandpa::Chain;
 	use sc_network_test::TestClient;
 
-	let threads_pool = futures03::executor::ThreadPool::new().unwrap();
+	let threads_pool = futures::executor::ThreadPool::new().unwrap();
 	let peers = &[Ed25519Keyring::Alice];
 	let voters = make_ids(peers);
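`voter_catches_up_to_latest_round_when_behind` now mixes both worlds: the 0.1 crate is renamed to `futures01` (see the Cargo.toml hunk at the top of this patch), its `poll_fn` is lifted into 0.3 with `.compat()`, and `future::select` races it against a native future. A sketch of that mixed select, assuming the `compat` feature and the `futures01` rename:

```rust
use futures::{compat::Future01CompatExt, future::{self, Either, FutureExt}};

fn main() {
    // A futures 0.1 future from the renamed legacy crate.
    let legacy = futures01::future::ok::<u32, ()>(1);

    // A native futures 0.3 future.
    let modern = future::ready(2u32);

    // `compat()` lifts 0.1 into 0.3; 0.1 futures are fallible, so the
    // output is a `Result` that we flatten before racing the two.
    let raced = future::select(modern, legacy.compat().map(|r| r.unwrap_or(0)));

    match futures::executor::block_on(raced) {
        Either::Left((v, _unfinished)) => assert_eq!(v, 2),
        Either::Right((v, _unfinished)) => assert_eq!(v, 1),
    }
}
```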
diff --git a/substrate/client/finality-grandpa/src/until_imported.rs b/substrate/client/finality-grandpa/src/until_imported.rs
index f53b651bcf48c95808053af86f9b006016edb721..c3a52fcf56fe197602ba844e61500686820f42cf 100644
--- a/substrate/client/finality-grandpa/src/until_imported.rs
+++ b/substrate/client/finality-grandpa/src/until_imported.rs
@@ -33,13 +33,15 @@ use sc_client_api::{BlockImportNotification, ImportNotifications};
 use futures::prelude::*;
 use futures::stream::Fuse;
 use futures_timer::Delay;
-use futures03::{StreamExt as _, TryStreamExt as _};
+use futures::channel::mpsc::UnboundedReceiver;
 use finality_grandpa::voter;
 use parking_lot::Mutex;
 use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
 
 use std::collections::{HashMap, VecDeque};
+use std::pin::Pin;
 use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
+use std::task::{Context, Poll};
 use std::time::{Duration, Instant};
 
 use sp_finality_grandpa::AuthorityId;
@@ -70,13 +72,15 @@ pub(crate) trait BlockUntilImported<Block: BlockT>: Sized {
 }
 
 /// Buffering imported messages until blocks with given hashes are imported.
+#[pin_project::pin_project]
 pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester, I, M: BlockUntilImported<Block>> {
-	import_notifications: Fuse<Box<dyn Stream<Item = BlockImportNotification<Block>, Error = ()> + Send>>,
+	import_notifications: Fuse<UnboundedReceiver<BlockImportNotification<Block>>>,
 	block_sync_requester: BlockSyncRequester,
 	status_check: BlockStatus,
+	#[pin]
 	inner: Fuse<I>,
 	ready: VecDeque<M::Blocked>,
-	check_pending: Box<dyn Stream<Item = (), Error = std::io::Error> + Send>,
+	check_pending: Pin<Box<dyn Stream<Item = Result<(), std::io::Error>> + Send>>,
 	/// Mapping block hashes to their block number, the point in time it was
 	/// first encountered (Instant) and a list of GRANDPA messages referencing
 	/// the block hash.
@@ -87,8 +91,9 @@ pub(crate) struct UntilImported<Block: BlockT, BlockStatus, BlockSyncRequester,
 impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockStatus, BlockSyncRequester, I, M> where
 	Block: BlockT,
 	BlockStatus: BlockStatusT<Block>,
+	BlockSyncRequester: BlockSyncRequesterT<Block>,
+	I: Stream<Item = M::Blocked>,
 	M: BlockUntilImported<Block>,
-	I: Stream,
 {
 	/// Create a new `UntilImported` wrapper.
 	pub(crate) fn new(
@@ -105,22 +110,19 @@ impl<Block, BlockStatus, BlockSyncRequester, I, M> UntilImported<Block, BlockSta
 		// used in the event of missed import notifications
 		const CHECK_PENDING_INTERVAL: Duration = Duration::from_secs(5);
 
-		let check_pending = futures03::stream::unfold(Delay::new(CHECK_PENDING_INTERVAL), |delay|
+		let check_pending = futures::stream::unfold(Delay::new(CHECK_PENDING_INTERVAL), |delay|
 			Box::pin(async move {
 				delay.await;
-				Some(((), Delay::new(CHECK_PENDING_INTERVAL)))
-			})).map(Ok).compat();
+				Some((Ok(()), Delay::new(CHECK_PENDING_INTERVAL)))
+			}));
 
 		UntilImported {
-			import_notifications: {
-				let stream = import_notifications.map::<_, fn(_) -> _>(|v| Ok::<_, ()>(v)).compat();
-				Box::new(stream) as Box<dyn Stream<Item = _, Error = _> + Send>
-			}.fuse(),
+			import_notifications: import_notifications.fuse(),
 			block_sync_requester,
 			status_check,
 			inner: stream.fuse(),
 			ready: VecDeque::new(),
-			check_pending: Box::new(check_pending),
+			check_pending: Box::pin(check_pending),
 			pending: HashMap::new(),
 			identifier,
 		}
 	}
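`UntilImported` keeps a generic inner stream `I` that may not be `Unpin`, so the patch reaches for the new `pin-project` dependency: the struct-level attribute plus `#[pin]` on `inner` generate a `project()` method that splits a `Pin<&mut Self>` into a pinned handle for `inner` and ordinary `&mut` references for everything else. A minimal sketch of that pattern on a toy stream wrapper (names are illustrative, not from this crate):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;

#[pin_project::pin_project]
struct Counted<S> {
    #[pin]
    inner: S,    // structurally pinned: projected as `Pin<&mut S>`
    seen: usize, // plain field: projected as `&mut usize`
}

impl<S: Stream> Stream for Counted<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        // Like `let mut this = self.project()` in the impl above: both
        // fields stay simultaneously borrowable.
        let this = self.project();
        let item = futures::ready!(this.inner.poll_next(cx));
        if item.is_some() {
            *this.seen += 1;
        }
        Poll::Ready(item)
    }
}

fn main() {
    use futures::StreamExt;
    futures::executor::block_on(async {
        let mut counted = Counted { inner: futures::stream::iter(0..3), seen: 0 };
        while counted.next().await.is_some() {}
        assert_eq!(counted.seen, 3);
    });
}
```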
@@ -131,24 +133,27 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
 	Block: BlockT,
 	BStatus: BlockStatusT<Block>,
 	BSyncRequester: BlockSyncRequesterT<Block>,
-	I: Stream<Item=M::Blocked,Error=Error>,
+	I: Stream<Item = M::Blocked>,
 	M: BlockUntilImported<Block>,
 {
-	type Item = M::Blocked;
-	type Error = Error;
+	type Item = Result<M::Blocked, Error>;
+
+	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+		// We are using a `this` variable in order to allow multiple simultaneous
+		// mutable borrows of `self`.
+		let mut this = self.project();
 
-	fn poll(&mut self) -> Poll<Option<M::Blocked>, Error> {
 		loop {
-			match self.inner.poll()? {
-				Async::Ready(None) => return Ok(Async::Ready(None)),
-				Async::Ready(Some(input)) => {
+			match Stream::poll_next(Pin::new(&mut this.inner), cx) {
+				Poll::Ready(None) => return Poll::Ready(None),
+				Poll::Ready(Some(input)) => {
 					// new input: schedule wait of any parts which require
 					// blocks to be known.
-					let ready = &mut self.ready;
-					let pending = &mut self.pending;
+					let ready = &mut this.ready;
+					let pending = &mut this.pending;
 					M::schedule_wait(
 						input,
-						&self.status_check,
+						this.status_check,
 						|target_hash, target_number, wait| pending
 							.entry(target_hash)
 							.or_insert_with(|| (target_number, Instant::now(), Vec::new()))
@@ -157,37 +162,36 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
 						|ready_item| ready.push_back(ready_item),
 					)?;
 				}
-				Async::NotReady => break,
+				Poll::Pending => break,
 			}
 		}
 
 		loop {
-			match self.import_notifications.poll() {
-				Err(_) => return Err(Error::Network(format!("Failed to get new message"))),
-				Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
-				Ok(Async::Ready(Some(notification))) => {
+			match Stream::poll_next(Pin::new(&mut this.import_notifications), cx) {
+				Poll::Ready(None) => return Poll::Ready(None),
+				Poll::Ready(Some(notification)) => {
 					// new block imported. queue up all messages tied to that hash.
-					if let Some((_, _, messages)) = self.pending.remove(&notification.hash) {
+					if let Some((_, _, messages)) = this.pending.remove(&notification.hash) {
 						let canon_number = notification.header.number().clone();
 						let ready_messages = messages.into_iter()
 							.filter_map(|m| m.wait_completed(canon_number));
 
-						self.ready.extend(ready_messages);
+						this.ready.extend(ready_messages);
 					}
 				}
-				Ok(Async::NotReady) => break,
+				Poll::Pending => break,
 			}
 		}
 
 		let mut update_interval = false;
-		while let Async::Ready(Some(_)) = self.check_pending.poll().map_err(Error::Timer)? {
+		while let Poll::Ready(Some(Ok(()))) = this.check_pending.poll_next_unpin(cx) {
 			update_interval = true;
 		}
 
 		if update_interval {
 			let mut known_keys = Vec::new();
-			for (&block_hash, &mut (block_number, ref mut last_log, ref v)) in &mut self.pending {
-				if let Some(number) = self.status_check.block_number(block_hash)? {
+			for (&block_hash, &mut (block_number, ref mut last_log, ref v)) in this.pending.iter_mut() {
+				if let Some(number) = this.status_check.block_number(block_hash)? {
 					known_keys.push((block_hash, number));
 				} else {
 					let next_log = *last_log + LOG_PENDING_INTERVAL;
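One consequence visible in this `Stream` impl: futures 0.3 streams have no separate error channel, so the old `type Error = Error` moves into the item type as `Item = Result<M::Blocked, Error>`, and the `?` operator keeps working only because `Poll<Option<Result<_, _>>>` implements `Try`. A toy illustration of the shape change (hypothetical types, not the crate's):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;

/// 0.3-style fallible stream: errors are items, not a side channel.
struct Numbers {
    next: u32,
}

impl Stream for Numbers {
    type Item = Result<u32, String>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = Pin::into_inner(self);
        this.next += 1;
        match this.next {
            n if n <= 3 => Poll::Ready(Some(Ok(n))),
            4 => Poll::Ready(Some(Err("overflowed the toy limit".into()))),
            _ => Poll::Ready(None), // stream is finished
        }
    }
}

fn main() {
    use futures::StreamExt;
    let collected: Vec<_> = futures::executor::block_on(Numbers { next: 0 }.collect());
    assert_eq!(collected.len(), 4); // three Ok items, then one Err item
}
```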
@@ -199,13 +203,13 @@
 						Possible fork?",
 						block_hash,
 						v.len(),
-						self.identifier,
+						this.identifier,
 					);
 
 					// NOTE: when sending an empty vec of peers the
 					// underlying should make a best effort to sync the
 					// block from any peers it knows about.
-					self.block_sync_requester.set_sync_fork_request(
+					this.block_sync_requester.set_sync_fork_request(
 						vec![],
 						block_hash,
 						block_number,
@@ -217,23 +221,23 @@ impl<Block, BStatus, BSyncRequester, I, M> Stream for UntilImported<Block, BStat
 			}
 
 			for (known_hash, canon_number) in known_keys {
-				if let Some((_, _, pending_messages)) = self.pending.remove(&known_hash) {
+				if let Some((_, _, pending_messages)) = this.pending.remove(&known_hash) {
 					let ready_messages = pending_messages.into_iter()
 						.filter_map(|m| m.wait_completed(canon_number));
 
-					self.ready.extend(ready_messages);
+					this.ready.extend(ready_messages);
 				}
 			}
 		}
 
-		if let Some(ready) = self.ready.pop_front() {
-			return Ok(Async::Ready(Some(ready)))
+		if let Some(ready) = this.ready.pop_front() {
+			return Poll::Ready(Some(Ok(ready)))
 		}
 
-		if self.import_notifications.is_done() && self.inner.is_done() {
-			Ok(Async::Ready(None))
+		if this.import_notifications.is_done() && this.inner.is_done() {
+			Poll::Ready(None)
 		} else {
-			Ok(Async::NotReady)
+			Poll::Pending
 		}
 	}
 }
@@ -308,6 +312,8 @@ pub(crate) struct BlockGlobalMessage<Block: BlockT> {
 	target_number: NumberFor<Block>,
 }
 
+impl<Block: BlockT> Unpin for BlockGlobalMessage<Block> {}
+
 impl<Block: BlockT> BlockUntilImported<Block> for BlockGlobalMessage<Block> {
 	type Blocked = CommunicationIn<Block>;
@@ -474,13 +480,12 @@ pub(crate) type UntilGlobalMessageBlocksImported<Block, BlockStatus, BlockSyncRe
 mod tests {
 	use super::*;
 	use crate::{CatchUp, CompactCommit};
-	use tokio::runtime::current_thread::Runtime;
 	use substrate_test_runtime_client::runtime::{Block, Hash, Header};
 	use sp_consensus::BlockOrigin;
 	use sc_client_api::BlockImportNotification;
 	use futures::future::Either;
 	use futures_timer::Delay;
-	use futures03::{channel::mpsc, future::FutureExt as _, future::TryFutureExt as _};
+	use futures::channel::mpsc;
 	use finality_grandpa::Precommit;
 
 	#[derive(Clone)]
@@ -588,13 +593,13 @@ mod tests {
 		// enact all dependencies before importing the message
 		enact_dependencies(&chain_state);
 
-		let (global_tx, global_rx) = futures::sync::mpsc::unbounded();
+		let (global_tx, global_rx) = futures::channel::mpsc::unbounded();
 
 		let until_imported = UntilGlobalMessageBlocksImported::new(
 			import_notifications,
 			TestBlockSyncRequester::default(),
 			block_status,
-			global_rx.map_err(|_| panic!("should never error")),
+			global_rx,
 			"global",
 		);
 
@@ -602,8 +607,7 @@ mod tests {
 
 		let work = until_imported.into_future();
 
-		let mut runtime = Runtime::new().unwrap();
-		runtime.block_on(work).map_err(|(e, _)| e).unwrap().0.unwrap()
+		futures::executor::block_on(work).0.unwrap().unwrap()
 	}
 
 	fn blocking_message_on_dependencies<F>(
@@ -615,13 +619,13 @@ mod tests {
 		let (chain_state, import_notifications) = TestChainState::new();
 		let block_status = chain_state.block_status();
 
-		let (global_tx, global_rx) = futures::sync::mpsc::unbounded();
+		let (global_tx, global_rx) = futures::channel::mpsc::unbounded();
 
 		let until_imported = UntilGlobalMessageBlocksImported::new(
 			import_notifications,
 			TestBlockSyncRequester::default(),
 			block_status,
-			global_rx.map_err(|_| panic!("should never error")),
+			global_rx,
 			"global",
 		);
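With no 0.1 error types left, these test helpers drop the tokio 0.1 `Runtime` entirely and drive futures with `futures::executor::block_on`; `StreamExt::into_future` now resolves to a plain `(first_item, remaining_stream)` tuple, which is why the old `.map_err(|(e, _)| e).unwrap().0.unwrap()` chain shrinks to `.0.unwrap().unwrap()`. A standalone sketch of that access pattern:

```rust
use futures::channel::mpsc;
use futures::StreamExt;

fn main() {
    let (tx, rx) = mpsc::unbounded::<u32>();
    tx.unbounded_send(7).unwrap();

    // `into_future()` resolves to `(Option<Item>, rest)`: no Result wrapper,
    // so no tokio runtime or `map_err` plumbing is needed any more.
    let (first, _rest) = futures::executor::block_on(rx.into_future());
    assert_eq!(first, Some(7));
}
```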
@@ -630,13 +634,10 @@
 		// NOTE: needs to be cloned otherwise it is moved to the stream and
 		// dropped too early.
 		let inner_chain_state = chain_state.clone();
-		let work = until_imported
-			.into_future()
-			.select2(Delay::new(Duration::from_millis(100)).unit_error().compat())
+		let work = future::select(until_imported.into_future(), Delay::new(Duration::from_millis(100)))
 			.then(move |res| match res {
-				Err(_) => panic!("neither should have had error"),
-				Ok(Either::A(_)) => panic!("timeout should have fired first"),
-				Ok(Either::B((_, until_imported))) => {
+				Either::Left(_) => panic!("timeout should have fired first"),
+				Either::Right((_, until_imported)) => {
 					// timeout fired. push in the headers.
 					enact_dependencies(&inner_chain_state);
 
@@ -644,8 +645,7 @@ mod tests {
 				}
 			});
 
-		let mut runtime = Runtime::new().unwrap();
-		runtime.block_on(work).map_err(|(e, _)| e).unwrap().0.unwrap()
+		futures::executor::block_on(work).0.unwrap().unwrap()
 	}
 
 	#[test]
@@ -871,7 +871,7 @@ mod tests {
 		let (chain_state, import_notifications) = TestChainState::new();
 		let block_status = chain_state.block_status();
 
-		let (global_tx, global_rx) = futures::sync::mpsc::unbounded();
+		let (global_tx, global_rx) = futures::channel::mpsc::unbounded();
 
 		let block_sync_requester = TestBlockSyncRequester::default();
 
@@ -879,7 +879,7 @@ mod tests {
 			import_notifications,
 			block_sync_requester.clone(),
 			block_status,
-			global_rx.map_err(|_| panic!("should never error")),
+			global_rx,
 			"global",
 		);
 
@@ -914,31 +914,31 @@ mod tests {
 		// we send the commit message and spawn the until_imported stream
 		global_tx.unbounded_send(unknown_commit()).unwrap();
 
-		let mut runtime = Runtime::new().unwrap();
-		runtime.spawn(until_imported.into_future().map(|_| ()).map_err(|_| ()));
+		let threads_pool = futures::executor::ThreadPool::new().unwrap();
+		threads_pool.spawn_ok(until_imported.into_future().map(|_| ()));
 
 		// assert that we will make sync requests
-		let assert = futures::future::poll_fn::<(), (), _>(|| {
+		let assert = futures::future::poll_fn(|_| {
 			let block_sync_requests = block_sync_requester.requests.lock();
 
 			// we request blocks targeted by the precommits that aren't imported
 			if block_sync_requests.contains(&(h2.hash(), *h2.number())) &&
 				block_sync_requests.contains(&(h3.hash(), *h3.number()))
 			{
-				return Ok(Async::Ready(()));
+				return Poll::Ready(());
 			}
 
-			Ok(Async::NotReady)
+			Poll::Pending
 		});
 
 		// the `until_imported` stream doesn't request the blocks immediately,
 		// but it should request them after a small timeout
-		let timeout = Delay::new(Duration::from_secs(60)).unit_error().compat();
-		let test = assert.select2(timeout).map(|res| match res {
-			Either::A(_) => {},
-			Either::B(_) => panic!("timed out waiting for block sync request"),
-		}).map_err(|_| ());
+		let timeout = Delay::new(Duration::from_secs(60));
+		let test = future::select(assert, timeout).map(|res| match res {
+			Either::Left(_) => {},
+			Either::Right(_) => panic!("timed out waiting for block sync request"),
+		}).map(drop);
 
-		runtime.block_on(test).unwrap();
+		futures::executor::block_on(test);
 	}
 }
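The timeout plumbing follows the same recipe: futures 0.1's `select2` returned a `Result` around `Either::A`/`Either::B`, while 0.3's `future::select` is infallible and yields `Either::Left`/`Either::Right`, each carrying the finished value plus the still-pending future. A sketch of the timeout race, assuming `futures-timer`'s `Delay` (already a dependency here) with `Output = ()`:

```rust
use futures::future::{self, Either};
use futures_timer::Delay;
use std::time::Duration;

fn main() {
    futures::executor::block_on(async {
        let work = future::pending::<()>(); // never completes
        let timeout = Delay::new(Duration::from_millis(10));

        // No `Err(_)` arm any more: `select` cannot fail.
        match future::select(work, timeout).await {
            Either::Left(_) => panic!("timeout should have fired first"),
            Either::Right(((), _work)) => { /* timed out, as expected */ }
        }
    });
}
```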
diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs
index 194bd09e24b525c64e6f44ebe5407125686879c9..caf97438adce5ecf55e6859304fbbca234642aff 100644
--- a/substrate/client/service/src/builder.rs
+++ b/substrate/client/service/src/builder.rs
@@ -1067,7 +1067,7 @@ ServiceBuilder<
 				has_bootnodes,
 			), exit.clone()).map(drop)));
 
-		let telemetry_connection_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>> = Default::default();
+		let telemetry_connection_sinks: Arc<Mutex<Vec<futures::channel::mpsc::UnboundedSender<()>>>> = Default::default();
 
 		// Telemetry
 		let telemetry = config.telemetry_endpoints.clone().map(|endpoints| {
diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs
index 1b2e7bcd3cc7166aa32ce75586f50d99dbbc6bef..57e1462f6413654dcd58d0b7bd5ce853ee43a440 100644
--- a/substrate/client/service/src/lib.rs
+++ b/substrate/client/service/src/lib.rs
@@ -100,7 +100,7 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
 	rpc_handlers: sc_rpc_server::RpcHandler<sc_rpc::Metadata>,
 	_rpc: Box<dyn std::any::Any + Send + Sync>,
 	_telemetry: Option<sc_telemetry::Telemetry>,
-	_telemetry_on_connect_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>>,
+	_telemetry_on_connect_sinks: Arc<Mutex<Vec<futures::channel::mpsc::UnboundedSender<()>>>>,
 	_offchain_workers: Option<Arc<TOc>>,
 	keystore: sc_keystore::KeyStorePtr,
 	marker: PhantomData<TBl>,
@@ -153,7 +153,7 @@ pub trait AbstractService: 'static + Future<Output = Result<(), Error>> +
 	type NetworkSpecialization: NetworkSpecialization<Self::Block>;
 
 	/// Get event stream for telemetry connection established events.
-	fn telemetry_on_connect_stream(&self) -> mpsc::UnboundedReceiver<()>;
+	fn telemetry_on_connect_stream(&self) -> futures::channel::mpsc::UnboundedReceiver<()>;
 
 	/// return a shared instance of Telemetry (if enabled)
 	fn telemetry(&self) -> Option<sc_telemetry::Telemetry>;
@@ -224,8 +224,8 @@ where
 	type TransactionPool = TExPool;
 	type NetworkSpecialization = TNetSpec;
 
-	fn telemetry_on_connect_stream(&self) -> mpsc::UnboundedReceiver<()> {
-		let (sink, stream) = mpsc::unbounded();
+	fn telemetry_on_connect_stream(&self) -> futures::channel::mpsc::UnboundedReceiver<()> {
+		let (sink, stream) = futures::channel::mpsc::unbounded();
 		self._telemetry_on_connect_sinks.lock().push(sink);
 		stream
 	}
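The service-level change is just a channel swap: the telemetry connection sinks move from `futures::sync::mpsc` to `futures::channel::mpsc`, keeping the fan-out design where each `telemetry_on_connect_stream` call registers a fresh unbounded sender. A reduced sketch of that registry pattern, assuming `parking_lot::Mutex` as the service code uses:

```rust
use futures::channel::mpsc;
use futures::StreamExt;
use parking_lot::Mutex;
use std::sync::Arc;

fn main() {
    // shared registry of sinks, one per subscriber
    let sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>> = Default::default();

    // a subscriber requests a stream (cf. `telemetry_on_connect_stream`)
    let stream = {
        let (sink, stream) = mpsc::unbounded();
        sinks.lock().push(sink);
        stream
    };

    // the service notifies every registered subscriber of a connection
    for sink in sinks.lock().iter() {
        let _ = sink.unbounded_send(());
    }

    let (event, _rest) = futures::executor::block_on(stream.into_future());
    assert_eq!(event, Some(()));
}
```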