Skip to content
Snippets Groups Projects
Commit 78094219 authored by Pierre Krieger's avatar Pierre Krieger Committed by Gavin Wood
Browse files

Fix telemetry not responding to pings (#3272)

parent 5b1b4b92
No related merge requests found
......@@ -187,6 +187,25 @@ impl TelemetryWorker {
/// For some context, we put this object around the `wasm_ext::ExtTransport` in order to make sure
/// that each telemetry message maps to one single call to `write` in the WASM FFI.
struct StreamSink<T>(T);
impl<T: tokio_io::AsyncRead> futures01::Stream for StreamSink<T> {
type Item = BytesMut;
type Error = io::Error;
fn poll(&mut self) -> futures01::Poll<Option<Self::Item>, Self::Error> {
let mut buf = [0; 128];
Ok(self.0.poll_read(&mut buf)?
.map(|n|
if n == 0 {
None
} else {
let buf: BytesMut = buf[..n].into();
Some(buf)
}
))
}
}
impl<T: tokio_io::AsyncWrite> futures01::Sink for StreamSink<T> {
type SinkItem = BytesMut;
type SinkError = io::Error;
......
......@@ -87,7 +87,7 @@ impl<TTrans: Transport> Node<TTrans> {
impl<TTrans: Transport, TSinkErr> Node<TTrans>
where TTrans: Clone + Unpin, TTrans::Dial: Unpin,
TTrans::Output: Sink<BytesMut, Error = TSinkErr> + Unpin, TSinkErr: fmt::Debug {
TTrans::Output: Sink<BytesMut, Error = TSinkErr> + Stream + Unpin, TSinkErr: fmt::Debug {
/// Sends a WebSocket frame to the node. Returns an error if we are not connected to the node.
///
/// After calling this method, you should call `poll` in order for it to be properly processed.
......@@ -175,7 +175,7 @@ fn gen_rand_reconnect_delay() -> Delay {
}
impl<TTrans: Transport, TSinkErr> NodeSocketConnected<TTrans>
where TTrans::Output: Sink<BytesMut, Error = TSinkErr> + Unpin {
where TTrans::Output: Sink<BytesMut, Error = TSinkErr> + Stream + Unpin {
/// Processes the queue of messages for the connected socket.
///
/// The address is passed for logging purposes only.
......@@ -200,6 +200,7 @@ where TTrans::Output: Sink<BytesMut, Error = TSinkErr> + Unpin {
item_len, my_addr
);
self.need_flush = true;
} else if self.need_flush {
match Sink::poll_flush(Pin::new(&mut self.sink), cx) {
Poll::Pending => return Poll::Pending,
......@@ -207,6 +208,11 @@ where TTrans::Output: Sink<BytesMut, Error = TSinkErr> + Unpin {
Poll::Ready(Ok(())) => self.need_flush = false,
}
} else if let Poll::Ready(_) = Stream::poll_next(Pin::new(&mut self.sink), cx) {
// We poll the telemetry `Stream` because the underlying implementation relies on
// this in order to answer PINGs.
// We don't do anything with incoming messages, however.
} else {
break
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment