Skip to content
Snippets Groups Projects
Commit 7c0592a9 authored by Pierre Krieger's avatar Pierre Krieger Committed by Gavin Wood
Browse files

Telemetry timeout (#3916)

* telemetry worker: add connection timeout

* restructure

* only add timeout when writing data

* don't overwrite an existing delay

* set timeout only around writing data

* address comments

* dedicated error enum

* remove whitespace

* move timeout to inside struct

* fix timeout

* remove prints

* move polling

* address comment

* Implement

* More work
parent a0094e72
No related merge requests found
......@@ -58,6 +58,8 @@ struct NodeSocketConnected<TTrans: Transport> {
pending: VecDeque<BytesMut>,
/// If true, we need to flush the sink.
need_flush: bool,
/// A timeout for the socket to write data.
timeout: Option<Delay>,
}
/// Event that can happen with this node.
......@@ -66,7 +68,16 @@ pub enum NodeEvent<TSinkErr> {
/// We are now connected to this node.
Connected,
/// We are now disconnected from this node.
Disconnected(TSinkErr),
Disconnected(ConnectionError<TSinkErr>),
}
/// Reason for disconnecting from a node.
#[derive(Debug)]
pub enum ConnectionError<TSinkErr> {
/// The connection timed-out.
Timeout,
/// The sink errored.
Sink(TSinkErr),
}
impl<TTrans: Transport> Node<TTrans> {
......@@ -116,10 +127,12 @@ where TTrans: Clone + Unpin, TTrans::Dial: Unpin,
let mut socket = mem::replace(&mut self.socket, NodeSocket::Poisoned);
self.socket = loop {
match socket {
NodeSocket::Connected(mut conn) =>
NodeSocket::Connected(mut conn) => {
match NodeSocketConnected::poll(Pin::new(&mut conn), cx, &self.addr) {
Poll::Ready(Ok(v)) => match v {}
Poll::Pending => break NodeSocket::Connected(conn),
Poll::Ready(Ok(v)) => match v {},
Poll::Pending => {
break NodeSocket::Connected(conn)
},
Poll::Ready(Err(err)) => {
warn!(target: "telemetry", "Disconnected from {}: {:?}", self.addr, err);
let timeout = gen_rand_reconnect_delay();
......@@ -127,10 +140,16 @@ where TTrans: Clone + Unpin, TTrans::Dial: Unpin,
return Poll::Ready(NodeEvent::Disconnected(err))
}
}
}
NodeSocket::Dialing(mut s) => match Future::poll(Pin::new(&mut s), cx) {
Poll::Ready(Ok(sink)) => {
debug!(target: "telemetry", "Connected to {}", self.addr);
let conn = NodeSocketConnected { sink, pending: VecDeque::new(), need_flush: false };
let conn = NodeSocketConnected {
sink,
pending: VecDeque::new(),
need_flush: false,
timeout: None,
};
self.socket = NodeSocket::Connected(conn);
return Poll::Ready(NodeEvent::Connected)
},
......@@ -189,18 +208,15 @@ where TTrans::Output: Sink<BytesMut, Error = TSinkErr>
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context,
my_addr: &Multiaddr
) -> Poll<Result<futures::never::Never, TSinkErr>> {
loop {
if let Some(item) = self.pending.pop_front() {
if let Poll::Pending = Sink::poll_ready(Pin::new(&mut self.sink), cx) {
self.pending.push_front(item);
return Poll::Pending
}
my_addr: &Multiaddr,
) -> Poll<Result<futures::never::Never, ConnectionError<TSinkErr>>> {
while let Some(item) = self.pending.pop_front() {
if let Poll::Ready(_) = Sink::poll_ready(Pin::new(&mut self.sink), cx) {
let item_len = item.len();
if let Err(err) = Sink::start_send(Pin::new(&mut self.sink), item) {
return Poll::Ready(Err(err))
self.timeout = None;
return Poll::Ready(Err(ConnectionError::Sink(err)))
}
trace!(
target: "telemetry", "Successfully sent {:?} bytes message to {}",
......@@ -208,28 +224,59 @@ where TTrans::Output: Sink<BytesMut, Error = TSinkErr>
);
self.need_flush = true;
} else if self.need_flush {
match Sink::poll_flush(Pin::new(&mut self.sink), cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Ready(Ok(())) => self.need_flush = false,
} else {
self.pending.push_front(item);
if self.timeout.is_none() {
self.timeout = Some(Delay::new(Duration::from_secs(10)));
}
break;
}
}
} else {
match Stream::poll_next(Pin::new(&mut self.sink), cx) {
Poll::Ready(Some(Ok(_))) => {
// We poll the telemetry `Stream` because the underlying implementation relies on
// this in order to answer PINGs.
// We don't do anything with incoming messages, however.
},
Poll::Ready(Some(Err(err))) => {
return Poll::Ready(Err(err))
},
Poll::Pending | Poll::Ready(None) => break,
if self.need_flush {
match Sink::poll_flush(Pin::new(&mut self.sink), cx) {
Poll::Pending => {
if self.timeout.is_none() {
self.timeout = Some(Delay::new(Duration::from_secs(10)));
}
},
Poll::Ready(Err(err)) => {
self.timeout = None;
return Poll::Ready(Err(ConnectionError::Sink(err)))
},
Poll::Ready(Ok(())) => {
self.timeout = None;
self.need_flush = false;
},
}
}
if let Some(timeout) = self.timeout.as_mut() {
match Future::poll(Pin::new(timeout), cx) {
Poll::Pending => {},
Poll::Ready(Err(err)) => {
self.timeout = None;
warn!(target: "telemetry", "Connection timeout error for {} {:?}", my_addr, err);
}
Poll::Ready(Ok(_)) => {
self.timeout = None;
return Poll::Ready(Err(ConnectionError::Timeout))
}
}
}
match Stream::poll_next(Pin::new(&mut self.sink), cx) {
Poll::Ready(Some(Ok(_))) => {
// We poll the telemetry `Stream` because the underlying implementation relies on
// this in order to answer PINGs.
// We don't do anything with incoming messages, however.
},
Poll::Ready(Some(Err(err))) => {
return Poll::Ready(Err(ConnectionError::Sink(err)))
},
Poll::Pending | Poll::Ready(None) => {},
}
Poll::Pending
}
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment