diff --git a/substrate/core/telemetry/src/worker/node.rs b/substrate/core/telemetry/src/worker/node.rs index d6193d4cc6ab9175f36c52b57d30d50b920793a1..11b1f2a81e6996da38c9636f563bbf809922b7a2 100644 --- a/substrate/core/telemetry/src/worker/node.rs +++ b/substrate/core/telemetry/src/worker/node.rs @@ -87,7 +87,11 @@ impl<TTrans: Transport> Node<TTrans> { impl<TTrans: Transport, TSinkErr> Node<TTrans> where TTrans: Clone + Unpin, TTrans::Dial: Unpin, - TTrans::Output: Sink<BytesMut, Error = TSinkErr> + Stream + Unpin, TSinkErr: fmt::Debug { + TTrans::Output: Sink<BytesMut, Error = TSinkErr> + + Stream<Item=Result<BytesMut, TSinkErr>> + + Unpin, + TSinkErr: fmt::Debug +{ /// Sends a WebSocket frame to the node. Returns an error if we are not connected to the node. /// /// After calling this method, you should call `poll` in order for it to be properly processed. @@ -175,7 +179,10 @@ fn gen_rand_reconnect_delay() -> Delay { } impl<TTrans: Transport, TSinkErr> NodeSocketConnected<TTrans> -where TTrans::Output: Sink<BytesMut, Error = TSinkErr> + Stream + Unpin { +where TTrans::Output: Sink<BytesMut, Error = TSinkErr> + + Stream<Item=Result<BytesMut, TSinkErr>> + + Unpin +{ /// Processes the queue of messages for the connected socket. /// /// The address is passed for logging purposes only. @@ -208,13 +215,18 @@ where TTrans::Output: Sink<BytesMut, Error = TSinkErr> + Stream + Unpin { Poll::Ready(Ok(())) => self.need_flush = false, } - } else if let Poll::Ready(_) = Stream::poll_next(Pin::new(&mut self.sink), cx) { - // We poll the telemetry `Stream` because the underlying implementation relies on - // this in order to answer PINGs. - // We don't do anything with incoming messages, however. - } else { - break + match Stream::poll_next(Pin::new(&mut self.sink), cx) { + Poll::Ready(Some(Ok(_))) => { + // We poll the telemetry `Stream` because the underlying implementation relies on + // this in order to answer PINGs. + // We don't do anything with incoming messages, however. + }, + Poll::Ready(Some(Err(err))) => { + return Poll::Ready(Err(err)) + }, + Poll::Pending | Poll::Ready(None) => break, + } } }