Skip to content
Snippets Groups Projects
Commit 7f3001a4 authored by Andrei Sandu's avatar Andrei Sandu Committed by GitHub
Browse files

Add counter for bounded channel sends that block. (#5490)


* Add counter for bounded channel sends that block.

Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* fix typos

Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* Fix bounded sent metric

Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* refactor a bit and test

Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* Return disconnect errors early.

Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>

* future proof error handling

Signed-off-by: default avatarAndrei Sandu <andrei-mihail@parity.io>
parent 2956a97a
No related merge requests found
......@@ -158,13 +158,23 @@ impl<T> MeteredSender<T> {
where
Self: Unpin,
{
let msg = self.prepare_with_tof(msg);
let fut = self.inner.send(msg);
futures::pin_mut!(fut);
fut.await.map_err(|e| {
self.meter.retract_sent();
e
})
match self.try_send(msg) {
Err(send_err) => {
if !send_err.is_full() {
return Err(send_err.into_send_error())
}
let msg = send_err.into_inner();
self.meter.note_sent();
let fut = self.inner.send(msg);
futures::pin_mut!(fut);
fut.await.map_err(|e| {
self.meter.retract_sent();
e
})
},
_ => Ok(()),
}
}
/// Attempt to send message or fail immediately.
......@@ -174,6 +184,10 @@ impl<T> MeteredSender<T> {
) -> result::Result<(), mpsc::TrySendError<MaybeTimeOfFlight<T>>> {
let msg = self.prepare_with_tof(msg);
self.inner.try_send(msg).map_err(|e| {
if e.is_full() {
// Count bounded channel sends that block.
self.meter.note_blocked();
}
self.meter.retract_sent();
e
})
......
......@@ -42,6 +42,8 @@ pub struct Meter {
sent: Arc<AtomicUsize>,
// Number of receives on this channel.
received: Arc<AtomicUsize>,
// Number of times senders blocked while sending messages to a subsystem.
blocked: Arc<AtomicUsize>,
// Atomic ringbuffer of the last 50 time of flight values
tof: Arc<crossbeam_queue::ArrayQueue<CoarseDuration>>,
}
......@@ -51,6 +53,7 @@ impl std::default::Default for Meter {
Self {
sent: Arc::new(AtomicUsize::new(0)),
received: Arc::new(AtomicUsize::new(0)),
blocked: Arc::new(AtomicUsize::new(0)),
tof: Arc::new(crossbeam_queue::ArrayQueue::new(100)),
}
}
......@@ -65,6 +68,8 @@ pub struct Readout {
pub sent: usize,
/// The amount of messages received on the channel, in aggregate.
pub received: usize,
/// How many times the caller blocked when sending messages.
pub blocked: usize,
/// Time of flight in micro seconds (us)
pub tof: Vec<CoarseDuration>,
}
......@@ -77,6 +82,7 @@ impl Meter {
Readout {
sent: self.sent.load(Ordering::Relaxed),
received: self.received.load(Ordering::Relaxed),
blocked: self.blocked.load(Ordering::Relaxed),
tof: {
let mut acc = Vec::with_capacity(self.tof.len());
while let Some(value) = self.tof.pop() {
......@@ -99,6 +105,10 @@ impl Meter {
self.received.fetch_add(1, Ordering::Relaxed);
}
fn note_blocked(&self) {
self.blocked.fetch_add(1, Ordering::Relaxed);
}
fn note_time_of_flight(&self, tof: CoarseDuration) {
let _ = self.tof.force_push(tof);
}
......
......@@ -39,12 +39,12 @@ fn try_send_try_next() {
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 1, .. });
rx.try_next().unwrap();
rx.try_next().unwrap();
assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, tof } => {
assert_matches!(tx.meter().read(), Readout { sent: 4, received: 3, blocked: 0, tof } => {
// every second in test, consumed before
assert_eq!(dbg!(tof).len(), 1);
});
rx.try_next().unwrap();
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, tof } => {
assert_matches!(rx.meter().read(), Readout { sent: 4, received: 4, blocked: 0, tof } => {
// every second in test, consumed before
assert_eq!(dbg!(tof).len(), 0);
});
......@@ -127,3 +127,24 @@ fn failed_send_does_not_inc_sent() {
assert_matches!(unbounded.meter().read(), Readout { sent: 0, received: 0, .. });
});
}
#[test]
fn blocked_send_is_metered() {
let (mut bounded_sender, mut bounded_receiver) = channel::<Msg>(1);
block_on(async move {
assert!(bounded_sender.send(Msg::default()).await.is_ok());
assert!(bounded_sender.send(Msg::default()).await.is_ok());
assert!(bounded_sender.try_send(Msg::default()).is_err());
assert_matches!(
bounded_sender.meter().read(),
Readout { sent: 2, received: 0, blocked: 1, .. }
);
bounded_receiver.try_next().unwrap();
assert_matches!(
bounded_receiver.meter().read(),
Readout { sent: 2, received: 1, blocked: 1, .. }
);
});
}
......@@ -31,6 +31,7 @@ struct MetricsInner {
to_subsystem_bounded_tof: prometheus::HistogramVec,
to_subsystem_bounded_sent: prometheus::GaugeVec<prometheus::U64>,
to_subsystem_bounded_received: prometheus::GaugeVec<prometheus::U64>,
to_subsystem_bounded_blocked: prometheus::GaugeVec<prometheus::U64>,
to_subsystem_unbounded_tof: prometheus::HistogramVec,
to_subsystem_unbounded_sent: prometheus::GaugeVec<prometheus::U64>,
......@@ -91,6 +92,11 @@ impl Metrics {
.with_label_values(&[name])
.set(readouts.bounded.received as u64);
metrics
.to_subsystem_bounded_blocked
.with_label_values(&[name])
.set(readouts.bounded.blocked as u64);
metrics
.to_subsystem_unbounded_sent
.with_label_values(&[name])
......@@ -180,6 +186,16 @@ impl MetricsTrait for Metrics {
)?,
registry,
)?,
to_subsystem_bounded_blocked: prometheus::register(
prometheus::GaugeVec::<prometheus::U64>::new(
prometheus::Opts::new(
"polkadot_parachain_subsystem_bounded_blocked",
"Number of times senders blocked while sending messages to a subsystem",
),
&["subsystem_name"],
)?,
registry,
)?,
to_subsystem_unbounded_tof: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment