diff --git a/substrate/client/finality-grandpa/src/communication/mod.rs b/substrate/client/finality-grandpa/src/communication/mod.rs
index 18cb14c7396367b06fc54cfc4cfc5f3f98835ed8..64637c0ed8f96926607399d540f9f841ed7b9f2f 100644
--- a/substrate/client/finality-grandpa/src/communication/mod.rs
+++ b/substrate/client/finality-grandpa/src/communication/mod.rs
@@ -27,12 +27,13 @@
 //! In the future, there will be a fallback for allowing sending the same message
 //! under certain conditions that are used to un-stick the protocol.
 
-use futures::{prelude::*, sync::mpsc};
+use futures::prelude::*;
 use futures03::{
 	channel::mpsc as mpsc03,
 	compat::Compat,
-	future::{Future as Future03},
-	stream::StreamExt,
+	future::{self as future03, Future as Future03},
+	sink::Sink as Sink03,
+	stream::{Stream as Stream03, StreamExt},
 };
 use log::{debug, trace};
 use parking_lot::Mutex;
@@ -276,8 +277,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
 		local_key: Option<AuthorityPair>,
 		has_voted: HasVoted<B>,
 	) -> (
-		impl Stream<Item=SignedMessage<B>,Error=Error>,
-		impl Sink<SinkItem=Message<B>,SinkError=Error>,
+		impl Stream03<Item=SignedMessage<B>> + Unpin,
+		OutgoingMessages<B>,
 	) {
 		self.note_round(
 			round,
@@ -295,22 +296,20 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
 		});
 
 		let topic = round_topic::<B>(round.0, set_id.0);
-		let incoming = Compat::new(self.gossip_engine.messages_for(topic)
-			.map(|item| Ok::<_, ()>(item)))
-			.filter_map(|notification| {
+		let incoming = self.gossip_engine.messages_for(topic)
+			.filter_map(move |notification| {
 				let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
-				if let Err(ref e) = decoded {
-					debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
-				}
-				decoded.ok()
-			})
-			.and_then(move |msg| {
-				match msg {
-					GossipMessage::Vote(msg) => {
+
+				match decoded {
+					Err(ref e) => {
+						debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
+						return future03::ready(None);
+					}
+					Ok(GossipMessage::Vote(msg)) => {
 						// check signature.
 						if !voters.contains_key(&msg.message.id) {
 							debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id);
-							return Ok(None);
+							return future03::ready(None);
 						}
 
 						if voters.len() <= TELEMETRY_VOTERS_LIMIT {
@@ -339,18 +338,16 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
 							};
 						}
 
-						Ok(Some(msg.message))
+						future03::ready(Some(msg.message))
 					}
 					_ => {
 						debug!(target: "afg", "Skipping unknown message type");
-						return Ok(None);
+						return future03::ready(None);
 					}
 				}
-			})
-			.filter_map(|x| x)
-			.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")));
+			});
 
-		let (tx, out_rx) = mpsc::unbounded();
+		let (tx, out_rx) = mpsc03::channel(0);
 		let outgoing = OutgoingMessages::<B> {
 			round: round.0,
 			set_id: set_id.0,
@@ -360,14 +357,10 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
 			has_voted,
 		};
 
-		let out_rx = out_rx.map_err(move |()| Error::Network(
-			format!("Failed to receive on unbounded receiver for round {}", round.0)
-		));
-
 		// Combine incoming votes from external GRANDPA nodes with outgoing
 		// votes from our own GRANDPA voter to have a single
 		// vote-import-pipeline.
-		let incoming = incoming.select(out_rx);
+		let incoming = futures03::stream::select(incoming, out_rx);
 
 		(incoming, outgoing)
 	}
@@ -690,21 +683,29 @@ pub(crate) fn check_message_sig_with_buffer<Block: BlockT>(
 /// use the same raw message and key to sign. This is currently true for
 /// `ed25519` and `BLS` signatures (which we might use in the future), care must
 /// be taken when switching to different key types.
-struct OutgoingMessages<Block: BlockT> {
+pub(crate) struct OutgoingMessages<Block: BlockT> {
 	round: RoundNumber,
 	set_id: SetIdNumber,
 	locals: Option<(AuthorityPair, AuthorityId)>,
-	sender: mpsc::UnboundedSender<SignedMessage<Block>>,
+	sender: mpsc03::Sender<SignedMessage<Block>>,
 	network: GossipEngine<Block>,
 	has_voted: HasVoted<Block>,
 }
 
-impl<Block: BlockT> Sink for OutgoingMessages<Block>
+impl<B: BlockT> Unpin for OutgoingMessages<B> {}
+
+impl<Block: BlockT> Sink03<Message<Block>> for OutgoingMessages<Block>
 {
-	type SinkItem = Message<Block>;
-	type SinkError = Error;
+	type Error = Error;
+
+	fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
+		Sink03::poll_ready(Pin::new(&mut self.sender), cx)
+			.map(|elem| { elem.map_err(|e| {
+				Error::Network(format!("Failed to poll_ready channel sender: {:?}", e))
+			})})
+	}
 
-	fn start_send(&mut self, mut msg: Message<Block>) -> StartSend<Message<Block>, Error> {
+	fn start_send(mut self: Pin<&mut Self>, mut msg: Message<Block>) -> Result<(), Self::Error> {
 		// if we've voted on this round previously under the same key, send that vote instead
 		match &mut msg {
 			finality_grandpa::Message::PrimaryPropose(ref mut vote) =>
@@ -760,17 +761,23 @@ impl<Block: BlockT> Sink for OutgoingMessages<Block>
 			self.network.gossip_message(topic, message.encode(), false);
 
 			// forward the message to the inner sender.
-			let _ = self.sender.unbounded_send(signed);
-		}
+			return self.sender.start_send(signed).map_err(|e| {
+				Error::Network(format!("Failed to start_send on channel sender: {:?}", e))
+			});
+		};
 
-		Ok(AsyncSink::Ready)
+		Ok(())
 	}
 
-	fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) }
+	fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
+		Poll03::Ready(Ok(()))
+	}
 
-	fn close(&mut self) -> Poll<(), Error> {
-		// ignore errors since we allow this inner sender to be closed already.
-		self.sender.close().or_else(|_| Ok(Async::Ready(())))
+	fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
+		Sink03::poll_close(Pin::new(&mut self.sender), cx)
+			.map(|elem| { elem.map_err(|e| {
+				Error::Network(format!("Failed to poll_close channel sender: {:?}", e))
+			})})
 	}
 }
 
diff --git a/substrate/client/finality-grandpa/src/environment.rs b/substrate/client/finality-grandpa/src/environment.rs
index 372229001dd196991a09bd8f697f1664dd5d2856..c5c6291dc0bc6911a5fbf0e30dbfafb8025ba151 100644
--- a/substrate/client/finality-grandpa/src/environment.rs
+++ b/substrate/client/finality-grandpa/src/environment.rs
@@ -22,7 +22,11 @@ use std::time::Duration;
 use log::{debug, warn, info};
 use parity_scale_codec::{Decode, Encode};
 use futures::prelude::*;
-use futures03::future::{FutureExt as _, TryFutureExt as _};
+use futures03::{
+	compat::{Compat, CompatSink},
+	future::{FutureExt as _, TryFutureExt as _},
+	stream::StreamExt as _,
+};
 use futures_timer::Delay;
 use parking_lot::RwLock;
 use sp_blockchain::{HeaderBackend, Error as ClientError};
@@ -608,6 +612,9 @@ where
 			has_voted,
 		);
 
+		let incoming = Compat::new(incoming.map(|item| Ok::<_, Error>(item)));
+		let outgoing = CompatSink::new(outgoing);
+
 		// schedule incoming messages from the network to be held until
 		// corresponding blocks are imported.
 		let incoming = Box::new(UntilVoteTargetImported::new(
diff --git a/substrate/client/finality-grandpa/src/tests.rs b/substrate/client/finality-grandpa/src/tests.rs
index 889250c54d7b769c382a5d3188f6c5fd76f276f8..fdfb2fd808ec5d82aa97a28f104945abba69ffa6 100644
--- a/substrate/client/finality-grandpa/src/tests.rs
+++ b/substrate/client/finality-grandpa/src/tests.rs
@@ -37,15 +37,17 @@ use sp_consensus::{
 	BlockOrigin, ForkChoiceStrategy, ImportedAux, BlockImportParams, ImportResult, BlockImport,
 	import_queue::{BoxJustificationImport, BoxFinalityProofImport},
 };
-use std::collections::{HashMap, HashSet};
-use std::result;
+use std::{
+	collections::{HashMap, HashSet},
+	result,
+	pin::Pin, task,
+};
 use parity_scale_codec::Decode;
-use sp_runtime::traits::{Header as HeaderT, HasherFor};
+use sp_runtime::traits::{Block as BlockT, Header as HeaderT, HasherFor};
 use sp_runtime::generic::{BlockId, DigestItem};
 use sp_core::{H256, NativeOrEncoded, ExecutionContext, crypto::Public};
 use sp_finality_grandpa::{GRANDPA_ENGINE_ID, AuthorityList, GrandpaApi};
 use sp_state_machine::{InMemoryBackend, prove_read, read_proof_check};
-use std::{pin::Pin, task};
 
 use authorities::AuthoritySet;
 use finality_proof::{
@@ -1282,6 +1284,9 @@ fn voter_persists_its_votes() {
 			HasVoted::No,
 		);
 
+		let round_rx = futures03::compat::Compat::new(round_rx.map(|item| Ok::<_, Error>(item)));
+		let round_tx = futures03::compat::CompatSink::new(round_tx);
+
 		let round_tx = Arc::new(Mutex::new(round_tx));
 		let exit_tx = Arc::new(Mutex::new(Some(exit_tx)));
 
@@ -1332,7 +1337,17 @@ fn voter_persists_its_votes() {
 							target_hash: block_30_hash,
 						};
 
-						round_tx.lock().start_send(finality_grandpa::Message::Prevote(prevote)).unwrap();
+						// One should either be calling `Sink::send` or `Sink::start_send` followed
+						// by `Sink::poll_complete` to make sure items are being flushed. Given that
+						// we send in a loop including a delay until items are received, this can be
+						// ignored for the sake of reduced complexity.
+						if !round_tx.lock()
+							.start_send(finality_grandpa::Message::Prevote(prevote))
+							.unwrap()
+							.is_ready() {
+								panic!("expected sink to be ready to write to.");
+							}
+
 						Ok(())
 					}).map_err(|_| panic!()))