diff --git a/prdoc/pr_7545.prdoc b/prdoc/pr_7545.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..6956b09947fca7a258acd21388fc6ab59a139852
--- /dev/null
+++ b/prdoc/pr_7545.prdoc
@@ -0,0 +1,9 @@
+title: '`fatxpool`: event streams moved to view domain'
+doc:
+- audience: Node Dev
+  description: |-
+    This pull request refactors the transaction pool `graph` module by renaming components for better clarity and decouples the `graph` module from `view`-module-related specifics.
+    This PR does not introduce changes in the logic.
+crates:
+- name: sc-transaction-pool
+  bump: minor
diff --git a/substrate/client/transaction-pool/benches/basics.rs b/substrate/client/transaction-pool/benches/basics.rs
index 5ba9dd40c15680ee2c5c83803f8bc62b59e96c57..74dd69a8aaf8d560b862aaa0df44eae4a57adc5f 100644
--- a/substrate/client/transaction-pool/benches/basics.rs
+++ b/substrate/client/transaction-pool/benches/basics.rs
@@ -151,7 +151,7 @@ fn uxt(transfer: TransferData) -> Extrinsic {
     ExtrinsicBuilder::new_bench_call(transfer).build()
 }
 
-fn bench_configured(pool: Pool<TestApi>, number: u64, api: Arc<TestApi>) {
+fn bench_configured(pool: Pool<TestApi, ()>, number: u64, api: Arc<TestApi>) {
     let source = TimedTransactionSource::new_external(false);
     let mut futures = Vec::new();
     let mut tags = Vec::new();
diff --git a/substrate/client/transaction-pool/src/common/tests.rs b/substrate/client/transaction-pool/src/common/tests.rs
index 7f2cbe24d8ef62bae4ccb4ffda8d974d92c38c97..c391beb21b07fcd6dd82ded7015529df503c78ae 100644
--- a/substrate/client/transaction-pool/src/common/tests.rs
+++ b/substrate/client/transaction-pool/src/common/tests.rs
@@ -18,7 +18,7 @@
 
 //! Testing related primitives for internal usage in this crate.
 
-use crate::graph::{BlockHash, ChainApi, ExtrinsicFor, NumberFor, Pool, RawExtrinsicFor};
+use crate::graph::{BlockHash, ChainApi, ExtrinsicFor, NumberFor, RawExtrinsicFor};
 use codec::Encode;
 use parking_lot::Mutex;
 use sc_transaction_pool_api::error;
@@ -36,6 +36,8 @@ use substrate_test_runtime::{
     ExtrinsicBuilder, Hashing, RuntimeCall, Transfer, TransferData, H256,
 };
 
+type Pool<Api> = crate::graph::Pool<Api, ()>;
+
 pub(crate) const INVALID_NONCE: u64 = 254;
 
 /// Test api that implements [`ChainApi`].
diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
index e04c826a1d522aa55e9e8a9e9ca2810ab3da65ba..91237910adc1f59f3129f2ce73c7cd7ad021716a 100644
--- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
+++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
@@ -81,7 +81,8 @@ pub enum DroppedReason<Hash> {
 }
 
 /// Dropped-logic related event from the single view.
-pub type ViewStreamEvent<C> = crate::graph::TransactionStatusEvent<ExtrinsicHash<C>, BlockHash<C>>;
+pub type ViewStreamEvent<C> =
+    crate::fork_aware_txpool::view::TransactionStatusEvent<ExtrinsicHash<C>, BlockHash<C>>;
 
 /// Dropped-logic stream of events coming from the single view.
type ViewStream<C> = Pin<Box<dyn futures::Stream<Item = ViewStreamEvent<C>> + Send>>; diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index 5b43d900848abecacdd599806eb9d9cb24bf042f..c21e0b8df6ff54afd5a55ee77ca5629d2d7a1b45 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -478,7 +478,7 @@ where }; if let Ok((Some(best_tree_route), Some(best_view))) = best_result { - let tmp_view: View<ChainApi> = + let (tmp_view, _, _): (View<ChainApi>, _, _) = View::new_from_other(&best_view, &HashAndNumber { hash: at, number: block_number }); let mut all_extrinsics = vec![]; @@ -1085,26 +1085,28 @@ where ?tree_route, "build_new_view" ); - let mut view = if let Some(origin_view) = origin_view { - let mut view = View::new_from_other(&origin_view, at); - if !tree_route.retracted().is_empty() { - view.pool.clear_recently_pruned(); - } - view - } else { - debug!( - target: LOG_TARGET, - ?at, - "creating non-cloned view" - ); - View::new( - self.api.clone(), - at.clone(), - self.options.clone(), - self.metrics.clone(), - self.is_validator.clone(), - ) - }; + let (mut view, view_dropped_stream, view_aggregated_stream) = + if let Some(origin_view) = origin_view { + let (mut view, view_dropped_stream, view_aggragated_stream) = + View::new_from_other(&origin_view, at); + if !tree_route.retracted().is_empty() { + view.pool.clear_recently_pruned(); + } + (view, view_dropped_stream, view_aggragated_stream) + } else { + debug!( + target: LOG_TARGET, + ?at, + "creating non-cloned view" + ); + View::new( + self.api.clone(), + at.clone(), + self.options.clone(), + self.metrics.clone(), + self.is_validator.clone(), + ) + }; let start = Instant::now(); // 1. Capture all import notification from the very beginning, so first register all @@ -1114,15 +1116,13 @@ where view.pool.validated_pool().import_notification_stream().boxed(), ); - self.view_store.dropped_stream_controller.add_view( - view.at.hash, - view.pool.validated_pool().create_dropped_by_limits_stream().boxed(), - ); + self.view_store + .dropped_stream_controller + .add_view(view.at.hash, view_dropped_stream.boxed()); - self.view_store.listener.add_view_aggregated_stream( - view.at.hash, - view.pool.validated_pool().create_aggregated_stream().boxed(), - ); + self.view_store + .listener + .add_view_aggregated_stream(view.at.hash, view_aggregated_stream.boxed()); // sync the transactions statuses and referencing views in all the listeners with newly // cloned view. view.pool.validated_pool().retrigger_notifications(); diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs index 2c4da0182a2524431077ed3284b406bc6fc8869c..fce2d4ad6b27e05ae75bd9345b649576db4714fe 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs @@ -323,7 +323,7 @@ //! [`MultiViewListener`]: crate::fork_aware_txpool::multi_view_listener::MultiViewListener //! [`Pool`]: crate::graph::Pool //! [`Watcher`]: crate::graph::watcher::Watcher -//! [`AggregatedStream`]: crate::graph::AggregatedStream +//! [`AggregatedStream`]: crate::fork_aware_txpool::view::AggregatedStream //! [`Options`]: crate::graph::Options //! 
[`vp::import_notification_stream`]: ../graph/validated_pool/struct.ValidatedPool.html#method.import_notification_stream //! [`vp::enforce_limits`]: ../graph/validated_pool/struct.ValidatedPool.html#method.enforce_limits diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs index 62c4320e5d3531abe22c531b90fa99dfcb30d585..5216f494ffa55e22aeffaeaee50985b08407e340 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs @@ -22,8 +22,8 @@ use crate::{ common::tracing_log_xt::log_xt_trace, - fork_aware_txpool::stream_map_util::next_event, - graph::{self, BlockHash, ExtrinsicHash, TransactionStatusEvent}, + fork_aware_txpool::{stream_map_util::next_event, view::TransactionStatusEvent}, + graph::{self, BlockHash, ExtrinsicHash}, LOG_TARGET, }; use futures::{Future, FutureExt, Stream, StreamExt}; diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs index 2f3d31d0e6fde7484a9fbbabfa3a54aa43ec0448..24f71982c7816e12e7395cd7d00216bc90c02847 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs @@ -211,13 +211,9 @@ mod tests { let api = Arc::new(TestApi::default()); let block0 = api.expect_hash_and_number(0); - let view = Arc::new(View::new( - api.clone(), - block0, - Default::default(), - Default::default(), - false.into(), - )); + let view = Arc::new( + View::new(api.clone(), block0, Default::default(), Default::default(), false.into()).0, + ); let queue = Arc::new(RevalidationQueue::new()); let uxt = uxt(Transfer { diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs index 4fa83ccc79bfa4a0348043ff547e10718b7fe30b..348108e24dcfc21b61c6ec18b44dd13925d8a2cc 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs @@ -27,14 +27,16 @@ use super::metrics::MetricsLink as PrometheusMetrics; use crate::{ common::tracing_log_xt::log_xt_trace, graph::{ - self, base_pool::TimedTransactionSource, ExtrinsicFor, ExtrinsicHash, IsValidator, - TransactionFor, ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor, + self, base_pool::TimedTransactionSource, BlockHash, ExtrinsicFor, ExtrinsicHash, + IsValidator, TransactionFor, ValidatedPoolSubmitOutcome, ValidatedTransaction, + ValidatedTransactionFor, }, LOG_TARGET, }; use indexmap::IndexMap; use parking_lot::Mutex; -use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus}; +use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus, TransactionStatus}; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_blockchain::HashAndNumber; use sp_runtime::{ generic::BlockId, traits::Block as BlockT, transaction_validity::TransactionValidityError, @@ -109,13 +111,139 @@ impl<ChainApi: graph::ChainApi> FinishRevalidationWorkerChannels<ChainApi> { } } +/// Single event used in aggregated stream. Tuple containing hash of transactions and its status. 
+pub(super) type TransactionStatusEvent<H, BH> = (H, TransactionStatus<H, BH>);
+/// Warning threshold for (unbounded) channels used in the view's streams.
+const VIEW_STREAM_WARN_THRESHOLD: usize = 100_000;
+
+/// Stream of events providing statuses of all the transactions within the pool.
+pub(super) type AggregatedStream<H, BH> = TracingUnboundedReceiver<TransactionStatusEvent<H, BH>>;
+
+/// Type alias for a stream of events intended to track dropped transactions.
+type DroppedMonitoringStream<H, BH> = TracingUnboundedReceiver<TransactionStatusEvent<H, BH>>;
+
+/// Notification handler for transaction updates triggered in `ValidatedPool`.
+///
+/// `ViewPoolObserver` handles transaction status change notifications coming from the instance
+/// of the validated pool associated with the `View` and forwards them through dedicated channels
+/// into the View's streams.
+pub(super) struct ViewPoolObserver<ChainApi: graph::ChainApi> {
+    /// The sink used to notify about transactions that were dropped by enforcing limits, usurped
+    /// or found invalid.
+    ///
+    /// Note: Ready and future statuses are also communicated through this channel, enabling the
+    /// stream consumer to track views that reference the transaction.
+    dropped_stream_sink: TracingUnboundedSender<
+        TransactionStatusEvent<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
+    >,
+
+    /// The sink of the single, merged stream providing updates for all the transactions in the
+    /// associated pool.
+    ///
+    /// Note: some of the events which are currently ignored on the other side of this channel
+    /// (external watcher) are not relayed.
+    aggregated_stream_sink: TracingUnboundedSender<
+        TransactionStatusEvent<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
+    >,
+}
+
+impl<C: graph::ChainApi> graph::EventHandler<C> for ViewPoolObserver<C> {
+    // note: skipped, notified by ForkAwareTxPool directly to multi view listener.
+    fn broadcasted(&self, _: ExtrinsicHash<C>, _: Vec<String>) {}
+    fn dropped(&self, _: ExtrinsicHash<C>) {}
+    fn finalized(&self, _: ExtrinsicHash<C>, _: BlockHash<C>, _: usize) {}
+    fn retracted(&self, _: ExtrinsicHash<C>, _: BlockHash<C>) {
+        // note: [#5479], we do not send to aggregated stream.
+    }
+
+    fn ready(&self, tx: ExtrinsicHash<C>) {
+        let status = TransactionStatus::Ready;
+        self.send_to_dropped_stream_sink(tx, status.clone());
+        self.send_to_aggregated_stream_sink(tx, status);
+    }
+
+    fn future(&self, tx: ExtrinsicHash<C>) {
+        let status = TransactionStatus::Future;
+        self.send_to_dropped_stream_sink(tx, status.clone());
+        self.send_to_aggregated_stream_sink(tx, status);
+    }
+
+    fn limits_enforced(&self, tx: ExtrinsicHash<C>) {
+        self.send_to_dropped_stream_sink(tx, TransactionStatus::Dropped);
+    }
+
+    fn usurped(&self, tx: ExtrinsicHash<C>, by: ExtrinsicHash<C>) {
+        self.send_to_dropped_stream_sink(tx, TransactionStatus::Usurped(by));
+    }
+
+    fn invalid(&self, tx: ExtrinsicHash<C>) {
+        self.send_to_dropped_stream_sink(tx, TransactionStatus::Invalid);
+    }
+
+    fn pruned(&self, tx: ExtrinsicHash<C>, block_hash: BlockHash<C>, tx_index: usize) {
+        self.send_to_aggregated_stream_sink(tx, TransactionStatus::InBlock((block_hash, tx_index)));
+    }
+
+    fn finality_timeout(&self, tx: ExtrinsicHash<C>, hash: BlockHash<C>) {
+        //todo: do we need this? [related issue: #5482]
+        self.send_to_aggregated_stream_sink(tx, TransactionStatus::FinalityTimeout(hash));
+    }
+}
+
+impl<ChainApi: graph::ChainApi> ViewPoolObserver<ChainApi> {
+    /// Creates an instance of `ViewPoolObserver` together with the associated view's streams.
+ /// + /// This methods creates an event handler that shall be registered in the `ValidatedPool` + /// instance associated with the view. It also creates new view's streams: + /// - a single stream intended to watch dropped transactions only. The stream can be used to + /// subscribe to events related to dropping of all extrinsics in the pool. + /// - a single merged stream for all extrinsics in the associated pool. The stream can be used + /// to subscribe to life-cycle events of all extrinsics in the pool. For fork-aware + /// pool implementation this approach seems to be more efficient than using individual + /// streams for every transaction. + fn new() -> ( + Self, + DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>, + AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>, + ) { + let (dropped_stream_sink, dropped_stream) = + tracing_unbounded("mpsc_txpool_watcher", VIEW_STREAM_WARN_THRESHOLD); + let (aggregated_stream_sink, aggregated_stream) = + tracing_unbounded("mpsc_txpool_aggregated_stream", VIEW_STREAM_WARN_THRESHOLD); + + (Self { dropped_stream_sink, aggregated_stream_sink }, dropped_stream, aggregated_stream) + } + + /// Sends given event to the `dropped_stream_sink`. + fn send_to_dropped_stream_sink( + &self, + tx: ExtrinsicHash<ChainApi>, + status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>, + ) { + if let Err(e) = self.dropped_stream_sink.unbounded_send((tx, status.clone())) { + trace!(target: LOG_TARGET, "[{:?}] dropped_sink: {:?} send message failed: {:?}", tx, status, e); + } + } + + /// Sends given event to the `aggregated_stream_sink`. + fn send_to_aggregated_stream_sink( + &self, + tx: ExtrinsicHash<ChainApi>, + status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>, + ) { + if let Err(e) = self.aggregated_stream_sink.unbounded_send((tx, status.clone())) { + trace!(target: LOG_TARGET, "[{:?}] aggregated_stream {:?} send message failed: {:?}", tx, status, e); + } + } +} + /// Represents the state of transaction pool for given block. /// /// Refer to [*View*](../index.html#view) section for more details on the purpose and life cycle of /// the `View`. pub(super) struct View<ChainApi: graph::ChainApi> { /// The internal pool keeping the set of ready and future transaction at the given block. - pub(super) pool: graph::Pool<ChainApi>, + pub(super) pool: graph::Pool<ChainApi, ViewPoolObserver<ChainApi>>, /// The hash and number of the block with which this view is associated. pub(super) at: HashAndNumber<ChainApi::Block>, /// Endpoints of communication channel with background worker. @@ -136,24 +264,50 @@ where options: graph::Options, metrics: PrometheusMetrics, is_validator: IsValidator, - ) -> Self { + ) -> ( + Self, + DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>, + AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>, + ) { metrics.report(|metrics| metrics.non_cloned_views.inc()); - Self { - pool: graph::Pool::new(options, is_validator, api), - at, - revalidation_worker_channels: Mutex::from(None), - metrics, - } + let (event_handler, dropped_stream, aggregated_stream) = ViewPoolObserver::new(); + ( + Self { + pool: graph::Pool::new_with_event_handler( + options, + is_validator, + api, + event_handler, + ), + at, + revalidation_worker_channels: Mutex::from(None), + metrics, + }, + dropped_stream, + aggregated_stream, + ) } /// Creates a copy of the other view. 
- pub(super) fn new_from_other(&self, at: &HashAndNumber<ChainApi::Block>) -> Self { - View { - at: at.clone(), - pool: self.pool.deep_clone(), - revalidation_worker_channels: Mutex::from(None), - metrics: self.metrics.clone(), - } + pub(super) fn new_from_other( + &self, + at: &HashAndNumber<ChainApi::Block>, + ) -> ( + Self, + DroppedMonitoringStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>, + AggregatedStream<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>, + ) { + let (event_handler, dropped_stream, aggregated_stream) = ViewPoolObserver::new(); + ( + View { + at: at.clone(), + pool: self.pool.deep_clone_with_event_handler(event_handler), + revalidation_worker_channels: Mutex::from(None), + metrics: self.metrics.clone(), + }, + dropped_stream, + aggregated_stream, + ) } /// Imports single unvalidated extrinsic into the view. @@ -504,7 +658,10 @@ where listener_action: F, ) -> Vec<TransactionFor<ChainApi>> where - F: Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>), + F: Fn( + &mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>, + ExtrinsicHash<ChainApi>, + ), { self.pool.validated_pool().remove_subtree(hashes, listener_action) } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs index 96cf9f7106894d43e17e2f089f3a513e6f0c3e9d..a1585741839fae866f45b91d394aeea1e2a844e4 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs @@ -20,7 +20,7 @@ use super::{ multi_view_listener::{MultiViewListener, TxStatusStream}, - view::View, + view::{View, ViewPoolObserver}, }; use crate::{ fork_aware_txpool::dropped_watcher::MultiViewDroppedWatcherController, @@ -62,8 +62,13 @@ where /// Helper type representing the callback allowing to trigger per-transaction events on /// `ValidatedPool`'s listener. -type RemovalListener<ChainApi> = - Arc<dyn Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>) + Send + Sync>; +type RemovalCallback<ChainApi> = Arc< + dyn Fn( + &mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>, + ExtrinsicHash<ChainApi>, + ) + Send + + Sync, +>; /// Helper struct to maintain the context for pending transaction removal, executed for /// newly inserted views. @@ -74,7 +79,7 @@ where /// Hash of the transaction that will be removed, xt_hash: ExtrinsicHash<ChainApi>, /// Action that shall be executed on underlying `ValidatedPool`'s listener. - listener_action: RemovalListener<ChainApi>, + listener_action: RemovalCallback<ChainApi>, } /// This enum represents an action that should be executed on the newly built @@ -119,7 +124,7 @@ where /// Creates new unprocessed instance of pending transaction removal. 
fn new_removal_action( xt_hash: ExtrinsicHash<ChainApi>, - listener: RemovalListener<ChainApi>, + listener: RemovalCallback<ChainApi>, ) -> Self { Self { processed: false, @@ -876,8 +881,10 @@ where listener_action: F, ) -> Vec<TransactionFor<ChainApi>> where - F: Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>) - + Clone + F: Fn( + &mut crate::graph::EventDispatcher<ChainApi, ViewPoolObserver<ChainApi>>, + ExtrinsicHash<ChainApi>, + ) + Clone + Send + Sync + 'static, diff --git a/substrate/client/transaction-pool/src/graph/listener.rs b/substrate/client/transaction-pool/src/graph/listener.rs index 340b6d429ae7e969f81b59ea2bf5ed58ac313758..a40a708edaf05539c871a866e508c62484a94ff8 100644 --- a/substrate/client/transaction-pool/src/graph/listener.rs +++ b/substrate/client/transaction-pool/src/graph/listener.rs @@ -20,59 +20,91 @@ use std::{collections::HashMap, fmt::Debug, hash}; use linked_hash_map::LinkedHashMap; use log::trace; -use sc_transaction_pool_api::TransactionStatus; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; -use serde::Serialize; -use sp_runtime::traits; use super::{watcher, BlockHash, ChainApi, ExtrinsicHash}; static LOG_TARGET: &str = "txpool::watcher"; -/// Single event used in aggregated stream. Tuple containing hash of transactions and its status. -pub type TransactionStatusEvent<H, BH> = (H, TransactionStatus<H, BH>); -/// Stream of events providing statuses of all the transactions within the pool. -pub type AggregatedStream<H, BH> = TracingUnboundedReceiver<TransactionStatusEvent<H, BH>>; +/// The `EventHandler` trait provides a mechanism for clients to respond to various +/// transaction-related events. It offers a set of callback methods that are invoked by the +/// transaction pool's event dispatcher to notify about changes in the status of transactions. +/// +/// This trait can be implemented by any component that needs to respond to transaction lifecycle +/// events, enabling custom logic and handling of these events. +pub trait EventHandler<C: ChainApi> { + /// Called when a transaction is broadcasted. + fn broadcasted(&self, _hash: ExtrinsicHash<C>, _peers: Vec<String>) {} -/// Warning threshold for (unbounded) channel used in aggregated stream. -const AGGREGATED_STREAM_WARN_THRESHOLD: usize = 100_000; + /// Called when a transaction is ready for execution. + fn ready(&self, _tx: ExtrinsicHash<C>) {} -/// Extrinsic pool default listener. -pub struct Listener<H: hash::Hash + Eq, C: ChainApi> { + /// Called when a transaction is deemed to be executable in the future. + fn future(&self, _tx: ExtrinsicHash<C>) {} + + /// Called when transaction pool limits result in a transaction being affected. + fn limits_enforced(&self, _tx: ExtrinsicHash<C>) {} + + /// Called when a transaction is replaced by another. + fn usurped(&self, _tx: ExtrinsicHash<C>, _by: ExtrinsicHash<C>) {} + + /// Called when a transaction is dropped from the pool. + fn dropped(&self, _tx: ExtrinsicHash<C>) {} + + /// Called when a transaction is found to be invalid. + fn invalid(&self, _tx: ExtrinsicHash<C>) {} + + /// Called when a transaction was pruned from the pool due to its presence in imported block. + fn pruned(&self, _tx: ExtrinsicHash<C>, _block_hash: BlockHash<C>, _tx_index: usize) {} + + /// Called when a transaction is retracted from inclusion in a block. + fn retracted(&self, _tx: ExtrinsicHash<C>, _block_hash: BlockHash<C>) {} + + /// Called when a transaction has not been finalized within a timeout period. 
+ fn finality_timeout(&self, _tx: ExtrinsicHash<C>, _hash: BlockHash<C>) {} + + /// Called when a transaction is finalized in a block. + fn finalized(&self, _tx: ExtrinsicHash<C>, _block_hash: BlockHash<C>, _tx_index: usize) {} +} + +impl<C: ChainApi> EventHandler<C> for () {} + +/// The `EventDispatcher` struct is responsible for dispatching transaction-related events from the +/// validated pool to interested observers and an optional event handler. It acts as the primary +/// liaison between the transaction pool and clients that are monitoring transaction statuses. +pub struct EventDispatcher<H: hash::Hash + Eq, C: ChainApi, L: EventHandler<C>> { /// Map containing per-transaction sinks for emitting transaction status events. watchers: HashMap<H, watcher::Sender<H, BlockHash<C>>>, finality_watchers: LinkedHashMap<ExtrinsicHash<C>, Vec<H>>, - /// The sink used to notify dropped by enforcing limits or by being usurped, or invalid - /// transactions. - /// - /// Note: Ready and future statuses are alse communicated through this channel, enabling the - /// stream consumer to track views that reference the transaction. - dropped_stream_sink: Option<TracingUnboundedSender<TransactionStatusEvent<H, BlockHash<C>>>>, - - /// The sink of the single, merged stream providing updates for all the transactions in the - /// associated pool. - aggregated_stream_sink: Option<TracingUnboundedSender<TransactionStatusEvent<H, BlockHash<C>>>>, + /// Optional event handler (listener) that will be notified about all transactions status + /// changes from the pool. + event_handler: Option<L>, } /// Maximum number of blocks awaiting finality at any time. const MAX_FINALITY_WATCHERS: usize = 512; -impl<H: hash::Hash + Eq + Debug, C: ChainApi> Default for Listener<H, C> { +impl<H: hash::Hash + Eq + Debug, C: ChainApi, L: EventHandler<C>> Default + for EventDispatcher<H, C, L> +{ fn default() -> Self { Self { watchers: Default::default(), finality_watchers: Default::default(), - dropped_stream_sink: None, - aggregated_stream_sink: None, + event_handler: None, } } } -impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H, C> { - fn fire<F>(&mut self, hash: &H, fun: F) +impl<C: ChainApi, L: EventHandler<C>> EventDispatcher<ExtrinsicHash<C>, C, L> { + /// Creates a new instance with provided event handler. + pub fn new_with_event_handler(event_handler: Option<L>) -> Self { + Self { event_handler, ..Default::default() } + } + + fn fire<F>(&mut self, hash: &ExtrinsicHash<C>, fun: F) where - F: FnOnce(&mut watcher::Sender<H, ExtrinsicHash<C>>), + F: FnOnce(&mut watcher::Sender<ExtrinsicHash<C>, ExtrinsicHash<C>>), { let clean = if let Some(h) = self.watchers.get_mut(hash) { fun(h); @@ -89,138 +121,88 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H /// Creates a new watcher for given verified extrinsic. /// /// The watcher can be used to subscribe to life-cycle events of that extrinsic. - pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher<H, ExtrinsicHash<C>> { - let sender = self.watchers.entry(hash.clone()).or_insert_with(watcher::Sender::default); + pub fn create_watcher( + &mut self, + hash: ExtrinsicHash<C>, + ) -> watcher::Watcher<ExtrinsicHash<C>, ExtrinsicHash<C>> { + let sender = self.watchers.entry(hash).or_insert_with(watcher::Sender::default); sender.new_watcher(hash) } - /// Creates a new single stream intended to watch dropped transactions only. 
- /// - /// The stream can be used to subscribe to events related to dropping of all extrinsics in the - /// pool. - pub fn create_dropped_by_limits_stream(&mut self) -> AggregatedStream<H, BlockHash<C>> { - let (sender, single_stream) = - tracing_unbounded("mpsc_txpool_watcher", AGGREGATED_STREAM_WARN_THRESHOLD); - self.dropped_stream_sink = Some(sender); - single_stream - } - - /// Creates a new single merged stream for all extrinsics in the associated pool. - /// - /// The stream can be used to subscribe to life-cycle events of all extrinsics in the pool. For - /// some implementations (e.g. fork-aware pool) this approach may be more efficient than using - /// individual streams for every transaction. - /// - /// Note: some of the events which are currently ignored on the other side of this channel - /// (external watcher) are not sent. - pub fn create_aggregated_stream(&mut self) -> AggregatedStream<H, BlockHash<C>> { - let (sender, aggregated_stream) = - tracing_unbounded("mpsc_txpool_aggregated_stream", AGGREGATED_STREAM_WARN_THRESHOLD); - self.aggregated_stream_sink = Some(sender); - aggregated_stream - } - /// Notify the listeners about the extrinsic broadcast. - pub fn broadcasted(&mut self, hash: &H, peers: Vec<String>) { + pub fn broadcasted(&mut self, hash: &ExtrinsicHash<C>, peers: Vec<String>) { trace!(target: LOG_TARGET, "[{:?}] Broadcasted", hash); - self.fire(hash, |watcher| watcher.broadcast(peers)); - } - - /// Sends given event to the `dropped_stream_sink`. - fn send_to_dropped_stream_sink(&mut self, tx: &H, status: TransactionStatus<H, BlockHash<C>>) { - if let Some(ref sink) = self.dropped_stream_sink { - if let Err(e) = sink.unbounded_send((tx.clone(), status.clone())) { - trace!(target: LOG_TARGET, "[{:?}] dropped_sink: {:?} send message failed: {:?}", tx, status, e); - } - } - } - - /// Sends given event to the `aggregated_stream_sink`. - fn send_to_aggregated_stream_sink( - &mut self, - tx: &H, - status: TransactionStatus<H, BlockHash<C>>, - ) { - if let Some(ref sink) = self.aggregated_stream_sink { - if let Err(e) = sink.unbounded_send((tx.clone(), status.clone())) { - trace!(target: LOG_TARGET, "[{:?}] aggregated_stream {:?} send message failed: {:?}", tx, status, e); - } - } + self.fire(hash, |watcher| watcher.broadcast(peers.clone())); + self.event_handler.as_ref().map(|l| l.broadcasted(*hash, peers)); } /// New transaction was added to the ready pool or promoted from the future pool. - pub fn ready(&mut self, tx: &H, old: Option<&H>) { - trace!(target: LOG_TARGET, "[{:?}] Ready (replaced with {:?})", tx, old); + pub fn ready(&mut self, tx: &ExtrinsicHash<C>, old: Option<&ExtrinsicHash<C>>) { + trace!(target: LOG_TARGET, "[{:?}] Ready (replaced with {:?})", *tx, old); self.fire(tx, |watcher| watcher.ready()); if let Some(old) = old { - self.fire(old, |watcher| watcher.usurped(tx.clone())); + self.fire(old, |watcher| watcher.usurped(*tx)); } - self.send_to_dropped_stream_sink(tx, TransactionStatus::Ready); - self.send_to_aggregated_stream_sink(tx, TransactionStatus::Ready); + self.event_handler.as_ref().map(|l| l.ready(*tx)); } /// New transaction was added to the future pool. 
- pub fn future(&mut self, tx: &H) { + pub fn future(&mut self, tx: &ExtrinsicHash<C>) { trace!(target: LOG_TARGET, "[{:?}] Future", tx); self.fire(tx, |watcher| watcher.future()); - self.send_to_dropped_stream_sink(tx, TransactionStatus::Future); - self.send_to_aggregated_stream_sink(tx, TransactionStatus::Future); + self.event_handler.as_ref().map(|l| l.future(*tx)); } /// Transaction was dropped from the pool because of enforcing the limit. - pub fn limits_enforced(&mut self, tx: &H) { + pub fn limits_enforced(&mut self, tx: &ExtrinsicHash<C>) { trace!(target: LOG_TARGET, "[{:?}] Dropped (limits enforced)", tx); self.fire(tx, |watcher| watcher.limit_enforced()); - self.send_to_dropped_stream_sink(tx, TransactionStatus::Dropped); + self.event_handler.as_ref().map(|l| l.limits_enforced(*tx)); } /// Transaction was replaced with other extrinsic. - pub fn usurped(&mut self, tx: &H, by: &H) { + pub fn usurped(&mut self, tx: &ExtrinsicHash<C>, by: &ExtrinsicHash<C>) { trace!(target: LOG_TARGET, "[{:?}] Dropped (replaced with {:?})", tx, by); - self.fire(tx, |watcher| watcher.usurped(by.clone())); + self.fire(tx, |watcher| watcher.usurped(*by)); - self.send_to_dropped_stream_sink(tx, TransactionStatus::Usurped(by.clone())); + self.event_handler.as_ref().map(|l| l.usurped(*tx, *by)); } /// Transaction was dropped from the pool because of the failure during the resubmission of /// revalidate transactions or failure during pruning tags. - pub fn dropped(&mut self, tx: &H) { + pub fn dropped(&mut self, tx: &ExtrinsicHash<C>) { trace!(target: LOG_TARGET, "[{:?}] Dropped", tx); self.fire(tx, |watcher| watcher.dropped()); + self.event_handler.as_ref().map(|l| l.dropped(*tx)); } /// Transaction was removed as invalid. - pub fn invalid(&mut self, tx: &H) { + pub fn invalid(&mut self, tx: &ExtrinsicHash<C>) { trace!(target: LOG_TARGET, "[{:?}] Extrinsic invalid", tx); self.fire(tx, |watcher| watcher.invalid()); - - self.send_to_dropped_stream_sink(tx, TransactionStatus::Invalid); + self.event_handler.as_ref().map(|l| l.invalid(*tx)); } /// Transaction was pruned from the pool. - pub fn pruned(&mut self, block_hash: BlockHash<C>, tx: &H) { + pub fn pruned(&mut self, block_hash: BlockHash<C>, tx: &ExtrinsicHash<C>) { trace!(target: LOG_TARGET, "[{:?}] Pruned at {:?}", tx, block_hash); // Get the transactions included in the given block hash. let txs = self.finality_watchers.entry(block_hash).or_insert(vec![]); - txs.push(tx.clone()); + txs.push(*tx); // Current transaction is the last one included. let tx_index = txs.len() - 1; self.fire(tx, |watcher| watcher.in_block(block_hash, tx_index)); - self.send_to_aggregated_stream_sink(tx, TransactionStatus::InBlock((block_hash, tx_index))); + self.event_handler.as_ref().map(|l| l.pruned(*tx, block_hash, tx_index)); while self.finality_watchers.len() > MAX_FINALITY_WATCHERS { if let Some((hash, txs)) = self.finality_watchers.pop_front() { for tx in txs { self.fire(&tx, |watcher| watcher.finality_timeout(hash)); - //todo: do we need this? [related issue: #5482] - self.send_to_aggregated_stream_sink( - &tx, - TransactionStatus::FinalityTimeout(hash), - ); + self.event_handler.as_ref().map(|l| l.finality_timeout(tx, block_hash)); } } } @@ -231,7 +213,7 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H if let Some(hashes) = self.finality_watchers.remove(&block_hash) { for hash in hashes { self.fire(&hash, |watcher| watcher.retracted(block_hash)); - // note: [#5479], we do not send to aggregated stream. 
+ self.event_handler.as_ref().map(|l| l.retracted(hash, block_hash)); } } } @@ -246,13 +228,14 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H hash, block_hash, ); - self.fire(&hash, |watcher| watcher.finalized(block_hash, tx_index)) + self.fire(&hash, |watcher| watcher.finalized(block_hash, tx_index)); + self.event_handler.as_ref().map(|l| l.finalized(hash, block_hash, tx_index)); } } } /// Provides hashes of all watched transactions. - pub fn watched_transactions(&self) -> impl Iterator<Item = &H> { + pub fn watched_transactions(&self) -> impl Iterator<Item = &ExtrinsicHash<C>> { self.watchers.keys() } } diff --git a/substrate/client/transaction-pool/src/graph/mod.rs b/substrate/client/transaction-pool/src/graph/mod.rs index c3161799785a97f668c93c8773257e57d3dfe91d..3e6a63babc99c86b0effd0c59bdb51d3e6da80e4 100644 --- a/substrate/client/transaction-pool/src/graph/mod.rs +++ b/substrate/client/transaction-pool/src/graph/mod.rs @@ -42,13 +42,12 @@ pub use self::pool::{ TransactionFor, ValidatedTransactionFor, }; pub use validated_pool::{ - BaseSubmitOutcome, IsValidator, Listener, ValidatedPoolSubmitOutcome, ValidatedTransaction, + BaseSubmitOutcome, EventDispatcher, IsValidator, ValidatedPoolSubmitOutcome, + ValidatedTransaction, }; pub(crate) use self::pool::CheckBannedBeforeVerify; -pub(crate) use listener::TransactionStatusEvent; +pub(crate) use listener::EventHandler; -#[cfg(doc)] -pub(crate) use listener::AggregatedStream; #[cfg(doc)] pub(crate) use validated_pool::ValidatedPool; diff --git a/substrate/client/transaction-pool/src/graph/pool.rs b/substrate/client/transaction-pool/src/graph/pool.rs index d938e9bf06e7d84b75bb9173c9304a2e47ded5ce..afd46617b366cce0d38833eb0b18bcbc8cd56b60 100644 --- a/substrate/client/transaction-pool/src/graph/pool.rs +++ b/substrate/client/transaction-pool/src/graph/pool.rs @@ -36,7 +36,7 @@ use std::{ use super::{ base_pool as base, validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction}, - ValidatedPoolSubmitOutcome, + EventHandler, ValidatedPoolSubmitOutcome, }; /// Modification notification event stream type; @@ -173,11 +173,11 @@ pub(crate) enum CheckBannedBeforeVerify { } /// Extrinsics pool that performs validation. -pub struct Pool<B: ChainApi> { - validated_pool: Arc<ValidatedPool<B>>, +pub struct Pool<B: ChainApi, L: EventHandler<B>> { + validated_pool: Arc<ValidatedPool<B, L>>, } -impl<B: ChainApi> Pool<B> { +impl<B: ChainApi, L: EventHandler<B>> Pool<B, L> { /// Create a new transaction pool with statically sized rotator. pub fn new_with_staticly_sized_rotator( options: Options, @@ -198,6 +198,23 @@ impl<B: ChainApi> Pool<B> { Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) } } + /// Create a new transaction pool. + pub fn new_with_event_handler( + options: Options, + is_validator: IsValidator, + api: Arc<B>, + event_handler: L, + ) -> Self { + Self { + validated_pool: Arc::new(ValidatedPool::new_with_event_handler( + options, + is_validator, + api, + event_handler, + )), + } + } + /// Imports a bunch of unverified extrinsics to the pool pub async fn submit_at( &self, @@ -481,7 +498,7 @@ impl<B: ChainApi> Pool<B> { } /// Get a reference to the underlying validated pool. - pub fn validated_pool(&self) -> &ValidatedPool<B> { + pub fn validated_pool(&self) -> &ValidatedPool<B, L> { &self.validated_pool } @@ -491,12 +508,13 @@ impl<B: ChainApi> Pool<B> { } } -impl<B: ChainApi> Pool<B> { +impl<B: ChainApi, L: EventHandler<B>> Pool<B, L> { /// Deep clones the pool. 
/// /// Must be called on purpose: it duplicates all the internal structures. - pub fn deep_clone(&self) -> Self { - let other: ValidatedPool<B> = (*self.validated_pool).clone(); + pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self { + let other: ValidatedPool<B, L> = + self.validated_pool().deep_clone_with_event_handler(event_handler); Self { validated_pool: Arc::from(other) } } } @@ -519,6 +537,8 @@ mod tests { const SOURCE: TimedTransactionSource = TimedTransactionSource { source: TransactionSource::External, timestamp: None }; + type Pool<Api> = super::Pool<Api, ()>; + #[test] fn should_validate_and_import_transaction() { // given diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs index 174b69da7611b46a4981ea568dcbefecfed34b87..8eb967421bd70e00529758a6de936e360fb7bafc 100644 --- a/substrate/client/transaction-pool/src/graph/validated_pool.rs +++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs @@ -35,6 +35,7 @@ use std::time::Instant; use super::{ base_pool::{self as base, PruneStatus}, + listener::EventHandler, pool::{ BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, Options, TransactionFor, }, @@ -91,8 +92,8 @@ impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> { pub type ValidatedTransactionFor<B> = ValidatedTransaction<ExtrinsicHash<B>, ExtrinsicFor<B>, <B as ChainApi>::Error>; -/// A type alias representing ValidatedPool listener for given ChainApi type. -pub type Listener<B> = super::listener::Listener<ExtrinsicHash<B>, B>; +/// A type alias representing ValidatedPool event dispatcher for given ChainApi type. +pub type EventDispatcher<B, L> = super::listener::EventDispatcher<ExtrinsicHash<B>, B, L>; /// A closure that returns true if the local node is a validator that can author blocks. #[derive(Clone)] @@ -155,23 +156,23 @@ impl<B: ChainApi, W> BaseSubmitOutcome<B, W> { } /// Pool that deals with validated transactions. -pub struct ValidatedPool<B: ChainApi> { +pub struct ValidatedPool<B: ChainApi, L: EventHandler<B>> { api: Arc<B>, is_validator: IsValidator, options: Options, - listener: RwLock<Listener<B>>, + event_dispatcher: RwLock<EventDispatcher<B, L>>, pub(crate) pool: RwLock<base::BasePool<ExtrinsicHash<B>, ExtrinsicFor<B>>>, import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>, rotator: PoolRotator<ExtrinsicHash<B>>, } -impl<B: ChainApi> Clone for ValidatedPool<B> { +impl<B: ChainApi, L: EventHandler<B>> Clone for ValidatedPool<B, L> { fn clone(&self) -> Self { Self { api: self.api.clone(), is_validator: self.is_validator.clone(), options: self.options.clone(), - listener: Default::default(), + event_dispatcher: Default::default(), pool: RwLock::from(self.pool.read().clone()), import_notification_sinks: Default::default(), rotator: self.rotator.clone(), @@ -179,7 +180,16 @@ impl<B: ChainApi> Clone for ValidatedPool<B> { } } -impl<B: ChainApi> ValidatedPool<B> { +impl<B: ChainApi, L: EventHandler<B>> ValidatedPool<B, L> { + pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self { + Self { + event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(Some( + event_handler, + ))), + ..self.clone() + } + } + /// Create a new transaction pool with statically sized rotator. 
pub fn new_with_staticly_sized_rotator( options: Options, @@ -187,7 +197,7 @@ impl<B: ChainApi> ValidatedPool<B> { api: Arc<B>, ) -> Self { let ban_time = options.ban_time; - Self::new_with_rotator(options, is_validator, api, PoolRotator::new(ban_time)) + Self::new_with_rotator(options, is_validator, api, PoolRotator::new(ban_time), None) } /// Create a new transaction pool. @@ -199,6 +209,25 @@ impl<B: ChainApi> ValidatedPool<B> { is_validator, api, PoolRotator::new_with_expected_size(ban_time, total_count), + None, + ) + } + + /// Create a new transaction pool with given event handler. + pub fn new_with_event_handler( + options: Options, + is_validator: IsValidator, + api: Arc<B>, + event_handler: L, + ) -> Self { + let ban_time = options.ban_time; + let total_count = options.total_count(); + Self::new_with_rotator( + options, + is_validator, + api, + PoolRotator::new_with_expected_size(ban_time, total_count), + Some(event_handler), ) } @@ -207,12 +236,13 @@ impl<B: ChainApi> ValidatedPool<B> { is_validator: IsValidator, api: Arc<B>, rotator: PoolRotator<ExtrinsicHash<B>>, + event_handler: Option<L>, ) -> Self { let base_pool = base::BasePool::new(options.reject_future_transactions); Self { is_validator, options, - listener: Default::default(), + event_dispatcher: RwLock::new(EventDispatcher::new_with_event_handler(event_handler)), api, pool: RwLock::new(base_pool), import_notification_sinks: Default::default(), @@ -309,8 +339,8 @@ impl<B: ChainApi> ValidatedPool<B> { }); } - let mut listener = self.listener.write(); - fire_events(&mut *listener, &imported); + let mut event_dispatcher = self.event_dispatcher.write(); + fire_events(&mut *event_dispatcher, &imported); Ok(ValidatedPoolSubmitOutcome::new(*imported.hash(), Some(priority))) }, ValidatedTransaction::Invalid(hash, err) => { @@ -320,7 +350,7 @@ impl<B: ChainApi> ValidatedPool<B> { }, ValidatedTransaction::Unknown(hash, err) => { log::trace!(target: LOG_TARGET, "[{:?}] ValidatedPool::submit_one unknown {:?}", hash, err); - self.listener.write().invalid(&hash); + self.event_dispatcher.write().invalid(&hash); Err(err) }, } @@ -360,9 +390,9 @@ impl<B: ChainApi> ValidatedPool<B> { } // run notifications - let mut listener = self.listener.write(); + let mut event_dispatcher = self.event_dispatcher.write(); for h in &removed { - listener.limits_enforced(h); + event_dispatcher.limits_enforced(h); } removed @@ -398,12 +428,12 @@ impl<B: ChainApi> ValidatedPool<B> { &self, tx_hash: ExtrinsicHash<B>, ) -> Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>> { - self.listener.write().create_watcher(tx_hash) + self.event_dispatcher.write().create_watcher(tx_hash) } /// Provides a list of hashes for all watched transactions in the pool. pub fn watched_transactions(&self) -> Vec<ExtrinsicHash<B>> { - self.listener.read().watched_transactions().map(Clone::clone).collect() + self.event_dispatcher.read().watched_transactions().map(Clone::clone).collect() } /// Resubmits revalidated transactions back to the pool. 
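To make the new surface concrete, here is a minimal sketch (not part of the diff) of a custom handler plugged in through `Pool::new_with_event_handler`, which forwards to the `ValidatedPool::new_with_event_handler` constructor shown above. The handler type and the trace calls are made up for illustration; only the `crate::graph` items named in this diff are assumed to be importable, along with `Options: Default` and `IsValidator: From<bool>` as used elsewhere in the crate.

```rust
use std::sync::Arc;

use crate::graph::{ChainApi, EventHandler, ExtrinsicHash, Options, Pool};

/// Illustrative handler: traces readiness and drops, and leaves every other
/// callback at the trait's empty default implementation.
struct TracingHandler;

impl<C: ChainApi> EventHandler<C> for TracingHandler {
    fn ready(&self, tx: ExtrinsicHash<C>) {
        log::trace!("tx ready: {:?}", tx);
    }
    fn dropped(&self, tx: ExtrinsicHash<C>) {
        log::trace!("tx dropped: {:?}", tx);
    }
}

/// Builds a pool that reports its events to `TracingHandler`.
fn tracing_pool<C: ChainApi>(api: Arc<C>) -> Pool<C, TracingHandler> {
    // `false.into()` mirrors how `IsValidator` is constructed in the crate's tests.
    Pool::new_with_event_handler(Options::default(), false.into(), api, TracingHandler)
}
```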
@@ -528,15 +558,15 @@ impl<B: ChainApi> ValidatedPool<B> { }; // and now let's notify listeners about status changes - let mut listener = self.listener.write(); + let mut event_dispatcher = self.event_dispatcher.write(); for (hash, final_status) in final_statuses { let initial_status = initial_statuses.remove(&hash); if initial_status.is_none() || Some(final_status) != initial_status { match final_status { - Status::Future => listener.future(&hash), - Status::Ready => listener.ready(&hash, None), - Status::Dropped => listener.dropped(&hash), - Status::Failed => listener.invalid(&hash), + Status::Future => event_dispatcher.future(&hash), + Status::Ready => event_dispatcher.ready(&hash, None), + Status::Dropped => event_dispatcher.dropped(&hash), + Status::Failed => event_dispatcher.invalid(&hash), } } } @@ -569,12 +599,12 @@ impl<B: ChainApi> ValidatedPool<B> { // Notify event listeners of all transactions // that were promoted to `Ready` or were dropped. { - let mut listener = self.listener.write(); + let mut event_dispatcher = self.event_dispatcher.write(); for promoted in &status.promoted { - fire_events(&mut *listener, promoted); + fire_events(&mut *event_dispatcher, promoted); } for f in &status.failed { - listener.dropped(f); + event_dispatcher.dropped(f); } } @@ -618,13 +648,13 @@ impl<B: ChainApi> ValidatedPool<B> { at: &HashAndNumber<B::Block>, hashes: impl Iterator<Item = ExtrinsicHash<B>>, ) { - let mut listener = self.listener.write(); + let mut event_dispatcher = self.event_dispatcher.write(); let mut set = HashSet::with_capacity(hashes.size_hint().0); for h in hashes { // `hashes` has possibly duplicate hashes. // we'd like to send out the `InBlock` notification only once. if !set.contains(&h) { - listener.pruned(at.hash, &h); + event_dispatcher.pruned(at.hash, &h); set.insert(h); } } @@ -681,9 +711,9 @@ impl<B: ChainApi> ValidatedPool<B> { /// Invoked when extrinsics are broadcasted. pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) { - let mut listener = self.listener.write(); + let mut event_dispatcher = self.event_dispatcher.write(); for (hash, peers) in propagated.into_iter() { - listener.broadcasted(&hash, peers); + event_dispatcher.broadcasted(&hash, peers); } } @@ -736,27 +766,13 @@ impl<B: ChainApi> ValidatedPool<B> { "Attempting to notify watchers of finalization for {}", block_hash, ); - self.listener.write().finalized(block_hash); + self.event_dispatcher.write().finalized(block_hash); Ok(()) } - /// Notify the listener of retracted blocks + /// Notify the event_dispatcher of retracted blocks pub fn on_block_retracted(&self, block_hash: BlockHash<B>) { - self.listener.write().retracted(block_hash) - } - - /// Refer to [`Listener::create_dropped_by_limits_stream`] for details. - pub fn create_dropped_by_limits_stream( - &self, - ) -> super::listener::AggregatedStream<ExtrinsicHash<B>, BlockHash<B>> { - self.listener.write().create_dropped_by_limits_stream() - } - - /// Refer to [`Listener::create_aggregated_stream`] - pub fn create_aggregated_stream( - &self, - ) -> super::listener::AggregatedStream<ExtrinsicHash<B>, BlockHash<B>> { - self.listener.write().create_aggregated_stream() + self.event_dispatcher.write().retracted(block_hash) } /// Resends ready and future events for all the ready and future transactions that are already @@ -765,12 +781,12 @@ impl<B: ChainApi> ValidatedPool<B> { /// Intended to be called after cloning the instance of `ValidatedPool`. 
     pub fn retrigger_notifications(&self) {
         let pool = self.pool.read();
-        let mut listener = self.listener.write();
+        let mut event_dispatcher = self.event_dispatcher.write();
         pool.ready().for_each(|r| {
-            listener.ready(&r.hash, None);
+            event_dispatcher.ready(&r.hash, None);
         });
         pool.futures().for_each(|f| {
-            listener.future(&f.hash);
+            event_dispatcher.future(&f.hash);
         });
     }
 
@@ -782,21 +798,21 @@ impl<B: ChainApi> ValidatedPool<B> {
     /// The root transaction will be banned from re-entrering the pool. Descendant transactions may
     /// be re-submitted to the pool if required.
     ///
-    /// A `listener_action` callback function is invoked for every transaction that is removed,
-    /// providing a reference to the pool's listener and the hash of the removed transaction. This
-    /// allows to trigger the required events.
+    /// An `event_dispatcher_action` callback function is invoked for every transaction that is
+    /// removed, providing a reference to the pool's event dispatcher and the hash of the removed
+    /// transaction. This allows triggering the required events.
     ///
     /// Returns a vector containing the hashes of all removed transactions, including the root
     /// transaction specified by `tx_hash`.
     pub fn remove_subtree<F>(
         &self,
         hashes: &[ExtrinsicHash<B>],
-        listener_action: F,
+        event_dispatcher_action: F,
     ) -> Vec<TransactionFor<B>>
     where
-        F: Fn(&mut Listener<B>, ExtrinsicHash<B>),
+        F: Fn(&mut EventDispatcher<B, L>, ExtrinsicHash<B>),
     {
-        // temporarily ban invalid transactions
+        // temporarily ban removed transactions
         self.rotator.ban(&Instant::now(), hashes.iter().cloned());
 
         let removed = self.pool.write().remove_subtree(hashes);
@@ -804,25 +820,28 @@ impl<B: ChainApi> ValidatedPool<B> {
             .into_iter()
             .map(|tx| {
                 let removed_tx_hash = tx.hash;
-                let mut listener = self.listener.write();
-                listener_action(&mut *listener, removed_tx_hash);
+                let mut event_dispatcher = self.event_dispatcher.write();
+                event_dispatcher_action(&mut *event_dispatcher, removed_tx_hash);
                 tx.clone()
             })
             .collect::<Vec<_>>()
     }
 }
 
-fn fire_events<B, Ex>(listener: &mut Listener<B>, imported: &base::Imported<ExtrinsicHash<B>, Ex>)
-where
+fn fire_events<B, L, Ex>(
+    event_dispatcher: &mut EventDispatcher<B, L>,
+    imported: &base::Imported<ExtrinsicHash<B>, Ex>,
+) where
     B: ChainApi,
+    L: EventHandler<B>,
 {
     match *imported {
         base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
-            listener.ready(hash, None);
-            failed.iter().for_each(|f| listener.invalid(f));
-            removed.iter().for_each(|r| listener.usurped(&r.hash, hash));
-            promoted.iter().for_each(|p| listener.ready(p, None));
+            event_dispatcher.ready(hash, None);
+            failed.iter().for_each(|f| event_dispatcher.invalid(f));
+            removed.iter().for_each(|r| event_dispatcher.usurped(&r.hash, hash));
+            promoted.iter().for_each(|p| event_dispatcher.ready(p, None));
         },
-        base::Imported::Future { ref hash } => listener.future(hash),
+        base::Imported::Future { ref hash } => event_dispatcher.future(hash),
     }
 }
diff --git a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs
index ffcade085916041d4a936a53b0f6ee0298faed75..413d97b11b0e1b65f5176621d8b6bc6069a59f74 100644
--- a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs
+++ b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs
@@ -18,7 +18,8 @@
 
 //! Pool periodic revalidation.
-use crate::graph::{BlockHash, ChainApi, ExtrinsicHash, Pool, ValidatedTransaction}; +use crate::graph::{BlockHash, ChainApi, ExtrinsicHash, ValidatedTransaction}; +use futures::prelude::*; use indexmap::IndexMap; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_runtime::{ @@ -28,17 +29,17 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, pin::Pin, sync::Arc, + time::Duration, }; -use futures::prelude::*; -use std::time::Duration; - const BACKGROUND_REVALIDATION_INTERVAL: Duration = Duration::from_millis(200); const MIN_BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20; const LOG_TARGET: &str = "txpool::revalidation"; +type Pool<Api> = crate::graph::Pool<Api, ()>; + /// Payload from queue to worker. struct WorkerPayload<Api: ChainApi> { at: BlockHash<Api>, diff --git a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs index 9f4d63f3ba3a9ab4354b14f25f1eb9d7d901127a..745b57d0c85bf440142b6be9165cd532c79562c4 100644 --- a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs +++ b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs @@ -29,7 +29,7 @@ use crate::{ error, log_xt::log_xt_trace, }, - graph::{self, base_pool::TimedTransactionSource, ExtrinsicHash, IsValidator}, + graph::{self, base_pool::TimedTransactionSource, EventHandler, ExtrinsicHash, IsValidator}, ReadyIteratorFor, LOG_TARGET, }; use async_trait::async_trait; @@ -64,7 +64,7 @@ where Block: BlockT, PoolApi: graph::ChainApi<Block = Block>, { - pool: Arc<graph::Pool<PoolApi>>, + pool: Arc<graph::Pool<PoolApi, ()>>, api: Arc<PoolApi>, revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>, revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>, @@ -225,7 +225,7 @@ where } /// Gets shared reference to the underlying pool. - pub fn pool(&self) -> &Arc<graph::Pool<PoolApi>> { + pub fn pool(&self) -> &Arc<graph::Pool<PoolApi, ()>> { &self.pool } @@ -583,10 +583,14 @@ impl<N: Clone + Copy + AtLeast32Bit> RevalidationStatus<N> { } /// Prune the known txs for the given block. -pub async fn prune_known_txs_for_block<Block: BlockT, Api: graph::ChainApi<Block = Block>>( +pub async fn prune_known_txs_for_block< + Block: BlockT, + Api: graph::ChainApi<Block = Block>, + L: EventHandler<Api>, +>( at: &HashAndNumber<Block>, api: &Api, - pool: &graph::Pool<Api>, + pool: &graph::Pool<Api, L>, ) -> Vec<ExtrinsicHash<Api>> { let extrinsics = api .block_body(at.hash) diff --git a/substrate/client/transaction-pool/tests/pool.rs b/substrate/client/transaction-pool/tests/pool.rs index c70f4548331454f166e8112df00e42e1652e1fb6..1403ea06df791691591348a0e16184ac49fddef1 100644 --- a/substrate/client/transaction-pool/tests/pool.rs +++ b/substrate/client/transaction-pool/tests/pool.rs @@ -45,6 +45,8 @@ use substrate_test_runtime_client::{ }; use substrate_test_runtime_transaction_pool::{uxt, TestApi}; +type Pool<Api> = sc_transaction_pool::Pool<Api, ()>; + const LOG_TARGET: &str = "txpool"; fn pool() -> (Pool<TestApi>, Arc<TestApi>) {
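A few illustrative sketches follow; they are not part of the diff. First, the new `View` constructor shape: the view now comes back together with its dropped-monitoring and aggregated streams, which `build_new_view` hands to the dropped-stream controller and the multi-view listener. The sketch below is assumed to live inside the `fork_aware_txpool` module (the `View` type and its stream aliases are module-private) and simply drains the aggregated stream.

```rust
use std::sync::Arc;

use futures::StreamExt;
use sp_blockchain::HashAndNumber;

use crate::{fork_aware_txpool::view::View, graph, LOG_TARGET};

/// Builds a fresh view and logs every event from its aggregated stream.
/// This mirrors, in a simplified way, how `build_new_view` wires the streams up.
async fn watch_view_events<C: graph::ChainApi>(api: Arc<C>, at: HashAndNumber<C::Block>) {
    // `View::new` now returns the view together with its two event streams.
    let (view, _dropped_stream, mut aggregated_stream) =
        View::new(api, at, Default::default(), Default::default(), false.into());

    // Each item is a `(ExtrinsicHash, TransactionStatus)` tuple, i.e. the
    // `TransactionStatusEvent` alias introduced in view.rs.
    while let Some((tx_hash, status)) = aggregated_stream.next().await {
        tracing::trace!(target: LOG_TARGET, ?tx_hash, ?status, at = ?view.at.hash, "view event");
    }
}
```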
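Second, the callback shape of `ValidatedPool::remove_subtree` after the `Listener` → `EventDispatcher` rename. The closure receives the pool's event dispatcher and the hash of each removed transaction; the choice of `invalid` below is purely illustrative, and the import path is simplified (the type is crate-internal).

```rust
use crate::graph::{ChainApi, EventHandler, ExtrinsicHash, ValidatedPool};

/// Removes `tx_hash` and its descendants, reporting each removed transaction
/// as `Invalid` through the pool's event dispatcher.
fn remove_reporting_invalid<B: ChainApi, L: EventHandler<B>>(
    pool: &ValidatedPool<B, L>,
    tx_hash: ExtrinsicHash<B>,
) {
    let _removed = pool.remove_subtree(&[tx_hash], |dispatcher, hash| {
        // `dispatcher` is `&mut EventDispatcher<B, L>`, `hash` the removed tx hash.
        dispatcher.invalid(&hash);
    });
}
```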
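Finally, the pattern used by the benches, tests and the single-state pool: the unit type fills the new event-handler parameter via the blanket `impl<C: ChainApi> EventHandler<C> for () {}`, so the pre-existing construction paths keep working unchanged. A hedged sketch, assuming `Options: Default`:

```rust
use std::sync::Arc;

use crate::graph::{self, ChainApi};

/// The alias the tests and the single-state pool use: a pool whose event-handler
/// slot is filled by `()`, the no-op handler.
type PlainPool<Api> = graph::Pool<Api, ()>;

/// Same construction path as before the refactor; no event handler is registered.
fn plain_pool<Api: ChainApi>(api: Arc<Api>) -> PlainPool<Api> {
    graph::Pool::new_with_staticly_sized_rotator(Default::default(), true.into(), api)
}
```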