diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index e7c7a775b04ad3b82df48089a42689dbe21ae485..55e27893a863723462d5fc56e7b5b032dd21a1e5 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -1160,7 +1160,7 @@ where for result in watched_results { match result { Err(tx_hash) => { - self.view_store.listener.invalidate_transactions(vec![tx_hash]); + self.view_store.listener.invalidate_transactions(&[tx_hash]); self.mempool.remove(tx_hash); }, Ok(_) => {}, diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs index 06c85dfc920a6dbcc8e66fdf0b4ac4f48c90859a..789931d02b8a29b72b5d3a273e375a9f44a21ed4 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs @@ -31,7 +31,7 @@ use sc_transaction_pool_api::{TransactionStatus, TransactionStatusStream, TxInde use sc_utils::mpsc; use sp_runtime::traits::Block as BlockT; use std::{ - collections::{HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet}, pin::Pin, }; use tokio_stream::StreamMap; @@ -380,13 +380,14 @@ where stream: TxStatusStream<ChainApi>, ) { let mut controllers = self.controllers.write(); - if let Some(tx) = controllers.get(&tx_hash) { - match tx.unbounded_send(ControllerCommand::AddViewStream(block_hash, stream)) { - Err(e) => { - trace!(target: LOG_TARGET, "[{:?}] add_view_watcher_for_tx: send message failed: {:?}", tx_hash, e); - controllers.remove(&tx_hash); - }, - Ok(_) => {}, + + if let Entry::Occupied(mut tx) = controllers.entry(tx_hash) { + if let Err(e) = tx + .get_mut() + .unbounded_send(ControllerCommand::AddViewStream(block_hash, stream)) + { + trace!(target: LOG_TARGET, "[{:?}] add_view_watcher_for_tx: send message failed: {:?}", tx_hash, e); + tx.remove(); } } } @@ -396,19 +397,14 @@ where /// This method sends a `RemoveViewStream` command to the controller of each transaction to /// remove the view's stream corresponding to the given block hash. pub(crate) fn remove_view(&self, block_hash: BlockHash<ChainApi>) { - let mut controllers = self.controllers.write(); - let mut invalid_controllers = Vec::new(); - for (tx_hash, sender) in controllers.iter() { - match sender.unbounded_send(ControllerCommand::RemoveViewStream(block_hash)) { - Err(e) => { + self.controllers.write().retain(|tx_hash, sender| { + sender + .unbounded_send(ControllerCommand::RemoveViewStream(block_hash)) + .map_err(|e| { log::trace!(target: LOG_TARGET, "[{:?}] remove_view: send message failed: {:?}", tx_hash, e); - invalid_controllers.push(*tx_hash); - }, - Ok(_) => {}, - } - } - invalid_controllers.into_iter().for_each(|tx_hash| { - controllers.remove(&tx_hash); + e + }) + .is_ok() }); } @@ -419,21 +415,19 @@ where /// /// The external event will be sent if no view is referencing the transaction as `Ready` or /// `Future`. - pub(crate) fn invalidate_transactions(&self, invalid_hashes: Vec<ExtrinsicHash<ChainApi>>) { + pub(crate) fn invalidate_transactions(&self, invalid_hashes: &[ExtrinsicHash<ChainApi>]) { let mut controllers = self.controllers.write(); - - for tx_hash in invalid_hashes { - if let Some(tx) = controllers.get(&tx_hash) { + invalid_hashes.iter().for_each(|tx_hash| { + if let Entry::Occupied(mut tx) = controllers.entry(*tx_hash) { trace!(target: LOG_TARGET, "[{:?}] invalidate_transaction", tx_hash); - match tx.unbounded_send(ControllerCommand::TransactionInvalidated) { - Err(e) => { - trace!(target: LOG_TARGET, "[{:?}] invalidate_transaction: send message failed: {:?}", tx_hash, e); - controllers.remove(&tx_hash); - }, - Ok(_) => {}, + if let Err(e) = + tx.get_mut().unbounded_send(ControllerCommand::TransactionInvalidated) + { + trace!(target: LOG_TARGET, "[{:?}] invalidate_transaction: send message failed: {:?}", tx_hash, e); + tx.remove(); } } - } + }); } /// Send `Broadcasted` event to listeners of all transactions. @@ -445,38 +439,29 @@ where propagated: HashMap<ExtrinsicHash<ChainApi>, Vec<String>>, ) { let mut controllers = self.controllers.write(); - - for (tx_hash, peers) in propagated { - if let Some(tx) = controllers.get(&tx_hash) { + propagated.into_iter().for_each(|(tx_hash, peers)| { + if let Entry::Occupied(mut tx) = controllers.entry(tx_hash) { trace!(target: LOG_TARGET, "[{:?}] transaction_broadcasted", tx_hash); - match tx.unbounded_send(ControllerCommand::TransactionBroadcasted(peers)) { - Err(e) => { - trace!(target: LOG_TARGET, "[{:?}] transactions_broadcasted: send message failed: {:?}", tx_hash, e); - controllers.remove(&tx_hash); - }, - Ok(_) => {}, + if let Err(e) = tx.get_mut().unbounded_send(ControllerCommand::TransactionBroadcasted(peers)) { + trace!(target: LOG_TARGET, "[{:?}] transactions_broadcasted: send message failed: {:?}", tx_hash, e); + tx.remove(); } } - } + }); } /// Send `Dropped` event to listeners of transactions. /// /// This method sends a `TransactionDropped` command to the controller of each requested /// transaction prompting and external `Broadcasted` event. - pub(crate) fn transactions_dropped(&self, dropped: &Vec<ExtrinsicHash<ChainApi>>) { - // pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) { + pub(crate) fn transactions_dropped(&self, dropped: &[ExtrinsicHash<ChainApi>]) { let mut controllers = self.controllers.write(); - debug!(target: LOG_TARGET, "mvl::transactions_dropped: {:?}", dropped); for tx_hash in dropped { if let Some(tx) = controllers.remove(&tx_hash) { debug!(target: LOG_TARGET, "[{:?}] transaction_dropped", tx_hash); - match tx.unbounded_send(ControllerCommand::TransactionDropped) { - Err(e) => { - trace!(target: LOG_TARGET, "[{:?}] transactions_dropped: send message failed: {:?}", tx_hash, e); - }, - Ok(_) => {}, + if let Err(e) = tx.unbounded_send(ControllerCommand::TransactionDropped) { + trace!(target: LOG_TARGET, "[{:?}] transactions_dropped: send message failed: {:?}", tx_hash, e); }; } } @@ -492,21 +477,17 @@ where idx: TxIndex, ) { let mut controllers = self.controllers.write(); - if let Some(tx) = controllers.remove(&tx_hash) { trace!(target: LOG_TARGET, "[{:?}] finalize_transaction", tx_hash); - let result = tx.unbounded_send(ControllerCommand::FinalizeTransaction(block, idx)); - if let Err(e) = result { + if let Err(e) = tx.unbounded_send(ControllerCommand::FinalizeTransaction(block, idx)) { trace!(target: LOG_TARGET, "[{:?}] finalize_transaction: send message failed: {:?}", tx_hash, e); - controllers.remove(&tx_hash); } }; } /// Removes stale controllers. pub(crate) fn remove_stale_controllers(&self) { - let mut controllers = self.controllers.write(); - controllers.retain(|_, c| !c.is_closed()); + self.controllers.write().retain(|_, c| !c.is_closed()); } } @@ -613,7 +594,7 @@ mod tests { listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed()); listener.add_view_watcher_for_tx(tx_hash, block_hash1, view_stream1.boxed()); - listener.invalidate_transactions(vec![tx_hash]); + listener.invalidate_transactions(&[tx_hash]); let out = handle.await.unwrap(); log::debug!("out: {:#?}", out); @@ -664,8 +645,8 @@ mod tests { listener.add_view_watcher_for_tx(tx1_hash, block_hash0, view0_tx1_stream.boxed()); listener.add_view_watcher_for_tx(tx1_hash, block_hash1, view1_tx1_stream.boxed()); - listener.invalidate_transactions(vec![tx0_hash]); - listener.invalidate_transactions(vec![tx1_hash]); + listener.invalidate_transactions(&[tx0_hash]); + listener.invalidate_transactions(&[tx1_hash]); let out_tx0 = handle0.await.unwrap(); let out_tx1 = handle1.await.unwrap(); @@ -720,7 +701,7 @@ mod tests { listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed()); listener.add_view_watcher_for_tx(tx_hash, block_hash1, view_stream1.boxed()); - listener.invalidate_transactions(vec![tx_hash]); + listener.invalidate_transactions(&[tx_hash]); let out = handle.await.unwrap(); log::debug!("out: {:#?}", out); @@ -751,7 +732,7 @@ mod tests { // Note: this generates actual Invalid event. // Invalid event from View's stream is intentionally ignored. - listener.invalidate_transactions(vec![tx_hash]); + listener.invalidate_transactions(&[tx_hash]); listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed()); diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index 63028216147fab0202c638311af7a3d5a4488c0a..88a45033a54caac905d759792bb9771e73bd110f 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -410,7 +410,7 @@ where invalid_hashes.iter().for_each(|i| { transactions.remove(i); }); - self.listener.invalidate_transactions(invalid_hashes); + self.listener.invalidate_transactions(&invalid_hashes); } }