diff --git a/prdoc/pr_7505.prdoc b/prdoc/pr_7505.prdoc new file mode 100644 index 0000000000000000000000000000000000000000..da9f44dcf4060a4f79f4546cee7e36a63325e52c --- /dev/null +++ b/prdoc/pr_7505.prdoc @@ -0,0 +1,14 @@ +title: '`fatxpool`: transaction statuses metrics added' +doc: +- audience: Node Dev + description: |- + This PR introduces a new mechanism to capture and report Prometheus metrics related to timings of transaction + lifecycle events, which are currently not available. By exposing these timings, we aim to augment transaction-pool + reliability dashboards and extend existing Grafana boards. + + A new `unknown_from_block_import_txs` metric is also introduced. It provides the number of transactions in imported + block which are not known to the node's transaction pool. It allows to monitor alignment of transaction pools + across the nodes in the network. +crates: +- name: sc-transaction-pool + bump: minor diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index ffe6c20d92b72d2c3aa5c444224f6c32f83aad8e..6195cf53b6072aec2ab95fa6dc4138fdbe8bbefe 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -21,7 +21,7 @@ use super::{ dropped_watcher::{MultiViewDroppedWatcherController, StreamOfDropped}, import_notification_sink::MultiViewImportNotificationSink, - metrics::MetricsLink as PrometheusMetrics, + metrics::{EventsMetricsCollector, MetricsLink as PrometheusMetrics}, multi_view_listener::MultiViewListener, tx_mem_pool::{InsertionInfo, TxMemPool, TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER}, view::View, @@ -143,6 +143,9 @@ where /// Prometheus's metrics endpoint. metrics: PrometheusMetrics, + /// Collector of transaction statuses updates, reports transaction events metrics. + events_metrics_collector: EventsMetricsCollector<ChainApi>, + /// Util tracking best and finalized block. enactment_state: Arc<Mutex<EnactmentState<Block>>>, @@ -193,7 +196,7 @@ where future_limits: crate::PoolLimit, mempool_max_transactions_count: usize, ) -> (Self, ForkAwareTxPoolTask) { - let (listener, listener_task) = MultiViewListener::new_with_worker(); + let (listener, listener_task) = MultiViewListener::new_with_worker(Default::default()); let listener = Arc::new(listener); let (import_notification_sink, import_notification_sink_task) = @@ -246,6 +249,7 @@ where options, is_validator: false.into(), metrics: Default::default(), + events_metrics_collector: EventsMetricsCollector::default(), }, combined_tasks, ) @@ -314,8 +318,11 @@ where finalized_hash: Block::Hash, ) -> Self { let metrics = PrometheusMetrics::new(prometheus); + let (events_metrics_collector, event_metrics_task) = + EventsMetricsCollector::<ChainApi>::new_with_worker(metrics.clone()); - let (listener, listener_task) = MultiViewListener::new_with_worker(); + let (listener, listener_task) = + MultiViewListener::new_with_worker(events_metrics_collector.clone()); let listener = Arc::new(listener); let (revalidation_queue, revalidation_task) = @@ -337,6 +344,7 @@ where let view_store = Arc::new(ViewStore::new(pool_api.clone(), listener, dropped_stream_controller)); + let dropped_monitor_task = Self::dropped_monitor_task( dropped_stream, mempool.clone(), @@ -350,6 +358,7 @@ where _ = revalidation_task => {}, _ = import_notification_sink_task => {}, _ = dropped_monitor_task => {} + _ = event_metrics_task => {}, } } .boxed(); @@ -368,6 +377,7 @@ where import_notification_sink, options, metrics, + events_metrics_collector, is_validator, } } @@ -721,7 +731,10 @@ where .iter() .zip(xts) .filter_map(|(result, xt)| { - result.as_ref().ok().map(|insertion| (insertion.source.clone(), xt)) + result.as_ref().ok().map(|insertion| { + self.events_metrics_collector.report_submitted(&insertion); + (insertion.source.clone(), xt) + }) }) .collect::<Vec<_>>(); @@ -812,21 +825,21 @@ where ); let xt = Arc::from(xt); - let InsertionInfo { hash: xt_hash, source: timed_source, .. } = - match self.mempool.push_watched(source, xt.clone()) { - Ok(result) => result, - Err(TxPoolApiError::ImmediatelyDropped) => - self.attempt_transaction_replacement(source, true, xt.clone()).await?, - Err(e) => return Err(e.into()), - }; + let insertion = match self.mempool.push_watched(source, xt.clone()) { + Ok(result) => result, + Err(TxPoolApiError::ImmediatelyDropped) => + self.attempt_transaction_replacement(source, true, xt.clone()).await?, + Err(e) => return Err(e.into()), + }; self.metrics.report(|metrics| metrics.submitted_transactions.inc()); + self.events_metrics_collector.report_submitted(&insertion); self.view_store - .submit_and_watch(at, timed_source, xt) + .submit_and_watch(at, insertion.source, xt) .await .inspect_err(|_| { - self.mempool.remove_transaction(&xt_hash); + self.mempool.remove_transaction(&insertion.hash); }) .map(|mut outcome| { self.mempool.update_transaction_priority(&outcome); @@ -1272,6 +1285,12 @@ where pruned_log.extend(enacted_log); }); + self.metrics.report(|metrics| { + metrics + .unknown_from_block_import_txs + .inc_by(self.mempool.count_unknown_transactions(pruned_log.iter()) as _) + }); + //resubmit { let mut resubmit_transactions = Vec::new(); diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs index 73d45ac430519aed4a4afac048d15a3c27e5e930..c04741e1c1d9e2645e4bc5c3c2979835f78d1c6e 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs @@ -18,11 +18,26 @@ //! Prometheus's metrics for a fork-aware transaction pool. -use crate::common::metrics::{GenericMetricsLink, MetricsRegistrant}; +use super::tx_mem_pool::InsertionInfo; +use crate::{ + common::metrics::{GenericMetricsLink, MetricsRegistrant}, + graph::{self, BlockHash, ExtrinsicHash}, + LOG_TARGET, +}; +use futures::{FutureExt, StreamExt}; use prometheus_endpoint::{ - histogram_opts, linear_buckets, register, Counter, Gauge, Histogram, PrometheusError, Registry, - U64, + exponential_buckets, histogram_opts, linear_buckets, register, Counter, Gauge, Histogram, + PrometheusError, Registry, U64, +}; +use sc_transaction_pool_api::TransactionStatus; +use sc_utils::mpsc; +use std::{ + collections::{hash_map::Entry, HashMap}, + future::Future, + pin::Pin, + time::{Duration, Instant}, }; +use tracing::trace; /// A helper alias for the Prometheus's metrics endpoint. pub type MetricsLink = GenericMetricsLink<Metrics>; @@ -41,6 +56,8 @@ pub struct Metrics { pub unwatched_txs: Gauge<U64>, /// Total number of transactions reported as invalid. pub removed_invalid_txs: Counter<U64>, + /// Total number of transactions from imported blocks that are unknown to the pool. + pub unknown_from_block_import_txs: Counter<U64>, /// Total number of finalized transactions. pub finalized_txs: Counter<U64>, /// Histogram of maintain durations. @@ -59,6 +76,145 @@ pub struct Metrics { pub view_revalidation_duration: Histogram, /// Total number of the views created w/o cloning existing view. pub non_cloned_views: Counter<U64>, + /// Histograms to track the timing distribution of individual transaction pool events. + pub events_histograms: EventsHistograms, +} + +/// Represents a collection of histogram timings for different transaction statuses. +pub struct EventsHistograms { + /// Histogram of timings for reporting `TransactionStatus::Future` event + pub future: Histogram, + /// Histogram of timings for reporting `TransactionStatus::Ready` event + pub ready: Histogram, + /// Histogram of timings for reporting `TransactionStatus::Broadcast` event + pub broadcast: Histogram, + /// Histogram of timings for reporting `TransactionStatus::InBlock` event + pub in_block: Histogram, + /// Histogram of timings for reporting `TransactionStatus::Retracted` event + pub retracted: Histogram, + /// Histogram of timings for reporting `TransactionStatus::FinalityTimeout` event + pub finality_timeout: Histogram, + /// Histogram of timings for reporting `TransactionStatus::Finalized` event + pub finalized: Histogram, + /// Histogram of timings for reporting `TransactionStatus::Usurped(Hash)` event + pub usurped: Histogram, + /// Histogram of timings for reporting `TransactionStatus::Dropped` event + pub dropped: Histogram, + /// Histogram of timings for reporting `TransactionStatus::Invalid` event + pub invalid: Histogram, +} + +impl EventsHistograms { + fn register(registry: &Registry) -> Result<Self, PrometheusError> { + Ok(Self { + future: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_future", + "Histogram of timings for reporting Future event", + exponential_buckets(0.01, 2.0, 16).unwrap() + ))?, + registry, + )?, + ready: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_ready", + "Histogram of timings for reporting Ready event", + exponential_buckets(0.01, 2.0, 16).unwrap() + ))?, + registry, + )?, + broadcast: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_broadcast", + "Histogram of timings for reporting Broadcast event", + linear_buckets(0.01, 0.25, 16).unwrap() + ))?, + registry, + )?, + in_block: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_in_block", + "Histogram of timings for reporting InBlock event", + linear_buckets(0.0, 3.0, 20).unwrap() + ))?, + registry, + )?, + retracted: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_retracted", + "Histogram of timings for reporting Retracted event", + linear_buckets(0.0, 3.0, 20).unwrap() + ))?, + registry, + )?, + finality_timeout: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_finality_timeout", + "Histogram of timings for reporting FinalityTimeout event", + linear_buckets(0.0, 40.0, 20).unwrap() + ))?, + registry, + )?, + finalized: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_finalized", + "Histogram of timings for reporting Finalized event", + linear_buckets(0.0, 40.0, 20).unwrap() + ))?, + registry, + )?, + usurped: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_usurped", + "Histogram of timings for reporting Usurped event", + linear_buckets(0.0, 3.0, 20).unwrap() + ))?, + registry, + )?, + dropped: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_dropped", + "Histogram of timings for reporting Dropped event", + linear_buckets(0.0, 3.0, 20).unwrap() + ))?, + registry, + )?, + invalid: register( + Histogram::with_opts(histogram_opts!( + "substrate_sub_txpool_timing_event_invalid", + "Histogram of timings for reporting Invalid event", + linear_buckets(0.0, 3.0, 20).unwrap() + ))?, + registry, + )?, + }) + } + + /// Records the timing for a given transaction status. + /// + /// This method records the duration, representing the time elapsed since the + /// transaction was submitted until the event was reported. Based on the + /// transaction status, it utilizes the appropriate histogram to log this duration. + pub fn observe<Hash, BlockHash>( + &self, + status: TransactionStatus<Hash, BlockHash>, + duration: Duration, + ) { + let duration = duration.as_secs_f64(); + let histogram = match status { + TransactionStatus::Future => &self.future, + TransactionStatus::Ready => &self.ready, + TransactionStatus::Broadcast(..) => &self.broadcast, + TransactionStatus::InBlock(..) => &self.in_block, + TransactionStatus::Retracted(..) => &self.retracted, + TransactionStatus::FinalityTimeout(..) => &self.finality_timeout, + TransactionStatus::Finalized(..) => &self.finalized, + TransactionStatus::Usurped(..) => &self.usurped, + TransactionStatus::Dropped => &self.dropped, + TransactionStatus::Invalid => &self.invalid, + }; + histogram.observe(duration); + } } impl MetricsRegistrant for Metrics { @@ -106,6 +262,13 @@ impl MetricsRegistrant for Metrics { )?, registry, )?, + unknown_from_block_import_txs: register( + Counter::new( + "substrate_sub_txpool_unknown_from_block_import_txs_total", + "Total number of transactions from imported blocks that are unknown to the pool.", + )?, + registry, + )?, finalized_txs: register( Counter::new( "substrate_sub_txpool_finalized_txs_total", @@ -171,6 +334,222 @@ impl MetricsRegistrant for Metrics { )?, registry, )?, + events_histograms: EventsHistograms::register(registry)?, })) } } + +/// Messages used to report and compute event metrics. +enum EventMetricsMessage<Hash, BlockHash> { + /// Message indicating a transaction has been submitted, including the timestamp + /// and its hash. + Submitted(Instant, Hash), + /// Message indicating the new status of a transaction, including the timestamp and transaction + /// hash. + Status(Instant, Hash, TransactionStatus<Hash, BlockHash>), +} + +/// Collects metrics related to transaction events. +pub struct EventsMetricsCollector<ChainApi: graph::ChainApi> { + /// Optional channel for sending event metrics messages. + /// + /// If `None` no event metrics are collected (e.g. in tests). + metrics_message_sink: Option<MessageSink<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>>, +} + +impl<ChainApi: graph::ChainApi> Default for EventsMetricsCollector<ChainApi> { + fn default() -> Self { + Self { metrics_message_sink: None } + } +} + +impl<ChainApi: graph::ChainApi> Clone for EventsMetricsCollector<ChainApi> { + fn clone(&self) -> Self { + Self { metrics_message_sink: self.metrics_message_sink.clone() } + } +} + +impl<ChainApi: graph::ChainApi> EventsMetricsCollector<ChainApi> { + /// Reports the status of a transaction. + /// + /// Takes a transaction hash and status, and attempts to send a status + /// message to the metrics messages processing task. + pub fn report_status( + &self, + tx_hash: ExtrinsicHash<ChainApi>, + status: TransactionStatus<BlockHash<ChainApi>, ExtrinsicHash<ChainApi>>, + ) { + self.metrics_message_sink.as_ref().map(|sink| { + if let Err(error) = + sink.unbounded_send(EventMetricsMessage::Status(Instant::now(), tx_hash, status)) + { + trace!(target: LOG_TARGET, %error, "tx status metrics message send failed") + } + }); + } + + /// Reports that a transaction has been submitted. + /// + /// Takes a transaction hash and its submission timestamp, and attempts to + /// send a submission message to the metrics messages processing task. + pub fn report_submitted(&self, insertion_info: &InsertionInfo<ExtrinsicHash<ChainApi>>) { + self.metrics_message_sink.as_ref().map(|sink| { + if let Err(error) = sink.unbounded_send(EventMetricsMessage::Submitted( + insertion_info + .source + .timestamp + .expect("timestamp is set in fork-aware pool. qed"), + insertion_info.hash, + )) { + trace!(target: LOG_TARGET, %error, "tx status metrics message send failed") + } + }); + } +} + +/// A type alias for a asynchronous task that collects metrics related to events. +pub type EventsMetricsCollectorTask = Pin<Box<dyn Future<Output = ()> + Send>>; + +/// Sink type for sending event metrics messages. +type MessageSink<Hash, BlockHash> = + mpsc::TracingUnboundedSender<EventMetricsMessage<Hash, BlockHash>>; + +/// Receiver type for receiving event metrics messages. +type MessageReceiver<Hash, BlockHash> = + mpsc::TracingUnboundedReceiver<EventMetricsMessage<Hash, BlockHash>>; + +/// Holds data relevant to transaction event metrics, allowing de-duplication +/// of certain transaction statuses, and compute the timings of events. +struct TransactionEventMetricsData { + /// Flag indicating if the transaction was seen as `Ready`. + ready_seen: bool, + /// Flag indicating if the transaction was seen as `Broadcast`. + broadcast_seen: bool, + /// Flag indicating if the transaction was seen as `Future`. + future_seen: bool, + /// Flag indicating if the transaction was seen as `InBlock`. + in_block_seen: bool, + /// Flag indicating if the transaction was seen as `Retracted`. + retracted_seen: bool, + /// Timestamp when the transaction was submitted. + /// + /// Used to compute a time elapsed until events are reported. + submit_timestamp: Instant, +} + +impl TransactionEventMetricsData { + /// Creates a new `TransactionEventMetricsData` with the given timestamp. + fn new(submit_timestamp: Instant) -> Self { + Self { + submit_timestamp, + future_seen: false, + ready_seen: false, + broadcast_seen: false, + in_block_seen: false, + retracted_seen: false, + } + } + + /// Sets flag to true once. + /// + /// Return true if flag was toggled. + fn set_true_once(flag: &mut bool) -> bool { + if *flag { + false + } else { + *flag = true; + true + } + } + + /// Updates the status flags based on the given transaction status. + /// + /// Returns the submit timestamp if given status was not seen yet, `None` otherwise. + fn update<Hash, BlockHash>( + &mut self, + status: &TransactionStatus<Hash, BlockHash>, + ) -> Option<Instant> { + let flag = match *status { + TransactionStatus::Ready => &mut self.ready_seen, + TransactionStatus::Future => &mut self.future_seen, + TransactionStatus::Broadcast(..) => &mut self.broadcast_seen, + TransactionStatus::InBlock(..) => &mut self.in_block_seen, + TransactionStatus::Retracted(..) => &mut self.retracted_seen, + _ => return Some(self.submit_timestamp), + }; + Self::set_true_once(flag).then_some(self.submit_timestamp) + } +} + +impl<ChainApi> EventsMetricsCollector<ChainApi> +where + ChainApi: graph::ChainApi + 'static, +{ + /// Handles the status event. + /// + /// Updates the metrics by observing the time taken for a transaction's status update + /// from its submission time. + fn handle_status( + hash: ExtrinsicHash<ChainApi>, + status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>, + timestamp: Instant, + submitted_timestamp_map: &mut HashMap<ExtrinsicHash<ChainApi>, TransactionEventMetricsData>, + metrics: &MetricsLink, + ) { + let Entry::Occupied(mut entry) = submitted_timestamp_map.entry(hash) else { return }; + let remove = status.is_final(); + if let Some(submit_timestamp) = entry.get_mut().update(&status) { + metrics.report(|metrics| { + metrics + .events_histograms + .observe(status, timestamp.duration_since(submit_timestamp)) + }); + } + remove.then(|| entry.remove()); + } + + /// Asynchronous task to process received messages and compute relevant event metrics. + /// + /// Runs indefinitely, handling arriving messages and updating metrics + /// based on the recorded submission times and timestamps of current event statuses. + async fn task( + mut rx: MessageReceiver<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>, + metrics: MetricsLink, + ) { + let mut submitted_timestamp_map = + HashMap::<ExtrinsicHash<ChainApi>, TransactionEventMetricsData>::default(); + + loop { + match rx.next().await { + Some(EventMetricsMessage::Submitted(timestamp, hash)) => { + submitted_timestamp_map + .insert(hash, TransactionEventMetricsData::new(timestamp)); + }, + Some(EventMetricsMessage::Status(timestamp, hash, status)) => { + Self::handle_status( + hash, + status, + timestamp, + &mut submitted_timestamp_map, + &metrics, + ); + }, + None => { + return /* ? */ + }, + }; + } + } + + /// Constructs a new `EventsMetricsCollector` and its associated worker task. + /// + /// Returns the collector alongside an asynchronous task. The task shall be polled by caller. + pub fn new_with_worker(metrics: MetricsLink) -> (Self, EventsMetricsCollectorTask) { + const QUEUE_WARN_SIZE: usize = 100_000; + let (metrics_message_sink, rx) = + mpsc::tracing_unbounded("txpool-event-metrics-collector", QUEUE_WARN_SIZE); + let task = Self::task(rx, metrics); + + (Self { metrics_message_sink: Some(metrics_message_sink) }, task.boxed()) + } +} diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs index 107c2941ec183470e91efb28d9d46e6156713aa0..959df2ffe978471711387be3d671f06aa79d43e0 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs @@ -39,7 +39,10 @@ use std::{ use tokio_stream::StreamMap; use tracing::trace; -use super::dropped_watcher::{DroppedReason, DroppedTransaction}; +use super::{ + dropped_watcher::{DroppedReason, DroppedTransaction}, + metrics::EventsMetricsCollector, +}; /// A side channel allowing to control the external stream instance (one per transaction) with /// [`ControllerCommand`]. @@ -113,6 +116,26 @@ where } } +impl<ChainApi> Into<TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>> + for &TransactionStatusUpdate<ChainApi> +where + ChainApi: graph::ChainApi, +{ + fn into(self) -> TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>> { + match self { + TransactionStatusUpdate::TransactionInvalidated(_) => TransactionStatus::Invalid, + TransactionStatusUpdate::TransactionFinalized(_, hash, index) => + TransactionStatus::Finalized((*hash, *index)), + TransactionStatusUpdate::TransactionBroadcasted(_, peers) => + TransactionStatus::Broadcast(peers.clone()), + TransactionStatusUpdate::TransactionDropped(_, DroppedReason::Usurped(by)) => + TransactionStatus::Usurped(*by), + TransactionStatusUpdate::TransactionDropped(_, DroppedReason::LimitsEnforced) => + TransactionStatus::Dropped, + } + } +} + impl<ChainApi> std::fmt::Debug for TransactionStatusUpdate<ChainApi> where ChainApi: graph::ChainApi, @@ -451,12 +474,15 @@ where /// - transaction commands, /// to multiple individual per-transaction external watcher contexts. /// - /// The future shall be polled by instantiator of `MultiViewListener`. + /// It also reports transactions statuses updates to the provided `events_metrics_collector`. + /// + /// The returned future shall be polled by instantiator of `MultiViewListener`. async fn task( external_watchers_tx_hash_map: Arc< RwLock<HashMap<ExtrinsicHash<ChainApi>, Controller<ExternalWatcherCommand<ChainApi>>>>, >, mut command_receiver: CommandReceiver<ControllerCommand<ChainApi>>, + events_metrics_collector: EventsMetricsCollector<ChainApi>, ) { let mut aggregated_streams_map: StreamMap<BlockHash<ChainApi>, ViewStatusStream<ChainApi>> = Default::default(); @@ -465,6 +491,7 @@ where tokio::select! { biased; Some((view_hash, (tx_hash, status))) = next_event(&mut aggregated_streams_map) => { + events_metrics_collector.report_status(tx_hash, status.clone()); if let Entry::Occupied(mut ctrl) = external_watchers_tx_hash_map.write().entry(tx_hash) { log::trace!( target: LOG_TARGET, @@ -510,6 +537,7 @@ where Some(ControllerCommand::TransactionStatusRequest(request)) => { let tx_hash = request.hash(); + events_metrics_collector.report_status(tx_hash, (&request).into()); if let Entry::Occupied(mut ctrl) = external_watchers_tx_hash_map.write().entry(tx_hash) { if let Err(e) = ctrl .get_mut() @@ -529,12 +557,19 @@ where /// Creates a new [`MultiViewListener`] instance along with its associated worker task. /// - /// This function instansiates the new `MultiViewListener` and provides the worker task that + /// This function instantiates the new `MultiViewListener` and provides the worker task that /// relays messages to the external transactions listeners. The task shall be polled by caller. /// + /// The `events_metrics_collector` is an instance of `EventsMetricsCollector` that is + /// responsible for collecting and managing metrics related to transaction events. Newly + /// created instance of `MultiViewListener` will report transaction status updates and its + /// timestamps to the given metrics collector. + /// /// Returns a tuple containing the [`MultiViewListener`] and the /// [`MultiViewListenerTask`]. - pub fn new_with_worker() -> (Self, MultiViewListenerTask) { + pub fn new_with_worker( + events_metrics_collector: EventsMetricsCollector<ChainApi>, + ) -> (Self, MultiViewListenerTask) { let external_controllers = Arc::from(RwLock::from(HashMap::< ExtrinsicHash<ChainApi>, Controller<ExternalWatcherCommand<ChainApi>>, @@ -545,7 +580,7 @@ where "txpool-multi-view-listener-task-controller", CONTROLLER_QUEUE_WARN_SIZE, ); - let task = Self::task(external_controllers.clone(), rx); + let task = Self::task(external_controllers.clone(), rx, events_metrics_collector); (Self { external_controllers, controller: tx }, task.boxed()) } @@ -557,6 +592,9 @@ where /// (meaning that it can be exposed to [`sc_transaction_pool_api::TransactionPool`] API client /// e.g. rpc) stream of transaction status events. If an external watcher is already present for /// the given transaction, it returns `None`. + /// + /// The `submit_timestamp` indicates the time at which a transaction is submitted. + /// It is primarily used to calculate event timings for metric collection. pub(crate) fn create_external_watcher_for_tx( &self, tx_hash: ExtrinsicHash<ChainApi>, @@ -779,7 +817,7 @@ mod tests { fn create_multi_view_listener( ) -> (MultiViewListener, tokio::sync::oneshot::Sender<()>, JoinHandle<()>) { - let (listener, listener_task) = MultiViewListener::new_with_worker(); + let (listener, listener_task) = MultiViewListener::new_with_worker(Default::default()); let (tx, rx) = tokio::sync::oneshot::channel(); diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index e141016ccb28b39a1be816478c0f6da748d875ce..d64d80d434308647c9343c01f78ca8e15079da3a 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -276,7 +276,7 @@ where ) -> Self { Self { api, - listener: Arc::from(MultiViewListener::new_with_worker().0), + listener: Arc::from(MultiViewListener::new_with_worker(Default::default()).0), transactions: Default::default(), metrics: Default::default(), max_transactions_count, @@ -599,6 +599,16 @@ where .map(|p| *p.priority.write() = Some(priority)) }); } + + /// Counts the number of transactions in the provided iterator of hashes + /// that are not known to the pool. + pub(super) fn count_unknown_transactions<'a>( + &self, + hashes: impl Iterator<Item = &'a ExtrinsicHash<ChainApi>>, + ) -> usize { + let transactions = self.transactions.read(); + hashes.filter(|tx_hash| !transactions.contains_key(tx_hash)).count() + } } #[cfg(test)]