From 82761aee7d55fa46936fc3e1ea5fbfcd96d5a69a Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Thu, 10 Oct 2024 17:24:36 +0200 Subject: [PATCH] multi-view-listner: code review comments --- .../fork_aware_txpool/fork_aware_txpool.rs | 2 +- .../fork_aware_txpool/multi_view_listener.rs | 101 +++++++----------- .../src/fork_aware_txpool/tx_mem_pool.rs | 2 +- 3 files changed, 43 insertions(+), 62 deletions(-) diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index e7c7a775b04..55e27893a86 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -1160,7 +1160,7 @@ where for result in watched_results { match result { Err(tx_hash) => { - self.view_store.listener.invalidate_transactions(vec![tx_hash]); + self.view_store.listener.invalidate_transactions(&[tx_hash]); self.mempool.remove(tx_hash); }, Ok(_) => {}, diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs index 06c85dfc920..789931d02b8 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs @@ -31,7 +31,7 @@ use sc_transaction_pool_api::{TransactionStatus, TransactionStatusStream, TxInde use sc_utils::mpsc; use sp_runtime::traits::Block as BlockT; use std::{ - collections::{HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet}, pin::Pin, }; use tokio_stream::StreamMap; @@ -380,13 +380,14 @@ where stream: TxStatusStream<ChainApi>, ) { let mut controllers = self.controllers.write(); - if let Some(tx) = controllers.get(&tx_hash) { - match tx.unbounded_send(ControllerCommand::AddViewStream(block_hash, stream)) { - Err(e) => { - trace!(target: LOG_TARGET, "[{:?}] add_view_watcher_for_tx: send message failed: {:?}", tx_hash, e); - controllers.remove(&tx_hash); - }, - Ok(_) => {}, + + if let Entry::Occupied(mut tx) = controllers.entry(tx_hash) { + if let Err(e) = tx + .get_mut() + .unbounded_send(ControllerCommand::AddViewStream(block_hash, stream)) + { + trace!(target: LOG_TARGET, "[{:?}] add_view_watcher_for_tx: send message failed: {:?}", tx_hash, e); + tx.remove(); } } } @@ -396,19 +397,14 @@ where /// This method sends a `RemoveViewStream` command to the controller of each transaction to /// remove the view's stream corresponding to the given block hash. pub(crate) fn remove_view(&self, block_hash: BlockHash<ChainApi>) { - let mut controllers = self.controllers.write(); - let mut invalid_controllers = Vec::new(); - for (tx_hash, sender) in controllers.iter() { - match sender.unbounded_send(ControllerCommand::RemoveViewStream(block_hash)) { - Err(e) => { + self.controllers.write().retain(|tx_hash, sender| { + sender + .unbounded_send(ControllerCommand::RemoveViewStream(block_hash)) + .map_err(|e| { log::trace!(target: LOG_TARGET, "[{:?}] remove_view: send message failed: {:?}", tx_hash, e); - invalid_controllers.push(*tx_hash); - }, - Ok(_) => {}, - } - } - invalid_controllers.into_iter().for_each(|tx_hash| { - controllers.remove(&tx_hash); + e + }) + .is_ok() }); } @@ -419,21 +415,19 @@ where /// /// The external event will be sent if no view is referencing the transaction as `Ready` or /// `Future`. - pub(crate) fn invalidate_transactions(&self, invalid_hashes: Vec<ExtrinsicHash<ChainApi>>) { + pub(crate) fn invalidate_transactions(&self, invalid_hashes: &[ExtrinsicHash<ChainApi>]) { let mut controllers = self.controllers.write(); - - for tx_hash in invalid_hashes { - if let Some(tx) = controllers.get(&tx_hash) { + invalid_hashes.iter().for_each(|tx_hash| { + if let Entry::Occupied(mut tx) = controllers.entry(*tx_hash) { trace!(target: LOG_TARGET, "[{:?}] invalidate_transaction", tx_hash); - match tx.unbounded_send(ControllerCommand::TransactionInvalidated) { - Err(e) => { - trace!(target: LOG_TARGET, "[{:?}] invalidate_transaction: send message failed: {:?}", tx_hash, e); - controllers.remove(&tx_hash); - }, - Ok(_) => {}, + if let Err(e) = + tx.get_mut().unbounded_send(ControllerCommand::TransactionInvalidated) + { + trace!(target: LOG_TARGET, "[{:?}] invalidate_transaction: send message failed: {:?}", tx_hash, e); + tx.remove(); } } - } + }); } /// Send `Broadcasted` event to listeners of all transactions. @@ -445,38 +439,29 @@ where propagated: HashMap<ExtrinsicHash<ChainApi>, Vec<String>>, ) { let mut controllers = self.controllers.write(); - - for (tx_hash, peers) in propagated { - if let Some(tx) = controllers.get(&tx_hash) { + propagated.into_iter().for_each(|(tx_hash, peers)| { + if let Entry::Occupied(mut tx) = controllers.entry(tx_hash) { trace!(target: LOG_TARGET, "[{:?}] transaction_broadcasted", tx_hash); - match tx.unbounded_send(ControllerCommand::TransactionBroadcasted(peers)) { - Err(e) => { - trace!(target: LOG_TARGET, "[{:?}] transactions_broadcasted: send message failed: {:?}", tx_hash, e); - controllers.remove(&tx_hash); - }, - Ok(_) => {}, + if let Err(e) = tx.get_mut().unbounded_send(ControllerCommand::TransactionBroadcasted(peers)) { + trace!(target: LOG_TARGET, "[{:?}] transactions_broadcasted: send message failed: {:?}", tx_hash, e); + tx.remove(); } } - } + }); } /// Send `Dropped` event to listeners of transactions. /// /// This method sends a `TransactionDropped` command to the controller of each requested /// transaction prompting and external `Broadcasted` event. - pub(crate) fn transactions_dropped(&self, dropped: &Vec<ExtrinsicHash<ChainApi>>) { - // pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) { + pub(crate) fn transactions_dropped(&self, dropped: &[ExtrinsicHash<ChainApi>]) { let mut controllers = self.controllers.write(); - debug!(target: LOG_TARGET, "mvl::transactions_dropped: {:?}", dropped); for tx_hash in dropped { if let Some(tx) = controllers.remove(&tx_hash) { debug!(target: LOG_TARGET, "[{:?}] transaction_dropped", tx_hash); - match tx.unbounded_send(ControllerCommand::TransactionDropped) { - Err(e) => { - trace!(target: LOG_TARGET, "[{:?}] transactions_dropped: send message failed: {:?}", tx_hash, e); - }, - Ok(_) => {}, + if let Err(e) = tx.unbounded_send(ControllerCommand::TransactionDropped) { + trace!(target: LOG_TARGET, "[{:?}] transactions_dropped: send message failed: {:?}", tx_hash, e); }; } } @@ -492,21 +477,17 @@ where idx: TxIndex, ) { let mut controllers = self.controllers.write(); - if let Some(tx) = controllers.remove(&tx_hash) { trace!(target: LOG_TARGET, "[{:?}] finalize_transaction", tx_hash); - let result = tx.unbounded_send(ControllerCommand::FinalizeTransaction(block, idx)); - if let Err(e) = result { + if let Err(e) = tx.unbounded_send(ControllerCommand::FinalizeTransaction(block, idx)) { trace!(target: LOG_TARGET, "[{:?}] finalize_transaction: send message failed: {:?}", tx_hash, e); - controllers.remove(&tx_hash); } }; } /// Removes stale controllers. pub(crate) fn remove_stale_controllers(&self) { - let mut controllers = self.controllers.write(); - controllers.retain(|_, c| !c.is_closed()); + self.controllers.write().retain(|_, c| !c.is_closed()); } } @@ -613,7 +594,7 @@ mod tests { listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed()); listener.add_view_watcher_for_tx(tx_hash, block_hash1, view_stream1.boxed()); - listener.invalidate_transactions(vec![tx_hash]); + listener.invalidate_transactions(&[tx_hash]); let out = handle.await.unwrap(); log::debug!("out: {:#?}", out); @@ -664,8 +645,8 @@ mod tests { listener.add_view_watcher_for_tx(tx1_hash, block_hash0, view0_tx1_stream.boxed()); listener.add_view_watcher_for_tx(tx1_hash, block_hash1, view1_tx1_stream.boxed()); - listener.invalidate_transactions(vec![tx0_hash]); - listener.invalidate_transactions(vec![tx1_hash]); + listener.invalidate_transactions(&[tx0_hash]); + listener.invalidate_transactions(&[tx1_hash]); let out_tx0 = handle0.await.unwrap(); let out_tx1 = handle1.await.unwrap(); @@ -720,7 +701,7 @@ mod tests { listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed()); listener.add_view_watcher_for_tx(tx_hash, block_hash1, view_stream1.boxed()); - listener.invalidate_transactions(vec![tx_hash]); + listener.invalidate_transactions(&[tx_hash]); let out = handle.await.unwrap(); log::debug!("out: {:#?}", out); @@ -751,7 +732,7 @@ mod tests { // Note: this generates actual Invalid event. // Invalid event from View's stream is intentionally ignored. - listener.invalidate_transactions(vec![tx_hash]); + listener.invalidate_transactions(&[tx_hash]); listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed()); diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index 63028216147..88a45033a54 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -410,7 +410,7 @@ where invalid_hashes.iter().for_each(|i| { transactions.remove(i); }); - self.listener.invalidate_transactions(invalid_hashes); + self.listener.invalidate_transactions(&invalid_hashes); } } -- GitLab