Skip to content
Snippets Groups Projects
Commit c7b9430b authored by David's avatar David Committed by GitHub
Browse files

Misc telemetry polish (#8484)

* Remove TelemetryWorker::with_transport
Make logging more useful

* Re-instate TelemetryWorker::with_transport

* Fix typo, don't spam
parent 40c0294f
Branches
No related merge requests found
......@@ -122,21 +122,11 @@ impl TelemetryWorker {
///
/// Only one is needed per process.
pub fn new(buffer_size: usize) -> Result<Self> {
let transport = initialize_transport(None)?;
let (message_sender, message_receiver) = mpsc::channel(buffer_size);
let (register_sender, register_receiver) = mpsc::unbounded();
Ok(Self {
message_receiver,
message_sender,
register_receiver,
register_sender,
id_counter: Arc::new(atomic::AtomicU64::new(1)),
transport,
})
Self::with_transport(buffer_size, None)
}
/// Instantiate a new [`TelemetryWorker`] which can run in background.
/// Instantiate a new [`TelemetryWorker`] with the given [`ExtTransport`]
/// which can run in background.
///
/// Only one is needed per process.
pub fn with_transport(buffer_size: usize, transport: Option<ExtTransport>) -> Result<Self> {
......@@ -312,12 +302,6 @@ impl TelemetryWorker {
for (node_max_verbosity, addr) in nodes {
if verbosity > *node_max_verbosity {
log::trace!(
target: "telemetry",
"Skipping {} for log entry with verbosity {:?}",
addr,
verbosity,
);
continue;
}
......
......@@ -73,8 +73,9 @@ enum NodeSocket<TTrans: Transport> {
impl<TTrans: Transport> NodeSocket<TTrans> {
fn wait_reconnect() -> NodeSocket<TTrans> {
let random_delay = rand::thread_rng().gen_range(5, 10);
let random_delay = rand::thread_rng().gen_range(10, 20);
let delay = Delay::new(Duration::from_secs(random_delay));
log::trace!(target: "telemetry", "Pausing for {} secs before reconnecting", random_delay);
NodeSocket::WaitingReconnect(delay)
}
}
......@@ -214,11 +215,11 @@ where
},
NodeSocket::ReconnectNow => match self.transport.clone().dial(self.addr.clone()) {
Ok(d) => {
log::debug!(target: "telemetry", "Started dialing {}", self.addr);
log::trace!(target: "telemetry", "Re-dialing {}", self.addr);
socket = NodeSocket::Dialing(d);
}
Err(err) => {
log::warn!(target: "telemetry", "❌ Error while dialing {}: {:?}", self.addr, err);
log::warn!(target: "telemetry", "❌ Error while re-dialing {}: {:?}", self.addr, err);
socket = NodeSocket::wait_reconnect();
}
},
......@@ -236,16 +237,18 @@ where
}
};
// The Dispatcher blocks when the Node sinks blocks. This is why it is important that the
// Node sinks doesn't go into "Pending" state while waiting for reconnection but rather
// The Dispatcher blocks when the Node syncs blocks. This is why it is important that the
// Node sinks don't go into "Pending" state while waiting for reconnection but rather
// discard the excess of telemetry messages.
Poll::Ready(Ok(()))
}
fn start_send(mut self: Pin<&mut Self>, item: TelemetryPayload) -> Result<(), Self::Error> {
// Any buffered outgoing telemetry messages are discarded while (re-)connecting.
match &mut self.socket {
NodeSocket::Connected(conn) => match serde_json::to_vec(&item) {
Ok(data) => {
log::trace!(target: "telemetry", "Sending {} bytes", data.len());
let _ = conn.sink.start_send_unpin(data);
}
Err(err) => log::debug!(
......@@ -254,18 +257,14 @@ where
err,
),
},
_socket => {
log::trace!(
target: "telemetry",
"Message has been discarded: {}",
serde_json::to_string(&item)
.unwrap_or_else(|err| format!(
"could not be serialized ({}): {:?}",
err,
item,
)),
);
}
// We are currently dialing the node.
NodeSocket::Dialing(_) => log::trace!(target: "telemetry", "Dialing"),
// A new connection should be started as soon as possible.
NodeSocket::ReconnectNow => log::trace!(target: "telemetry", "Reconnecting"),
// Waiting before attempting to dial again.
NodeSocket::WaitingReconnect(_) => {}
// Temporary transition state.
NodeSocket::Poisoned => log::trace!(target: "telemetry", "Poisoned"),
}
Ok(())
}
......@@ -273,7 +272,12 @@ where
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match &mut self.socket {
NodeSocket::Connected(conn) => match conn.sink.poll_flush_unpin(cx) {
Poll::Ready(Err(_)) => {
Poll::Ready(Err(e)) => {
// When `telemetry` closes the websocket connection we end
// up here, which is sub-optimal. See
// https://github.com/libp2p/rust-libp2p/issues/2021 for
// what we could do to improve this.
log::trace!(target: "telemetry", "[poll_flush] Error: {:?}", e);
self.socket = NodeSocket::wait_reconnect();
Poll::Ready(Ok(()))
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment