diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index bf5f25d87d1a6988fb5dbe33283d96606131faf4..fc928b7491cec1dc5ae3fd46e683531b067e6a67 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -84,7 +84,7 @@ extern crate env_logger; extern crate parity_codec_derive; use futures::prelude::*; -use futures::sync::{self, mpsc}; +use futures::sync::{self, mpsc, oneshot}; use client::{ BlockchainEvents, CallExecutor, Client, backend::Backend, error::Error as ClientError, @@ -103,7 +103,6 @@ use grandpa::Error as GrandpaError; use grandpa::{voter, round::State as RoundState, BlockNumberOps, VoterSet}; use network::Service as NetworkService; -use network::consensus_gossip::ConsensusMessage; use std::sync::Arc; use std::time::Duration; @@ -216,6 +215,32 @@ impl From<ClientError> for Error { } } +/// A stream used by NetworkBridge in its implementation of Network. +pub struct NetworkStream { + inner: Option<mpsc::UnboundedReceiver<Vec<u8>>>, + outer: oneshot::Receiver<mpsc::UnboundedReceiver<Vec<u8>>> +} + +impl Stream for NetworkStream { + type Item = Vec<u8>; + type Error = (); + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + if let Some(ref mut inner) = self.inner { + return inner.poll(); + } + match self.outer.poll() { + Ok(futures::Async::Ready(mut inner)) => { + let poll_result = inner.poll(); + self.inner = Some(inner); + poll_result + }, + Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady), + Err(_) => Err(()) + } + } +} + /// A handle to the network. This is generally implemented by providing some /// handle to a gossip service or similar. /// @@ -277,14 +302,14 @@ fn commit_topic<B: BlockT>(set_id: u64) -> B::Hash { } impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B> for NetworkBridge<B, S> { - type In = mpsc::UnboundedReceiver<ConsensusMessage>; + type In = NetworkStream; fn messages_for(&self, round: u64, set_id: u64) -> Self::In { let (tx, rx) = sync::oneshot::channel(); self.service.with_gossip(move |gossip, _| { let inner_rx = gossip.messages_for(message_topic::<B>(round, set_id)); let _ = tx.send(inner_rx); }); - rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully") + NetworkStream { outer: rx, inner: None } } fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) { @@ -308,7 +333,7 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B let inner_rx = gossip.messages_for(commit_topic::<B>(set_id)); let _ = tx.send(inner_rx); }); - rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully") + NetworkStream { outer: rx, inner: None } } fn send_commit(&self, _round: u64, set_id: u64, message: Vec<u8>) {