From e5df3306a5e741bc97b21daa60f25870be897b8c Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 13 Feb 2025 09:25:23 +0100 Subject: [PATCH] `fatxpool`: transaction statuses metrics added (#7505) #### Overview This PR introduces a new mechanism to capture and report metrics related to timings of transaction lifecycle events, which are currently not available. By exposing these timings, we aim to augment transaction-pool reliability dashboards and extend existing Grafana boards. A new `unknown_from_block_import_txs` metric is also introduced. It provides the number of transactions in imported block which are not known to the node's transaction pool. It allows to monitor alignment of transaction pools across the nodes in the network. #### Notes for reviewers - **[Per-event Metrics](https://github.com/paritytech/polkadot-sdk/blob/8a53992e2fb200b084ebf0393ad22d91314fd173/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs#L84-L105) Collection**: implemented by[ `EventsMetricsCollector`](https://github.com/paritytech/polkadot-sdk/blob/8a53992e2fb200b084ebf0393ad22d91314fd173/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs#L353-L358) which allows to capture both submission timestamps and transaction status updates. An asynchronous [`EventsMetricsCollectorTask`](https://github.com/paritytech/polkadot-sdk/blob/8a53992e2fb200b084ebf0393ad22d91314fd173/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs#L503-L526) processes the metrics-related messages sent by the `EventsMetricsCollector` and reports the timings of transaction statuses updates to Prometheus. This task implements event[ de-duplication](https://github.com/paritytech/polkadot-sdk/blob/8a53992e2fb200b084ebf0393ad22d91314fd173/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs#L458) using a `HashMap` of [`TransactionEventMetricsData`](https://github.com/paritytech/polkadot-sdk/blob/8a53992e2fb200b084ebf0393ad22d91314fd173/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs#L424-L435) entries which also holds transaction submission timestamps used to [compute timings](https://github.com/paritytech/polkadot-sdk/blob/8a53992e2fb200b084ebf0393ad22d91314fd173/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs#L489-L495). Transaction-related items are removed when transaction's final status is [reported](https://github.com/paritytech/polkadot-sdk/blob/8a53992e2fb200b084ebf0393ad22d91314fd173/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs#L496). - Transaction submission timestamp is reusing the timestamp of `TimedTransactionSource` kept in mempool. It is reported to `EventsMetricsCollector` in [`submit_at`](https://github.com/paritytech/polkadot-sdk/blob/8a53992e2fb200b084ebf0393ad22d91314fd173/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs#L735) and [`submit_and_watch`](https://github.com/paritytech/polkadot-sdk/blob/8a53992e2fb200b084ebf0393ad22d91314fd173/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs#L836) methods of `ForkAwareTxPool`. - Transaction updates are reported to `EventsMetricsCollector` from `MultiViewListener` [task](https://github.com/paritytech/polkadot-sdk/blob/8a53992e2fb200b084ebf0393ad22d91314fd173/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs#L494). This allows to gather metrics for _watched_ and _non-watched_ transactions (what enables metrics on non-rpc-enabled collators). - New metric ([`unknown_from_block_import_txs`](https://github.com/paritytech/polkadot-sdk/blob/8a53992e2fb200b084ebf0393ad22d91314fd173/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs#L59-L60)) allowing checking alignment of pools across the network is [reported](https://github.com/paritytech/polkadot-sdk/blob/8a53992e2fb200b084ebf0393ad22d91314fd173/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs#L1288-L1292) using new `TxMemPool` [method](https://github.com/paritytech/polkadot-sdk/blob/8a53992e2fb200b084ebf0393ad22d91314fd173/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs#L605-L611). fixes: #7355, #7448 --------- Co-authored-by: cmd[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Sebastian Kunert <skunert49@gmail.com> Co-authored-by: Iulian Barbu <14218860+iulianbarbu@users.noreply.github.com> --- prdoc/pr_7505.prdoc | 14 + .../fork_aware_txpool/fork_aware_txpool.rs | 45 +- .../src/fork_aware_txpool/metrics.rs | 385 +++++++++++++++++- .../fork_aware_txpool/multi_view_listener.rs | 50 ++- .../src/fork_aware_txpool/tx_mem_pool.rs | 12 +- 5 files changed, 483 insertions(+), 23 deletions(-) create mode 100644 prdoc/pr_7505.prdoc diff --git a/prdoc/pr_7505.prdoc b/prdoc/pr_7505.prdoc new file mode 100644 index 00000000000..da9f44dcf40 --- /dev/null +++ b/prdoc/pr_7505.prdoc @@ -0,0 +1,14 @@ +title: '`fatxpool`: transaction statuses metrics added' +doc: +- audience: Node Dev + description: |- + This PR introduces a new mechanism to capture and report Prometheus metrics related to timings of transaction + lifecycle events, which are currently not available. By exposing these timings, we aim to augment transaction-pool + reliability dashboards and extend existing Grafana boards. + + A new `unknown_from_block_import_txs` metric is also introduced. It provides the number of transactions in imported + block which are not known to the node's transaction pool. It allows to monitor alignment of transaction pools + across the nodes in the network. +crates: +- name: sc-transaction-pool + bump: minor diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index ffe6c20d92b..6195cf53b60 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -21,7 +21,7 @@ use super::{ dropped_watcher::{MultiViewDroppedWatcherController, StreamOfDropped}, import_notification_sink::MultiViewImportNotificationSink, - metrics::MetricsLink as PrometheusMetrics, + metrics::{EventsMetricsCollector, MetricsLink as PrometheusMetrics}, multi_view_listener::MultiViewListener, tx_mem_pool::{InsertionInfo, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER}, view::View, @@ -143,6 +143,9 @@ where /// Prometheus's metrics endpoint. metrics: PrometheusMetrics, + /// Collector of transaction statuses updates, reports transaction events metrics. + events_metrics_collector: EventsMetricsCollector<ChainApi>, + /// Util tracking best and finalized block. enactment_state: Arc<Mutex<EnactmentState<Block>>>, @@ -193,7 +196,7 @@ where future_limits: crate::PoolLimit, mempool_max_transactions_count: usize, ) -> (Self, ForkAwareTxPoolTask) { - let (listener, listener_task) = MultiViewListener::new_with_worker(); + let (listener, listener_task) = MultiViewListener::new_with_worker(Default::default()); let listener = Arc::new(listener); let (import_notification_sink, import_notification_sink_task) = @@ -246,6 +249,7 @@ where options, is_validator: false.into(), metrics: Default::default(), + events_metrics_collector: EventsMetricsCollector::default(), }, combined_tasks, ) @@ -314,8 +318,11 @@ where finalized_hash: Block::Hash, ) -> Self { let metrics = PrometheusMetrics::new(prometheus); + let (events_metrics_collector, event_metrics_task) = + EventsMetricsCollector::<ChainApi>::new_with_worker(metrics.clone()); - let (listener, listener_task) = MultiViewListener::new_with_worker(); + let (listener, listener_task) = + MultiViewListener::new_with_worker(events_metrics_collector.clone()); let listener = Arc::new(listener); let (revalidation_queue, revalidation_task) = @@ -337,6 +344,7 @@ where let view_store = Arc::new(ViewStore::new(pool_api.clone(), listener, dropped_stream_controller)); + let dropped_monitor_task = Self::dropped_monitor_task( dropped_stream, mempool.clone(), @@ -350,6 +358,7 @@ where _ = revalidation_task => {}, _ = import_notification_sink_task => {}, _ = dropped_monitor_task => {} + _ = event_metrics_task => {}, } } .boxed(); @@ -368,6 +377,7 @@ where import_notification_sink, options, metrics, + events_metrics_collector, is_validator, } } @@ -721,7 +731,10 @@ where .iter() .zip(xts) .filter_map(|(result, xt)| { - result.as_ref().ok().map(|insertion| (insertion.source.clone(), xt)) + result.as_ref().ok().map(|insertion| { + self.events_metrics_collector.report_submitted(&insertion); + (insertion.source.clone(), xt) + }) }) .collect::<Vec<_>>(); @@ -812,21 +825,21 @@ where ); let xt = Arc::from(xt); - let InsertionInfo { hash: xt_hash, source: timed_source, .. } = - match self.mempool.push_watched(source, xt.clone()) { - Ok(result) => result, - Err(TxPoolApiError::ImmediatelyDropped) => - self.attempt_transaction_replacement(source, true, xt.clone()).await?, - Err(e) => return Err(e.into()), - }; + let insertion = match self.mempool.push_watched(source, xt.clone()) { + Ok(result) => result, + Err(TxPoolApiError::ImmediatelyDropped) => + self.attempt_transaction_replacement(source, true, xt.clone()).await?, + Err(e) => return Err(e.into()), + }; self.metrics.report(|metrics| metrics.submitted_transactions.inc()); + self.events_metrics_collector.report_submitted(&insertion); self.view_store - .submit_and_watch(at, timed_source, xt) + .submit_and_watch(at, insertion.source, xt) .await .inspect_err(|_| { - self.mempool.remove_transaction(&xt_hash); + self.mempool.remove_transaction(&insertion.hash); }) .map(|mut outcome| { self.mempool.update_transaction_priority(&outcome); @@ -1272,6 +1285,12 @@ where pruned_log.extend(enacted_log); }); + self.metrics.report(|metrics| { + metrics + .unknown_from_block_import_txs + .inc_by(self.mempool.count_unknown_transactions(pruned_log.iter()) as _) + }); + //resubmit { let mut resubmit_transactions = Vec::new(); diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs index 73d45ac4305..c04741e1c1d 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs @@ -18,11 +18,26 @@ //! Prometheus's metrics for a fork-aware transaction pool. -use crate::common::metrics::{GenericMetricsLink, MetricsRegistrant}; +use super::tx_mem_pool::InsertionInfo; +use crate::{ + common::metrics::{GenericMetricsLink, MetricsRegistrant}, + graph::{self, BlockHash, ExtrinsicHash}, + LOG_TARGET, +}; +use futures::{FutureExt, StreamExt}; use prometheus_endpoint::{ - histogram_opts, linear_buckets, register, Counter, Gauge, Histogram, PrometheusError, Registry, - U64, + exponential_buckets, histogram_opts, linear_buckets, register, Counter, Gauge, Histogram, + PrometheusError, Registry, U64, +}; +use sc_transaction_pool_api::TransactionStatus; +use sc_utils::mpsc; +use std::{ + collections::{hash_map::Entry, HashMap}, + future::Future, + pin::Pin, + time::{Duration, Instant}, }; +use tracing::trace; /// A helper alias for the Prometheus's metrics endpoint. pub type MetricsLink = GenericMetricsLink<Metrics>; @@ -41,6 +56,8 @@ pub struct Metrics { pub unwatched_txs: Gauge<U64>, /// Total number of transactions reported as invalid. pub removed_invalid_txs: Counter<U64>, + /// Total number of transactions from imported blocks that are unknown to the pool. + pub unknown_from_block_import_txs: Counter<U64>, /// Total number of finalized transactions. pub finalized_txs: Counter<U64>, /// Histogram of maintain durations. @@ -59,6 +76,145 @@ pub struct Metrics { pub view_revalidation_duration: Histogram, /// Total number of the views created w/o cloning existing view. pub non_cloned_views: Counter<U64>, + /// Histograms to track the timing distribution of individual transaction pool events. + pub events_histograms: EventsHistograms, +} + +/// Represents a collection of histogram timings for different transaction statuses. +pub struct EventsHistograms { + /// Histogram of timings for reporting `TransactionStatus::Future` event + pub future: Histogram, + /// Histogram of timings for reporting `TransactionStatus::Ready` event + pub ready: Histogram, + /// Histogram of timings for reporting `TransactionStatus::Broadcast` event + pub broadcast: Histogram, + /// Histogram of timings for reporting `TransactionStatus::InBlock` event + pub in_block: Histogram, + /// Histogram of timings for reporting `TransactionStatus::Retracted` event + pub retracted: Histogram, + /// Histogram of timings for reporting `TransactionStatus::FinalityTimeout` event + pub finality_timeout: Histogram, + /// Histogram of timings for reporting `TransactionStatus::Finalized` event + pub finalized: Histogram, + /// Histogram of timings for reporting `TransactionStatus::Usurped(Hash)` event + pub usurped: Histogram, + /// Histogram of timings for reporting `TransactionStatus::Dropped` event + pub dropped: Histogram, + /// Histogram of timings for reporting `TransactionStatus::Invalid` event + pub invalid: Histogram, +} + +impl EventsHistograms { + fn register(registry: &Registry) -> Result<Self, PrometheusError> { + Ok(Self { + future: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_future", + "Histogram of timings for reporting Future event", + exponential_buckets(0.01, 2.0, 16).unwrap() + ))?, + registry, + )?, + ready: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_ready", + "Histogram of timings for reporting Ready event", + exponential_buckets(0.01, 2.0, 16).unwrap() + ))?, + registry, + )?, + broadcast: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_broadcast", + "Histogram of timings for reporting Broadcast event", + linear_buckets(0.01, 0.25, 16).unwrap() + ))?, + registry, + )?, + in_block: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_in_block", + "Histogram of timings for reporting InBlock event", + linear_buckets(0.0, 3.0, 20).unwrap() + ))?, + registry, + )?, + retracted: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_retracted", + "Histogram of timings for reporting Retracted event", + linear_buckets(0.0, 3.0, 20).unwrap() + ))?, + registry, + )?, + finality_timeout: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_finality_timeout", + "Histogram of timings for reporting FinalityTimeout event", + linear_buckets(0.0, 40.0, 20).unwrap() + ))?, + registry, + )?, + finalized: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_finalized", + "Histogram of timings for reporting Finalized event", + linear_buckets(0.0, 40.0, 20).unwrap() + ))?, + registry, + )?, + usurped: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_usurped", + "Histogram of timings for reporting Usurped event", + linear_buckets(0.0, 3.0, 20).unwrap() + ))?, + registry, + )?, + dropped: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_dropped", + "Histogram of timings for reporting Dropped event", + linear_buckets(0.0, 3.0, 20).unwrap() + ))?, + registry, + )?, + invalid: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_invalid", + "Histogram of timings for reporting Invalid event", + linear_buckets(0.0, 3.0, 20).unwrap() + ))?, + registry, + )?, + }) + } + + /// Records the timing for a given transaction status. + /// + /// This method records the duration, representing the time elapsed since the + /// transaction was submitted until the event was reported. Based on the + /// transaction status, it utilizes the appropriate histogram to log this duration. + pub fn observe<Hash, BlockHash>( + &self, + status: TransactionStatus<Hash, BlockHash>, + duration: Duration, + ) { + let duration = duration.as_secs_f64(); + let histogram = match status { + TransactionStatus::Future => &self.future, + TransactionStatus::Ready => &self.ready, + TransactionStatus::Broadcast(..) => &self.broadcast, + TransactionStatus::InBlock(..) => &self.in_block, + TransactionStatus::Retracted(..) => &self.retracted, + TransactionStatus::FinalityTimeout(..) => &self.finality_timeout, + TransactionStatus::Finalized(..) => &self.finalized, + TransactionStatus::Usurped(..) => &self.usurped, + TransactionStatus::Dropped => &self.dropped, + TransactionStatus::Invalid => &self.invalid, + }; + histogram.observe(duration); + } } impl MetricsRegistrant for Metrics { @@ -106,6 +262,13 @@ impl MetricsRegistrant for Metrics { )?, registry, )?, + unknown_from_block_import_txs: register( + Counter::new( + "substrate_sub_txpool_unknown_from_block_import_txs_total", + "Total number of transactions from imported blocks that are unknown to the pool.", + )?, + registry, + )?, finalized_txs: register( Counter::new( "substrate_sub_txpool_finalized_txs_total", @@ -171,6 +334,222 @@ impl MetricsRegistrant for Metrics { )?, registry, )?, + events_histograms: EventsHistograms::register(registry)?, })) } } + +/// Messages used to report and compute event metrics. +enum EventMetricsMessage<Hash, BlockHash> { + /// Message indicating a transaction has been submitted, including the timestamp + /// and its hash. + Submitted(Instant, Hash), + /// Message indicating the new status of a transaction, including the timestamp and transaction + /// hash. + Status(Instant, Hash, TransactionStatus<Hash, BlockHash>), +} + +/// Collects metrics related to transaction events. +pub struct EventsMetricsCollector<ChainApi: graph::ChainApi> { + /// Optional channel for sending event metrics messages. + /// + /// If `None` no event metrics are collected (e.g. in tests). + metrics_message_sink: Option<MessageSink<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>>, +} + +impl<ChainApi: graph::ChainApi> Default for EventsMetricsCollector<ChainApi> { + fn default() -> Self { + Self { metrics_message_sink: None } + } +} + +impl<ChainApi: graph::ChainApi> Clone for EventsMetricsCollector<ChainApi> { + fn clone(&self) -> Self { + Self { metrics_message_sink: self.metrics_message_sink.clone() } + } +} + +impl<ChainApi: graph::ChainApi> EventsMetricsCollector<ChainApi> { + /// Reports the status of a transaction. + /// + /// Takes a transaction hash and status, and attempts to send a status + /// message to the metrics messages processing task. + pub fn report_status( + &self, + tx_hash: ExtrinsicHash<ChainApi>, + status: TransactionStatus<BlockHash<ChainApi>, ExtrinsicHash<ChainApi>>, + ) { + self.metrics_message_sink.as_ref().map(|sink| { + if let Err(error) = + sink.unbounded_send(EventMetricsMessage::Status(Instant::now(), tx_hash, status)) + { + trace!(target: LOG_TARGET, %error, "tx status metrics message send failed") + } + }); + } + + /// Reports that a transaction has been submitted. + /// + /// Takes a transaction hash and its submission timestamp, and attempts to + /// send a submission message to the metrics messages processing task. + pub fn report_submitted(&self, insertion_info: &InsertionInfo<ExtrinsicHash<ChainApi>>) { + self.metrics_message_sink.as_ref().map(|sink| { + if let Err(error) = sink.unbounded_send(EventMetricsMessage::Submitted( + insertion_info + .source + .timestamp + .expect("timestamp is set in fork-aware pool. qed"), + insertion_info.hash, + )) { + trace!(target: LOG_TARGET, %error, "tx status metrics message send failed") + } + }); + } +} + +/// A type alias for a asynchronous task that collects metrics related to events. +pub type EventsMetricsCollectorTask = Pin<Box<dyn Future<Output = ()> + Send>>; + +/// Sink type for sending event metrics messages. +type MessageSink<Hash, BlockHash> = + mpsc::TracingUnboundedSender<EventMetricsMessage<Hash, BlockHash>>; + +/// Receiver type for receiving event metrics messages. +type MessageReceiver<Hash, BlockHash> = + mpsc::TracingUnboundedReceiver<EventMetricsMessage<Hash, BlockHash>>; + +/// Holds data relevant to transaction event metrics, allowing de-duplication +/// of certain transaction statuses, and compute the timings of events. +struct TransactionEventMetricsData { + /// Flag indicating if the transaction was seen as `Ready`. + ready_seen: bool, + /// Flag indicating if the transaction was seen as `Broadcast`. + broadcast_seen: bool, + /// Flag indicating if the transaction was seen as `Future`. + future_seen: bool, + /// Flag indicating if the transaction was seen as `InBlock`. + in_block_seen: bool, + /// Flag indicating if the transaction was seen as `Retracted`. + retracted_seen: bool, + /// Timestamp when the transaction was submitted. + /// + /// Used to compute a time elapsed until events are reported. + submit_timestamp: Instant, +} + +impl TransactionEventMetricsData { + /// Creates a new `TransactionEventMetricsData` with the given timestamp. + fn new(submit_timestamp: Instant) -> Self { + Self { + submit_timestamp, + future_seen: false, + ready_seen: false, + broadcast_seen: false, + in_block_seen: false, + retracted_seen: false, + } + } + + /// Sets flag to true once. + /// + /// Return true if flag was toggled. + fn set_true_once(flag: &mut bool) -> bool { + if *flag { + false + } else { + *flag = true; + true + } + } + + /// Updates the status flags based on the given transaction status. + /// + /// Returns the submit timestamp if given status was not seen yet, `None` otherwise. + fn update<Hash, BlockHash>( + &mut self, + status: &TransactionStatus<Hash, BlockHash>, + ) -> Option<Instant> { + let flag = match *status { + TransactionStatus::Ready => &mut self.ready_seen, + TransactionStatus::Future => &mut self.future_seen, + TransactionStatus::Broadcast(..) => &mut self.broadcast_seen, + TransactionStatus::InBlock(..) => &mut self.in_block_seen, + TransactionStatus::Retracted(..) => &mut self.retracted_seen, + _ => return Some(self.submit_timestamp), + }; + Self::set_true_once(flag).then_some(self.submit_timestamp) + } +} + +impl<ChainApi> EventsMetricsCollector<ChainApi> +where + ChainApi: graph::ChainApi + 'static, +{ + /// Handles the status event. + /// + /// Updates the metrics by observing the time taken for a transaction's status update + /// from its submission time. + fn handle_status( + hash: ExtrinsicHash<ChainApi>, + status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>, + timestamp: Instant, + submitted_timestamp_map: &mut HashMap<ExtrinsicHash<ChainApi>, TransactionEventMetricsData>, + metrics: &MetricsLink, + ) { + let Entry::Occupied(mut entry) = submitted_timestamp_map.entry(hash) else { return }; + let remove = status.is_final(); + if let Some(submit_timestamp) = entry.get_mut().update(&status) { + metrics.report(|metrics| { + metrics + .events_histograms + .observe(status, timestamp.duration_since(submit_timestamp)) + }); + } + remove.then(|| entry.remove()); + } + + /// Asynchronous task to process received messages and compute relevant event metrics. + /// + /// Runs indefinitely, handling arriving messages and updating metrics + /// based on the recorded submission times and timestamps of current event statuses. + async fn task( + mut rx: MessageReceiver<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>, + metrics: MetricsLink, + ) { + let mut submitted_timestamp_map = + HashMap::<ExtrinsicHash<ChainApi>, TransactionEventMetricsData>::default(); + + loop { + match rx.next().await { + Some(EventMetricsMessage::Submitted(timestamp, hash)) => { + submitted_timestamp_map + .insert(hash, TransactionEventMetricsData::new(timestamp)); + }, + Some(EventMetricsMessage::Status(timestamp, hash, status)) => { + Self::handle_status( + hash, + status, + timestamp, + &mut submitted_timestamp_map, + &metrics, + ); + }, + None => { + return /* ? */ + }, + }; + } + } + + /// Constructs a new `EventsMetricsCollector` and its associated worker task. + /// + /// Returns the collector alongside an asynchronous task. The task shall be polled by caller. + pub fn new_with_worker(metrics: MetricsLink) -> (Self, EventsMetricsCollectorTask) { + const QUEUE_WARN_SIZE: usize = 100_000; + let (metrics_message_sink, rx) = + mpsc::tracing_unbounded("txpool-event-metrics-collector", QUEUE_WARN_SIZE); + let task = Self::task(rx, metrics); + + (Self { metrics_message_sink: Some(metrics_message_sink) }, task.boxed()) + } +} diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs index 107c2941ec1..959df2ffe97 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs @@ -39,7 +39,10 @@ use std::{ use tokio_stream::StreamMap; use tracing::trace; -use super::dropped_watcher::{DroppedReason, DroppedTransaction}; +use super::{ + dropped_watcher::{DroppedReason, DroppedTransaction}, + metrics::EventsMetricsCollector, +}; /// A side channel allowing to control the external stream instance (one per transaction) with /// [`ControllerCommand`]. @@ -113,6 +116,26 @@ where } } +impl<ChainApi> Into<TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>> + for &TransactionStatusUpdate<ChainApi> +where + ChainApi: graph::ChainApi, +{ + fn into(self) -> TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>> { + match self { + TransactionStatusUpdate::TransactionInvalidated(_) => TransactionStatus::Invalid, + TransactionStatusUpdate::TransactionFinalized(_, hash, index) => + TransactionStatus::Finalized((*hash, *index)), + TransactionStatusUpdate::TransactionBroadcasted(_, peers) => + TransactionStatus::Broadcast(peers.clone()), + TransactionStatusUpdate::TransactionDropped(_, DroppedReason::Usurped(by)) => + TransactionStatus::Usurped(*by), + TransactionStatusUpdate::TransactionDropped(_, DroppedReason::LimitsEnforced) => + TransactionStatus::Dropped, + } + } +} + impl<ChainApi> std::fmt::Debug for TransactionStatusUpdate<ChainApi> where ChainApi: graph::ChainApi, @@ -451,12 +474,15 @@ where /// - transaction commands, /// to multiple individual per-transaction external watcher contexts. /// - /// The future shall be polled by instantiator of `MultiViewListener`. + /// It also reports transactions statuses updates to the provided `events_metrics_collector`. + /// + /// The returned future shall be polled by instantiator of `MultiViewListener`. async fn task( external_watchers_tx_hash_map: Arc< RwLock<HashMap<ExtrinsicHash<ChainApi>, Controller<ExternalWatcherCommand<ChainApi>>>>, >, mut command_receiver: CommandReceiver<ControllerCommand<ChainApi>>, + events_metrics_collector: EventsMetricsCollector<ChainApi>, ) { let mut aggregated_streams_map: StreamMap<BlockHash<ChainApi>, ViewStatusStream<ChainApi>> = Default::default(); @@ -465,6 +491,7 @@ where tokio::select! { biased; Some((view_hash, (tx_hash, status))) = next_event(&mut aggregated_streams_map) => { + events_metrics_collector.report_status(tx_hash, status.clone()); if let Entry::Occupied(mut ctrl) = external_watchers_tx_hash_map.write().entry(tx_hash) { log::trace!( target: LOG_TARGET, @@ -510,6 +537,7 @@ where Some(ControllerCommand::TransactionStatusRequest(request)) => { let tx_hash = request.hash(); + events_metrics_collector.report_status(tx_hash, (&request).into()); if let Entry::Occupied(mut ctrl) = external_watchers_tx_hash_map.write().entry(tx_hash) { if let Err(e) = ctrl .get_mut() @@ -529,12 +557,19 @@ where /// Creates a new [`MultiViewListener`] instance along with its associated worker task. /// - /// This function instansiates the new `MultiViewListener` and provides the worker task that + /// This function instantiates the new `MultiViewListener` and provides the worker task that /// relays messages to the external transactions listeners. The task shall be polled by caller. /// + /// The `events_metrics_collector` is an instance of `EventsMetricsCollector` that is + /// responsible for collecting and managing metrics related to transaction events. Newly + /// created instance of `MultiViewListener` will report transaction status updates and its + /// timestamps to the given metrics collector. + /// /// Returns a tuple containing the [`MultiViewListener`] and the /// [`MultiViewListenerTask`]. - pub fn new_with_worker() -> (Self, MultiViewListenerTask) { + pub fn new_with_worker( + events_metrics_collector: EventsMetricsCollector<ChainApi>, + ) -> (Self, MultiViewListenerTask) { let external_controllers = Arc::from(RwLock::from(HashMap::< ExtrinsicHash<ChainApi>, Controller<ExternalWatcherCommand<ChainApi>>, @@ -545,7 +580,7 @@ where "txpool-multi-view-listener-task-controller", CONTROLLER_QUEUE_WARN_SIZE, ); - let task = Self::task(external_controllers.clone(), rx); + let task = Self::task(external_controllers.clone(), rx, events_metrics_collector); (Self { external_controllers, controller: tx }, task.boxed()) } @@ -557,6 +592,9 @@ where /// (meaning that it can be exposed to [`sc_transaction_pool_api::TransactionPool`] API client /// e.g. rpc) stream of transaction status events. If an external watcher is already present for /// the given transaction, it returns `None`. + /// + /// The `submit_timestamp` indicates the time at which a transaction is submitted. + /// It is primarily used to calculate event timings for metric collection. pub(crate) fn create_external_watcher_for_tx( &self, tx_hash: ExtrinsicHash<ChainApi>, @@ -779,7 +817,7 @@ mod tests { fn create_multi_view_listener( ) -> (MultiViewListener, tokio::sync::oneshot::Sender<()>, JoinHandle<()>) { - let (listener, listener_task) = MultiViewListener::new_with_worker(); + let (listener, listener_task) = MultiViewListener::new_with_worker(Default::default()); let (tx, rx) = tokio::sync::oneshot::channel(); diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index e141016ccb2..d64d80d4343 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -276,7 +276,7 @@ where ) -> Self { Self { api, - listener: Arc::from(MultiViewListener::new_with_worker().0), + listener: Arc::from(MultiViewListener::new_with_worker(Default::default()).0), transactions: Default::default(), metrics: Default::default(), max_transactions_count, @@ -599,6 +599,16 @@ where .map(|p| *p.priority.write() = Some(priority)) }); } + + /// Counts the number of transactions in the provided iterator of hashes + /// that are not known to the pool. + pub(super) fn count_unknown_transactions<'a>( + &self, + hashes: impl Iterator<Item = &'a ExtrinsicHash<ChainApi>>, + ) -> usize { + let transactions = self.transactions.read(); + hashes.filter(|tx_hash| !transactions.contains_key(tx_hash)).count() + } } #[cfg(test)] -- GitLab