From bfec26253219044adaf6cdb3fff542c12460ed5a Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Wed, 5 Feb 2025 15:59:33 +0100 Subject: [PATCH] improved mempool revalidation Mempool revalidation now removes the invalid transactions from the view_store. As a result handling `transactions_invalidated` in multi-view-listener does not require handling the case when transaction is still dangling in some view. (It is guaranteed that invalid transaction was removed from the view_store and can be safely notified as Invalid to external listner). --- .../fork_aware_txpool/fork_aware_txpool.rs | 1 + .../fork_aware_txpool/multi_view_listener.rs | 36 ++-------- .../fork_aware_txpool/revalidation_worker.rs | 22 ++++--- .../src/fork_aware_txpool/tx_mem_pool.rs | 66 +++++++++++++------ 4 files changed, 66 insertions(+), 59 deletions(-) diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index 85c517e7e0d..218379f737d 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -1381,6 +1381,7 @@ where self.revalidation_queue .revalidate_mempool( self.mempool.clone(), + self.view_store.clone(), HashAndNumber { hash: finalized_hash, number: finalized_number }, ) .await; diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs index 1f7635e1a13..bf91a109bfd 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs @@ -289,10 +289,10 @@ where request: TransactionStatusUpdate<ChainApi>, ) -> Option<TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>> { match request { - TransactionStatusUpdate::TransactionInvalidated(..) => - if self.handle_invalidate_transaction() { - return Some(TransactionStatus::Invalid) - }, + TransactionStatusUpdate::TransactionInvalidated(..) => { + self.terminate = true; + return Some(TransactionStatus::Invalid) + }, TransactionStatusUpdate::TransactionFinalized(_, block, index) => { self.terminate = true; return Some(TransactionStatus::Finalized((block, index))) @@ -375,34 +375,6 @@ where } } - /// Handles transaction invalidation sent via side channel. - /// - /// Function may set the context termination flag, which will close the stream. - /// - /// Returns true if the event should be sent out, and false if the invalidation request should - /// be skipped. - fn handle_invalidate_transaction(&mut self) -> bool { - let keys = self.known_views.clone(); - trace!( - target: LOG_TARGET, - tx_hash = ?self.tx_hash, - views = ?self.known_views.iter().collect::<Vec<_>>(), - "got invalidate_transaction" - ); - if self.views_keeping_tx_valid.is_disjoint(&keys) { - self.terminate = true; - true - } else { - //todo [#5477] - // - handle corner case: this may happen when tx is invalid for mempool, but somehow - // some view still sees it as ready/future. In that case we don't send the invalid - // event, as transaction can still be included. Probably we should set some flag here - // and allow for invalid sent from the view. - // - add debug / metrics, - false - } - } - /// Adds a new aggragted transaction status stream. /// /// Inserts a new view's transaction status stream into the stream map. The view is represented diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs index 0025d3e9f2d..2f3d31d0e6f 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs @@ -28,7 +28,7 @@ use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnbound use sp_blockchain::HashAndNumber; use sp_runtime::traits::Block as BlockT; -use super::tx_mem_pool::TxMemPool; +use super::{tx_mem_pool::TxMemPool, view_store::ViewStore}; use futures::prelude::*; use tracing::{trace, warn}; @@ -45,7 +45,7 @@ where /// Communication channels with maintain thread are also provided. RevalidateView(Arc<View<Api>>, FinishRevalidationWorkerChannels<Api>), /// Request to revalidated the given instance of the [`TxMemPool`] at provided block hash. - RevalidateMempool(Arc<TxMemPool<Api, Block>>, HashAndNumber<Block>), + RevalidateMempool(Arc<TxMemPool<Api, Block>>, Arc<ViewStore<Api, Block>>, HashAndNumber<Block>), } /// The background revalidation worker. @@ -81,8 +81,11 @@ where match payload { WorkerPayload::RevalidateView(view, worker_channels) => view.revalidate(worker_channels).await, - WorkerPayload::RevalidateMempool(mempool, finalized_hash_and_number) => - mempool.revalidate(finalized_hash_and_number).await, + WorkerPayload::RevalidateMempool( + mempool, + view_store, + finalized_hash_and_number, + ) => mempool.revalidate(view_store, finalized_hash_and_number).await, }; } } @@ -164,6 +167,7 @@ where pub async fn revalidate_mempool( &self, mempool: Arc<TxMemPool<Api, Block>>, + view_store: Arc<ViewStore<Api, Block>>, finalized_hash: HashAndNumber<Block>, ) { trace!( @@ -173,9 +177,11 @@ where ); if let Some(ref to_worker) = self.background { - if let Err(error) = - to_worker.unbounded_send(WorkerPayload::RevalidateMempool(mempool, finalized_hash)) - { + if let Err(error) = to_worker.unbounded_send(WorkerPayload::RevalidateMempool( + mempool, + view_store, + finalized_hash, + )) { warn!( target: LOG_TARGET, ?error, @@ -183,7 +189,7 @@ where ); } } else { - mempool.revalidate(finalized_hash).await + mempool.revalidate(view_store, finalized_hash).await } } } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index e141016ccb2..a99dffae1c0 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -28,7 +28,7 @@ use std::{ cmp::Ordering, - collections::HashMap, + collections::{HashMap, HashSet}, sync::{ atomic::{self, AtomicU64}, Arc, @@ -56,8 +56,9 @@ use crate::{ }; use super::{ - metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener, - view_store::ViewStoreSubmitOutcome, + metrics::MetricsLink as PrometheusMetrics, + multi_view_listener::MultiViewListener, + view_store::{ViewStore, ViewStoreSubmitOutcome}, }; /// The minimum interval between single transaction revalidations. Given in blocks. @@ -483,7 +484,7 @@ where trace!( target: LOG_TARGET, ?finalized_block, - "mempool::revalidate" + "mempool::revalidate_inner" ); let start = Instant::now(); @@ -531,7 +532,7 @@ where target: LOG_TARGET, ?tx_hash, ?validation_result, - "Purging: invalid" + "mempool::revalidate_inner invalid" ); Some(tx_hash) }, @@ -545,7 +546,7 @@ where count, invalid_hashes = invalid_hashes.len(), ?duration, - "mempool::revalidate" + "mempool::revalidate_inner" ); invalid_hashes @@ -570,23 +571,50 @@ where /// Revalidates transactions in the memory pool against a given finalized block and removes /// invalid ones. - pub(super) async fn revalidate(&self, finalized_block: HashAndNumber<Block>) { - trace!( - target: LOG_TARGET, - ?finalized_block, - "purge_transactions" - ); - let invalid_hashes = self.revalidate_inner(finalized_block.clone()).await; + pub(super) async fn revalidate( + &self, + view_store: Arc<ViewStore<ChainApi, Block>>, + finalized_block: HashAndNumber<Block>, + ) { + let revalidated_invalid_hashes = self.revalidate_inner(finalized_block.clone()).await; + + let mut invalid_hashes_subtrees = + revalidated_invalid_hashes.clone().into_iter().collect::<HashSet<_>>(); + for tx in &revalidated_invalid_hashes { + invalid_hashes_subtrees.extend( + view_store + .remove_transaction_subtree(*tx, |_, _| {}) + .into_iter() + .map(|tx| tx.hash), + ); + } + + { + let mut transactions = self.transactions.write(); + invalid_hashes_subtrees.iter().for_each(|tx_hash| { + transactions.remove(&tx_hash); + }); + }; self.metrics.report(|metrics| { - metrics.mempool_revalidation_invalid_txs.inc_by(invalid_hashes.len() as _) + metrics + .mempool_revalidation_invalid_txs + .inc_by(invalid_hashes_subtrees.len() as _) }); - let mut transactions = self.transactions.write(); - invalid_hashes.iter().for_each(|i| { - transactions.remove(i); - }); - self.listener.transactions_invalidated(&invalid_hashes); + let revalidated_invalid_hashes_len = revalidated_invalid_hashes.len(); + let invalid_hashes_subtrees_len = invalid_hashes_subtrees.len(); + + self.listener + .transactions_invalidated(&invalid_hashes_subtrees.into_iter().collect::<Vec<_>>()); + + trace!( + target: LOG_TARGET, + ?finalized_block, + revalidated_invalid_hashes_len, + invalid_hashes_subtrees_len, + "mempool::revalidate" + ); } /// Updates the priority of transaction stored in mempool using provided view_store submission -- GitLab