diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs index f246e2820ad94218a4b5ffefc1a79fc9c602fd05..2c711fa210e8bb8f84f5518f401f50c96fcdd9d8 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs @@ -136,7 +136,7 @@ where /// accordingly. /// /// If the event indicates that a transaction has been dropped and is no longer referenced by - /// any active views, the transaction hash is returned. Otherwise function returns `None`. + /// any active views, the transaction hash is returned. Otherwise `None` is returned. fn handle_event( &mut self, block_hash: BlockHash<C>, diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs index 8c4ae9bd4de3892624bc775cf6e874d77abbabce..06c85dfc920a6dbcc8e66fdf0b4ac4f48c90859a 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs @@ -143,8 +143,6 @@ struct ExternalWatcherContext<ChainApi: graph::ChainApi> { /// A flag indicating if a `Ready` status has been encountered. ready_seen: bool, - /// A hash set of block hashes where the transaction is included. - in_block: HashSet<BlockHash<ChainApi>>, /// A hash set of block hashes from views that consider the transaction valid. views_keeping_tx_valid: HashSet<BlockHash<ChainApi>>, /// The pending events to be sent. @@ -170,7 +168,6 @@ where future_seen: false, ready_seen: false, views_keeping_tx_valid: Default::default(), - in_block: Default::default(), pending_events: Default::default(), } } @@ -179,8 +176,7 @@ where /// /// Function may set the context termination flag, which will close the stream. /// - /// Returns true if the event should be sent out, and false if the status update should be - /// skipped. + /// Returns `Some` with the `event` to forward or `None`. fn handle( &mut self, status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>, @@ -210,14 +206,12 @@ where } }, TransactionStatus::Broadcast(_) => None, - TransactionStatus::InBlock((block, _)) => + TransactionStatus::InBlock((..)) => if !(self.ready_seen || self.future_seen) { self.ready_seen = true; - self.in_block.insert(block); self.pending_events.push(status); Some(TransactionStatus::Ready) } else { - self.in_block.insert(block); Some(status) }, TransactionStatus::Retracted(_) => { @@ -327,44 +321,47 @@ where } loop { tokio::select! { - biased; - Some((view_hash, status)) = next_event(&mut ctx.status_stream_map) => { - if let Some(new_status) = ctx.handle(status, view_hash) { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: {new_status:?}", ctx.tx_hash); - return Some((new_status, ctx)) - } - }, - cmd = ctx.command_receiver.next() => { - log::trace!(target: LOG_TARGET, "[{:?}] select::rx views:{:?}", ctx.tx_hash, ctx.status_stream_map.keys().collect::<Vec<_>>()); - match cmd? { - ControllerCommand::AddViewStream(h,stream) => { - ctx.add_stream(h, stream); - }, - ControllerCommand::RemoveViewStream(h) => { - ctx.remove_view(h); - }, - ControllerCommand::TransactionInvalidated => { - if ctx.handle_invalidate_transaction() { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Invalid", ctx.tx_hash); - return Some((TransactionStatus::Invalid, ctx)) - } - }, - ControllerCommand::FinalizeTransaction(block, index) => { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Finalized", ctx.tx_hash); - ctx.terminate = true; - return Some((TransactionStatus::Finalized((block, index)), ctx)) - }, - ControllerCommand::TransactionBroadcasted(peers) => { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Broadcasted", ctx.tx_hash); - return Some((TransactionStatus::Broadcast(peers), ctx)) - }, - ControllerCommand::TransactionDropped => { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Dropped", ctx.tx_hash); - ctx.terminate = true; - return Some((TransactionStatus::Dropped, ctx)) - }, - } - }, + biased; + Some((view_hash, status)) = next_event(&mut ctx.status_stream_map) => { + if let Some(new_status) = ctx.handle(status, view_hash) { + log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: {new_status:?}", ctx.tx_hash); + return Some((new_status, ctx)) + } + }, + cmd = ctx.command_receiver.next() => { + log::trace!(target: LOG_TARGET, "[{:?}] select::rx views:{:?}", + ctx.tx_hash, + ctx.status_stream_map.keys().collect::<Vec<_>>() + ); + match cmd? { + ControllerCommand::AddViewStream(h,stream) => { + ctx.add_stream(h, stream); + }, + ControllerCommand::RemoveViewStream(h) => { + ctx.remove_view(h); + }, + ControllerCommand::TransactionInvalidated => { + if ctx.handle_invalidate_transaction() { + log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Invalid", ctx.tx_hash); + return Some((TransactionStatus::Invalid, ctx)) + } + }, + ControllerCommand::FinalizeTransaction(block, index) => { + log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Finalized", ctx.tx_hash); + ctx.terminate = true; + return Some((TransactionStatus::Finalized((block, index)), ctx)) + }, + ControllerCommand::TransactionBroadcasted(peers) => { + log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Broadcasted", ctx.tx_hash); + return Some((TransactionStatus::Broadcast(peers), ctx)) + }, + ControllerCommand::TransactionDropped => { + log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Dropped", ctx.tx_hash); + ctx.terminate = true; + return Some((TransactionStatus::Dropped, ctx)) + }, + } + }, }; } }) @@ -447,7 +444,6 @@ where &self, propagated: HashMap<ExtrinsicHash<ChainApi>, Vec<String>>, ) { - // pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) { let mut controllers = self.controllers.write(); for (tx_hash, peers) in propagated { diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs index 2b3c4b24b0662d1f7d4d7e30c4f713c4884a91a9..9464ab3f5766706fb8e2d85a3219123332ba2d57 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs @@ -79,9 +79,9 @@ where }; match payload { WorkerPayload::RevalidateView(view, worker_channels) => - (*view).revalidate(worker_channels).await, + view.revalidate(worker_channels).await, WorkerPayload::RevalidateMempool(mempool, finalized_hash_and_number) => - (*mempool).revalidate(finalized_hash_and_number).await, + mempool.revalidate(finalized_hash_and_number).await, }; } } diff --git a/substrate/client/transaction-pool/src/graph/listener.rs b/substrate/client/transaction-pool/src/graph/listener.rs index 24093c2198d95b7c77b9fdc15ec892aa89e5dde2..4c97ffa0cbbd9b546d901e5ac8e16c3d46882641 100644 --- a/substrate/client/transaction-pool/src/graph/listener.rs +++ b/substrate/client/transaction-pool/src/graph/listener.rs @@ -152,8 +152,6 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H /// Transaction was pruned from the pool. pub fn pruned(&mut self, block_hash: BlockHash<C>, tx: &H) { - // trace!(target: LOG_TARGET, "[{:?}] Pruned at {:?} {:#?}", tx, block_hash, - // std::backtrace::Backtrace::force_capture()); trace!(target: LOG_TARGET, "[{:?}] Pruned at {:?}", tx, block_hash); // Get the transactions included in the given block hash. let txs = self.finality_watchers.entry(block_hash).or_insert(vec![]); diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs index 9f7b40dbceb4ceab86cc4236e4c1aaa84c6cfe2a..d7f55198a40a9ded48f8c7a94ffb481d781f8219 100644 --- a/substrate/client/transaction-pool/src/graph/validated_pool.rs +++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs @@ -252,7 +252,7 @@ impl<B: ChainApi> ValidatedPool<B> { if ready_limit.is_exceeded(status.ready, status.ready_bytes) || future_limit.is_exceeded(status.future, status.future_bytes) { - log::warn!( + log::debug!( target: LOG_TARGET, "Enforcing limits ({}/{}kB ready, {}/{}kB future", ready_limit.count,