diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index 684efdab1fbcb090f4034bca6cd17d8f43874204..404225167e576b9b8fd671f575270417c38f1249 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -197,7 +197,11 @@ where let (dropped_stream_controller, dropped_stream) = MultiViewDroppedWatcherController::<ChainApi>::new(); - let dropped_monitor_task = Self::dropped_monitor_task(dropped_stream, mempool.clone()); + let dropped_monitor_task = Self::dropped_monitor_task( + dropped_stream, + mempool.clone(), + import_notification_sink.clone(), + ); let combined_tasks = async move { tokio::select! { @@ -233,10 +237,14 @@ where /// /// This asynchronous task continuously listens for dropped transaction notifications provided /// within `dropped_stream` and ensures that these transactions are removed from the `mempool` - /// instance. + /// and `import_notification_sink` instances. async fn dropped_monitor_task( mut dropped_stream: StreamOfDropped<ChainApi>, mempool: Arc<TxMemPool<ChainApi, Block>>, + import_notification_sink: MultiViewImportNotificationSink< + Block::Hash, + ExtrinsicHash<ChainApi>, + >, ) { loop { let Some(dropped) = dropped_stream.next().await else { @@ -244,7 +252,8 @@ where break; }; log::trace!(target: LOG_TARGET, "[{:?}] fatp::dropped notification, removing", dropped); - mempool.remove_dropped_transactions(&vec![dropped]).await; + mempool.remove_dropped_transactions(&[dropped]).await; + import_notification_sink.clean_notified_items(&[dropped]); } } @@ -278,7 +287,11 @@ where let (dropped_stream_controller, dropped_stream) = MultiViewDroppedWatcherController::<ChainApi>::new(); - let dropped_monitor_task = Self::dropped_monitor_task(dropped_stream, mempool.clone()); + let dropped_monitor_task = Self::dropped_monitor_task( + dropped_stream, + mempool.clone(), + import_notification_sink.clone(), + ); let combined_tasks = async move { tokio::select! { diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs index 34fedeba64fd9d2442bb97f8b81253d6572a3766..7fbdcade63b86a9f6a78e8264b05e728f7c852e4 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs @@ -178,12 +178,15 @@ where let already_notified_items = already_notified_items.clone(); async move { if already_notified_items.write().insert(event.clone()) { - for sink in &mut *external_sinks.write() { + external_sinks.write().retain_mut(|sink| { trace!(target: LOG_TARGET, "[{:?}] import_sink_worker sending out imported", event); - let _ = sink.try_send(event.clone()).map_err(|e| { + if let Err(e) = sink.try_send(event.clone()) { trace!(target: LOG_TARGET, "import_sink_worker sending message failed: {e}"); - }); - } + false + } else { + true + } + }); } } }) @@ -215,7 +218,7 @@ where /// Removes specified items from the `already_notified_items` set. /// /// Intended to be called once transactions are finalized. - pub fn clean_notified_items(&self, items_to_be_removed: &Vec<I>) { + pub fn clean_notified_items(&self, items_to_be_removed: &[I]) { let mut already_notified_items = self.already_notified_items.write(); items_to_be_removed.iter().for_each(|i| { already_notified_items.remove(i); diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index 88a45033a54caac905d759792bb9771e73bd110f..86ea27dcf4517bf4f0f6af55f1316d15ad0216e6 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -276,7 +276,7 @@ where /// and send the `Dropped` event to the listeners of these transactions. pub(super) async fn remove_dropped_transactions( &self, - to_be_removed: &Vec<ExtrinsicHash<ChainApi>>, + to_be_removed: &[ExtrinsicHash<ChainApi>], ) { log::debug!(target: LOG_TARGET, "remove_dropped_transactions count:{:?}", to_be_removed.len()); log_xt_trace!(target: LOG_TARGET, to_be_removed, "[{:?}] mempool::remove_dropped_transactions");