diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 7492465574e3ddd8343067a5a03a097be0ba64fd..c80ad0d5e3a6dfda9a796445b828b5735034e5a9 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -3879,6 +3879,7 @@ name = "metered-channel" version = "0.1.0" dependencies = [ "assert_matches", + "derive_more", "futures 0.3.13", "futures-timer 3.0.2", ] diff --git a/polkadot/node/metered-channel/Cargo.toml b/polkadot/node/metered-channel/Cargo.toml index 5f654479b096fc3b20e30f4a157cf06b1414b166..9a174ae1f0c57470723d11bc5c601af59052e6bc 100644 --- a/polkadot/node/metered-channel/Cargo.toml +++ b/polkadot/node/metered-channel/Cargo.toml @@ -8,6 +8,7 @@ description = "Channels with attached Meters" [dependencies] futures = "0.3.12" futures-timer = "3.0.2" +derive_more = "0.99" [dev-dependencies] assert_matches = "1.4.0" diff --git a/polkadot/node/metered-channel/src/bounded.rs b/polkadot/node/metered-channel/src/bounded.rs index 82740266a8709f701c4d39f503519d24fb608c58..48bcfd14001d47fb8441e9545237bd90b78bbce1 100644 --- a/polkadot/node/metered-channel/src/bounded.rs +++ b/polkadot/node/metered-channel/src/bounded.rs @@ -54,8 +54,7 @@ impl<T> Stream for MeteredReceiver<T> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { match mpsc::Receiver::poll_next(Pin::new(&mut self.inner), cx) { Poll::Ready(x) => { - // always use Ordering::SeqCst to avoid underflows - self.meter.fill.fetch_sub(1, Ordering::SeqCst); + self.meter.note_received(); Poll::Ready(x) } other => other, @@ -78,7 +77,7 @@ impl<T> MeteredReceiver<T> { pub fn try_next(&mut self) -> Result<Option<T>, mpsc::TryRecvError> { match self.inner.try_next()? 
{ Some(x) => { - self.meter.fill.fetch_sub(1, Ordering::SeqCst); + self.meter.note_received(); Ok(Some(x)) } None => Ok(None), @@ -131,17 +130,22 @@ impl<T> MeteredSender<T> { where Self: Unpin, { - self.meter.fill.fetch_add(1, Ordering::SeqCst); + self.meter.note_sent(); let fut = self.inner.send(item); futures::pin_mut!(fut); - fut.await + fut.await.map_err(|e| { + self.meter.retract_sent(); + e + }) } /// Attempt to send message or fail immediately. pub fn try_send(&mut self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> { - self.inner.try_send(msg)?; - self.meter.fill.fetch_add(1, Ordering::SeqCst); - Ok(()) + self.meter.note_sent(); + self.inner.try_send(msg).map_err(|e| { + self.meter.retract_sent(); + e + }) } } @@ -159,7 +163,6 @@ impl<T> futures::sink::Sink<T> for MeteredSender<T> { fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { match Pin::new(&mut self.inner).poll_close(cx) { val @ Poll::Ready(_)=> { - self.meter.fill.store(0, Ordering::SeqCst); val } other => other, @@ -169,7 +172,7 @@ impl<T> futures::sink::Sink<T> for MeteredSender<T> { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { match Pin::new(&mut self.inner).poll_flush(cx) { val @ Poll::Ready(_)=> { - self.meter.fill.fetch_add(1, Ordering::SeqCst); + self.meter.note_sent(); val } other => other, diff --git a/polkadot/node/metered-channel/src/lib.rs b/polkadot/node/metered-channel/src/lib.rs index b7188689b035880623a9b59a5aa9d0456b5b3ff5..393ee33521a8de4f4a061274e0be54001495f20e 100644 --- a/polkadot/node/metered-channel/src/lib.rs +++ b/polkadot/node/metered-channel/src/lib.rs @@ -24,6 +24,8 @@ use std::result; use std::sync::Arc; use std::pin::Pin; +use derive_more::{Add, Display}; + mod bounded; mod unbounded; @@ -35,22 +37,50 @@ pub use self::unbounded::*; pub struct Meter { /// Name of the receiver and sender pair. 
name: &'static str, - // fill state of the channel - fill: Arc<AtomicUsize>, + // Number of sends on this channel. + sent: Arc<AtomicUsize>, + // Number of receives on this channel. + received: Arc<AtomicUsize>, +} + +/// A readout of sizes from the meter. Note that it is possible, due to asynchrony, for received +/// to be slightly higher than sent. +#[derive(Debug, Add, Display, Clone, Default, PartialEq)] +#[display(fmt = "(sent={} received={})", sent, received)] +pub struct Readout { + /// The amount of messages sent on the channel, in aggregate. + pub sent: usize, + /// The amount of messages received on the channel, in aggregate. + pub received: usize, } impl Meter { /// Count the number of items queued up inside the channel. - pub fn queue_count(&self) -> usize { + pub fn read(&self) -> Readout { // when obtaining we don't care much about off by one // accuracy - self.fill.load(Ordering::Relaxed) + Readout { + sent: self.sent.load(Ordering::Relaxed), + received: self.received.load(Ordering::Relaxed), + } } /// Obtain the name of the channel `Sender` and `Receiver` pair. 
pub fn name(&self) -> &'static str { self.name } + + fn note_sent(&self) { + self.sent.fetch_add(1, Ordering::Relaxed); + } + + fn retract_sent(&self) { + self.sent.fetch_sub(1, Ordering::Relaxed); + } + + fn note_received(&self) { + self.received.fetch_add(1, Ordering::Relaxed); + } } #[cfg(test)] @@ -69,20 +99,20 @@ mod tests { block_on(async move { let (mut tx, mut rx) = channel::<Msg>(5, "goofy"); let msg = Msg::default(); - assert_eq!(rx.meter().queue_count(), 0); + assert_eq!(rx.meter().read(), Readout { sent: 0, received: 0 }); tx.try_send(msg).unwrap(); - assert_eq!(tx.meter().queue_count(), 1); + assert_eq!(tx.meter().read(), Readout { sent: 1, received: 0 }); tx.try_send(msg).unwrap(); tx.try_send(msg).unwrap(); tx.try_send(msg).unwrap(); - assert_eq!(tx.meter().queue_count(), 4); + assert_eq!(tx.meter().read(), Readout { sent: 4, received: 0 }); rx.try_next().unwrap(); - assert_eq!(rx.meter().queue_count(), 3); + assert_eq!(rx.meter().read(), Readout { sent: 4, received: 1 }); rx.try_next().unwrap(); rx.try_next().unwrap(); - assert_eq!(tx.meter().queue_count(), 1); + assert_eq!(tx.meter().read(), Readout { sent: 4, received: 3 }); rx.try_next().unwrap(); - assert_eq!(rx.meter().queue_count(), 0); + assert_eq!(rx.meter().read(), Readout { sent: 4, received: 4 }); assert!(rx.try_next().is_err()); }); } @@ -96,9 +126,9 @@ mod tests { futures::join!( async move { let msg = Msg::default(); - assert_eq!(tx.meter().queue_count(), 0); + assert_eq!(tx.meter().read(), Readout { sent: 0, received: 0 }); tx.try_send(msg).unwrap(); - assert_eq!(tx.meter().queue_count(), 1); + assert_eq!(tx.meter().read(), Readout { sent: 1, received: 0 }); tx.try_send(msg).unwrap(); tx.try_send(msg).unwrap(); tx.try_send(msg).unwrap(); @@ -106,14 +136,14 @@ mod tests { }, async move { go.await.expect("Helper oneshot channel must work. 
qed"); - assert_eq!(rx.meter().queue_count(), 4); + assert_eq!(rx.meter().read(), Readout { sent: 4, received: 0 }); rx.try_next().unwrap(); - assert_eq!(rx.meter().queue_count(), 3); + assert_eq!(rx.meter().read(), Readout { sent: 4, received: 1 }); rx.try_next().unwrap(); rx.try_next().unwrap(); - assert_eq!(rx.meter().queue_count(), 1); + assert_eq!(rx.meter().read(), Readout { sent: 4, received: 3 }); rx.try_next().unwrap(); - assert_eq!(dbg!(rx.meter().queue_count()), 0); + assert_eq!(dbg!(rx.meter().read()), Readout { sent: 4, received: 4 }); } ) }); @@ -130,21 +160,37 @@ mod tests { futures::join!( async move { for i in 0..15 { - println!("Sent #{} with a backlog of {} items", i + 1, tx.meter().queue_count()); + println!("Sent #{} with a backlog of {} items", i + 1, tx.meter().read()); let msg = Msg { val: i as u8 + 1u8 }; tx.send(msg).await.unwrap(); - assert!(tx.meter().queue_count() > 0usize); + assert!(tx.meter().read().sent > 0usize); Delay::new(Duration::from_millis(20)).await; } () }, async move { while let Some(msg) = rx.next().await { - println!("rx'd one {} with {} backlogged", msg.val, rx.meter().queue_count()); + println!("rx'd one {} with {} backlogged", msg.val, rx.meter().read()); Delay::new(Duration::from_millis(29)).await; } } ) }); } + + #[test] + fn failed_send_does_not_inc_sent() { + let (mut bounded, _) = channel::<Msg>(5, "pluto"); + let (mut unbounded, _) = unbounded::<Msg>("pluto"); + + block_on(async move { + assert!(bounded.send(Msg::default()).await.is_err()); + assert!(bounded.try_send(Msg::default()).is_err()); + assert_eq!(bounded.meter().read(), Readout { sent: 0, received: 0 }); + + assert!(unbounded.send(Msg::default()).await.is_err()); + assert!(unbounded.unbounded_send(Msg::default()).is_err()); + assert_eq!(unbounded.meter().read(), Readout { sent: 0, received: 0 }); + }); + } } diff --git a/polkadot/node/metered-channel/src/unbounded.rs b/polkadot/node/metered-channel/src/unbounded.rs index 
c2035ece8d03e3b137824c2f4617a49d0c3a5bb7..a3b7062b875ce3172bd0fa226fbe1d48f30becfc 100644 --- a/polkadot/node/metered-channel/src/unbounded.rs +++ b/polkadot/node/metered-channel/src/unbounded.rs @@ -54,8 +54,7 @@ impl<T> Stream for UnboundedMeteredReceiver<T> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { match mpsc::UnboundedReceiver::poll_next(Pin::new(&mut self.inner), cx) { Poll::Ready(x) => { - // always use Ordering::SeqCst to avoid underflows - self.meter.fill.fetch_sub(1, Ordering::SeqCst); + self.meter.note_received(); Poll::Ready(x) } other => other, @@ -78,7 +77,7 @@ impl<T> UnboundedMeteredReceiver<T> { pub fn try_next(&mut self) -> Result<Option<T>, mpsc::TryRecvError> { match self.inner.try_next()? { Some(x) => { - self.meter.fill.fetch_sub(1, Ordering::SeqCst); + self.meter.note_received(); Ok(Some(x)) } None => Ok(None), @@ -131,18 +130,23 @@ impl<T> UnboundedMeteredSender<T> { where Self: Unpin, { - self.meter.fill.fetch_add(1, Ordering::SeqCst); + self.meter.note_sent(); let fut = self.inner.send(item); futures::pin_mut!(fut); - fut.await + fut.await.map_err(|e| { + self.meter.retract_sent(); + e + }) } /// Attempt to send message or fail immediately. 
pub fn unbounded_send(&mut self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> { - self.inner.unbounded_send(msg)?; - self.meter.fill.fetch_add(1, Ordering::SeqCst); - Ok(()) + self.meter.note_sent(); + self.inner.unbounded_send(msg).map_err(|e| { + self.meter.retract_sent(); + e + }) } } @@ -160,7 +164,6 @@ impl<T> futures::sink::Sink<T> for UnboundedMeteredSender<T> { fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { match Pin::new(&mut self.inner).poll_ready(cx) { val @ Poll::Ready(_)=> { - self.meter.fill.store(0, Ordering::SeqCst); val } other => other, @@ -170,7 +173,7 @@ impl<T> futures::sink::Sink<T> for UnboundedMeteredSender<T> { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { match Pin::new(&mut self.inner).poll_ready(cx) { val @ Poll::Ready(_)=> { - self.meter.fill.fetch_add(1, Ordering::SeqCst); + self.meter.note_sent(); val } other => other, diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index ebe2f6b7f0ada216e0470d59d3ca74dcea1837e6..f2769aabe7bee948684a2e95272efe65f6f6190f 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -1327,8 +1327,10 @@ struct MetricsInner { deactivated_heads_total: prometheus::Counter<prometheus::U64>, messages_relayed_total: prometheus::Counter<prometheus::U64>, message_relay_timings: prometheus::Histogram, - to_overseer_channel_queue_size: prometheus::Gauge<prometheus::U64>, - from_overseer_channel_queue_size: prometheus::GaugeVec<prometheus::U64>, + to_overseer_sent: prometheus::Gauge<prometheus::U64>, + to_overseer_received: prometheus::Gauge<prometheus::U64>, + from_overseer_sent: prometheus::GaugeVec<prometheus::U64>, + from_overseer_received: prometheus::GaugeVec<prometheus::U64>, } #[derive(Default, Clone)] @@ -1360,15 +1362,21 @@ impl Metrics { fn channel_fill_level_snapshot( &self, - from_overseer: AllSubsystemsSame<(&'static str, usize)>, - 
to_overseer: usize, + from_overseer: AllSubsystemsSame<(&'static str, metered::Readout)>, + to_overseer: metered::Readout, ) { self.0.as_ref().map(|metrics| { - from_overseer.map_subsystems(|(name, queue_size): (_, usize)| { - metrics.from_overseer_channel_queue_size.with_label_values(&[name]).set(queue_size as u64); - }) + from_overseer.map_subsystems(|(name, readout): (_, metered::Readout)| { + metrics.from_overseer_sent.with_label_values(&[name]) + .set(readout.sent as u64); + + metrics.from_overseer_received.with_label_values(&[name]) + .set(readout.received as u64); + }); + + metrics.to_overseer_sent.set(to_overseer.sent as u64); + metrics.to_overseer_received.set(to_overseer.received as u64); }); - self.0.as_ref().map(|metrics| metrics.to_overseer_channel_queue_size.set(to_overseer as u64)); } } @@ -1418,11 +1426,23 @@ impl metrics::Metrics for Metrics { )?, registry, )?, - from_overseer_channel_queue_size: prometheus::register( + from_overseer_sent: prometheus::register( + prometheus::GaugeVec::<prometheus::U64>::new( + prometheus::Opts::new( + "parachain_from_overseer_sent", + "Number of elements sent by the overseer to subsystems", + ), + &[ + "subsystem_name", + ], + )?, + registry, + )?, + from_overseer_received: prometheus::register( prometheus::GaugeVec::<prometheus::U64>::new( prometheus::Opts::new( - "parachain_from_overseer_channel_queue_size", - "Number of elements sitting in the channel from the overseer waiting to be processed.", + "parachain_from_overseer_received", + "Number of elements received by subsystems from overseer", ), &[ "subsystem_name", @@ -1430,11 +1450,20 @@ impl metrics::Metrics for Metrics { )?, registry, )?, - to_overseer_channel_queue_size: prometheus::register( + to_overseer_sent: prometheus::register( + prometheus::Gauge::<prometheus::U64>::with_opts( + prometheus::Opts::new( + "parachain_to_overseer_sent", + "Number of elements sent by subsystems to overseer", + ), + )?, + registry, + )?, + to_overseer_received: 
prometheus::register( prometheus::Gauge::<prometheus::U64>::with_opts( prometheus::Opts::new( - "parachain_to_overseer_channel_queue_size", - "Number of elements sitting in the channel to the overseer waiting to be processed.", + "parachain_to_overseer_received", + "Number of elements received by overseer from subsystems", ), )?, registry, @@ -1820,14 +1849,14 @@ where let metronome = Metronome::new(std::time::Duration::from_millis(950)) .for_each(move |_| { let to_subsystem_counts = subsystem_meters.as_ref() - .map_subsystems(|&(name, ref meter): &(_, metered::Meter)| (name, meter.queue_count())); + .map_subsystems(|&(name, ref meter): &(_, metered::Meter)| (name, meter.read())); // We combine the amount of messages from subsystems to the overseer // as well as the amount of messages from external sources to the overseer // into one to_overseer value. metronome_metrics.channel_fill_level_snapshot( to_subsystem_counts, - meter_subsystem_to_overseer.queue_count() + meter_external_to_overseer.queue_count(), + meter_subsystem_to_overseer.read() + meter_external_to_overseer.read(), ); async move { @@ -2467,7 +2496,8 @@ mod tests { assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total"); assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total"); assert_eq!(gather[3].get_name(), "parachain_overseer_messages_relay_timings"); - assert_eq!(gather[4].get_name(), "parachain_to_overseer_channel_queue_size"); + assert_eq!(gather[4].get_name(), "parachain_to_overseer_received"); + assert_eq!(gather[5].get_name(), "parachain_to_overseer_sent"); let activated = gather[0].get_metric()[0].get_counter().get_value() as u64; let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64; let relayed = gather[2].get_metric()[0].get_counter().get_value() as u64;