From 2a6757d5bfb59203f0f743ece164eaadb46188e7 Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Mon, 14 Oct 2024 20:42:28 +0200 Subject: [PATCH] import_sink_notification: txs are removed when dropped --- .../fork_aware_txpool/fork_aware_txpool.rs | 21 +++++++++++++++---- .../import_notification_sink.rs | 13 +++++++----- .../src/fork_aware_txpool/tx_mem_pool.rs | 2 +- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index 684efdab1fb..404225167e5 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -197,7 +197,11 @@ where let (dropped_stream_controller, dropped_stream) = MultiViewDroppedWatcherController::<ChainApi>::new(); - let dropped_monitor_task = Self::dropped_monitor_task(dropped_stream, mempool.clone()); + let dropped_monitor_task = Self::dropped_monitor_task( + dropped_stream, + mempool.clone(), + import_notification_sink.clone(), + ); let combined_tasks = async move { tokio::select! { @@ -233,10 +237,14 @@ where /// /// This asynchronous task continuously listens for dropped transaction notifications provided /// within `dropped_stream` and ensures that these transactions are removed from the `mempool` - /// instance. + /// and `import_notification_sink` instances. async fn dropped_monitor_task( mut dropped_stream: StreamOfDropped<ChainApi>, mempool: Arc<TxMemPool<ChainApi, Block>>, + import_notification_sink: MultiViewImportNotificationSink< + Block::Hash, + ExtrinsicHash<ChainApi>, + >, ) { loop { let Some(dropped) = dropped_stream.next().await else { @@ -244,7 +252,8 @@ where break; }; log::trace!(target: LOG_TARGET, "[{:?}] fatp::dropped notification, removing", dropped); - mempool.remove_dropped_transactions(&vec![dropped]).await; + mempool.remove_dropped_transactions(&[dropped]).await; + import_notification_sink.clean_notified_items(&[dropped]); } } @@ -278,7 +287,11 @@ where let (dropped_stream_controller, dropped_stream) = MultiViewDroppedWatcherController::<ChainApi>::new(); - let dropped_monitor_task = Self::dropped_monitor_task(dropped_stream, mempool.clone()); + let dropped_monitor_task = Self::dropped_monitor_task( + dropped_stream, + mempool.clone(), + import_notification_sink.clone(), + ); let combined_tasks = async move { tokio::select! { diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs index 34fedeba64f..7fbdcade63b 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs @@ -178,12 +178,15 @@ where let already_notified_items = already_notified_items.clone(); async move { if already_notified_items.write().insert(event.clone()) { - for sink in &mut *external_sinks.write() { + external_sinks.write().retain_mut(|sink| { trace!(target: LOG_TARGET, "[{:?}] import_sink_worker sending out imported", event); - let _ = sink.try_send(event.clone()).map_err(|e| { + if let Err(e) = sink.try_send(event.clone()) { trace!(target: LOG_TARGET, "import_sink_worker sending message failed: {e}"); - }); - } + false + } else { + true + } + }); } } }) @@ -215,7 +218,7 @@ where /// Removes specified items from the `already_notified_items` set. /// /// Intended to be called once transactions are finalized. - pub fn clean_notified_items(&self, items_to_be_removed: &Vec<I>) { + pub fn clean_notified_items(&self, items_to_be_removed: &[I]) { let mut already_notified_items = self.already_notified_items.write(); items_to_be_removed.iter().for_each(|i| { already_notified_items.remove(i); diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index 88a45033a54..86ea27dcf45 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -276,7 +276,7 @@ where /// and send the `Dropped` event to the listeners of these transactions. pub(super) async fn remove_dropped_transactions( &self, - to_be_removed: &Vec<ExtrinsicHash<ChainApi>>, + to_be_removed: &[ExtrinsicHash<ChainApi>], ) { log::debug!(target: LOG_TARGET, "remove_dropped_transactions count:{:?}", to_be_removed.len()); log_xt_trace!(target: LOG_TARGET, to_be_removed, "[{:?}] mempool::remove_dropped_transactions"); -- GitLab