From c546e08c5a4b302cf86d51425c370588e59f89ed Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Fri, 14 Feb 2025 18:30:00 +0100 Subject: [PATCH] `txpool api`: `remove_invalid` call improved (#6661) #### Description Currently the transaction which is reported as invalid by a block builder (or `removed_invalid` by other components) is silently skipped. This PR improves this behavior. The transaction pool `report_invalid` function now accepts optional error associated with every reported transaction, and also the optional block hash which provides hints how reported transaction shall be handled. The following API change is proposed: https://github.com/paritytech/polkadot-sdk/blob/8be5ef3e9a18e873de78aca1b8f834fa554ce9c8/substrate/client/transaction-pool/api/src/lib.rs#L297-L318 Depending on error, the transaction pool can decide if transaction shall be removed from the view only or entirely from the pool. Invalid event will be dispatched if required. #### Notes for reviewers - Actual logic of removing invalid txs is implented in [`ViewStore::report_invalid`](https://github.com/paritytech/polkadot-sdk/blob/0fad26c43a65bfb371d667278981d3c68c3ce9d6/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs#L657-L680). Method's doc explains the flow. - This PR changes `HashMap` to `IndexMap` in revalidation logic. This is to preserve the original order of transactions (mainly for purposes of unit tests). - This PR solves the problem mentioned in: https://github.com/paritytech/polkadot-sdk/issues/5477#issuecomment-2598809344 (which can now be resolved). The invalid transactions found during mempool revalidation are now also removed from the `view_store`. No dangling invalid transaction shall be left in the pool. (https://github.com/paritytech/polkadot-sdk/pull/6661/commits/bfec26253219044adaf6cdb3fff542c12460ed5a) - The support for dropping invalid transactions reported from the views was also added. This should never happen, but if for any case all views will report invalid transcation (which previously was valid) the transaction will be dropped from the pool (https://github.com/paritytech/polkadot-sdk/pull/6661/commits/48214a381438f9b78653b8995bb4e62df9da504a). fixes: #6008, #5477 --------- Co-authored-by: command-bot <> Co-authored-by: Sebastian Kunert <skunert49@gmail.com> --- Cargo.lock | 2 + prdoc/pr_6661.prdoc | 17 + substrate/bin/node/bench/Cargo.toml | 1 + substrate/bin/node/bench/src/construct.rs | 8 +- .../basic-authorship/src/basic_authorship.rs | 14 +- .../src/transaction/tests/middleware_pool.rs | 10 +- .../src/transaction/transaction_broadcast.rs | 2 +- substrate/client/rpc/src/author/mod.rs | 16 +- .../client/transaction-pool/api/Cargo.toml | 1 + .../client/transaction-pool/api/src/lib.rs | 28 +- .../src/fork_aware_txpool/dropped_watcher.rs | 27 +- .../fork_aware_txpool/fork_aware_txpool.rs | 94 ++- .../src/fork_aware_txpool/metrics.rs | 16 +- .../fork_aware_txpool/multi_view_listener.rs | 89 +-- .../fork_aware_txpool/revalidation_worker.rs | 22 +- .../src/fork_aware_txpool/tx_mem_pool.rs | 80 +- .../src/fork_aware_txpool/view.rs | 22 +- .../src/fork_aware_txpool/view_store.rs | 86 ++- .../transaction-pool/src/graph/listener.rs | 5 +- .../client/transaction-pool/src/graph/pool.rs | 3 +- .../src/graph/validated_pool.rs | 45 +- .../src/single_state_txpool/revalidation.rs | 14 +- .../single_state_txpool.rs | 20 +- .../src/transaction_pool_wrapper.rs | 10 +- .../client/transaction-pool/tests/fatp.rs | 365 +-------- .../transaction-pool/tests/fatp_invalid.rs | 690 ++++++++++++++++++ 26 files changed, 1117 insertions(+), 570 deletions(-) create mode 100644 prdoc/pr_6661.prdoc create mode 100644 substrate/client/transaction-pool/tests/fatp_invalid.rs diff --git a/Cargo.lock b/Cargo.lock index 2f6b374985d..cd323584c9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11358,6 +11358,7 @@ dependencies = [ "sc-transaction-pool-api", "serde", "serde_json", + "sp-blockchain", "sp-consensus", "sp-core 28.0.0", "sp-inherents 26.0.0", @@ -23865,6 +23866,7 @@ version = "28.0.0" dependencies = [ "async-trait", "futures", + "indexmap 2.7.0", "log", "parity-scale-codec", "serde", diff --git a/prdoc/pr_6661.prdoc b/prdoc/pr_6661.prdoc new file mode 100644 index 00000000000..2e39646fede --- /dev/null +++ b/prdoc/pr_6661.prdoc @@ -0,0 +1,17 @@ +title: '`txpool api`: `remove_invalid` call improved' +doc: +- audience: Node Dev + description: |- + Currently the transaction which is reported as invalid by a block builder (or `removed_invalid` by other components) is silently skipped. This PR improves this behavior. The transaction pool `report_invalid` function now accepts optional error associated with every reported transaction, and also the optional block hash which both provide hints how reported invalid transaction shall be handled. Depending on error, the transaction pool can decide if transaction shall be removed from the view only or entirely from the pool. Invalid event will be dispatched if required. + +crates: +- name: sc-transaction-pool-api + bump: minor +- name: sc-transaction-pool + bump: minor +- name: sc-rpc-spec-v2 + bump: minor +- name: sc-rpc + bump: minor +- name: sc-basic-authorship + bump: minor diff --git a/substrate/bin/node/bench/Cargo.toml b/substrate/bin/node/bench/Cargo.toml index 83f7b82cd2b..b5f94060bb6 100644 --- a/substrate/bin/node/bench/Cargo.toml +++ b/substrate/bin/node/bench/Cargo.toml @@ -36,6 +36,7 @@ sc-transaction-pool = { workspace = true, default-features = true } sc-transaction-pool-api = { workspace = true, default-features = true } serde = { workspace = true, default-features = true } serde_json = { workspace = true, default-features = true } +sp-blockchain = { workspace = true, default-features = true } sp-consensus = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } sp-inherents = { workspace = true, default-features = true } diff --git a/substrate/bin/node/bench/src/construct.rs b/substrate/bin/node/bench/src/construct.rs index 22129c6a1d6..9049732d6d3 100644 --- a/substrate/bin/node/bench/src/construct.rs +++ b/substrate/bin/node/bench/src/construct.rs @@ -31,7 +31,7 @@ use node_primitives::Block; use node_testing::bench::{BenchDb, BlockType, DatabaseType, KeyTypes}; use sc_transaction_pool_api::{ ImportNotificationStream, PoolStatus, ReadyTransactions, TransactionFor, TransactionSource, - TransactionStatusStreamFor, TxHash, + TransactionStatusStreamFor, TxHash, TxInvalidityReportMap, }; use sp_consensus::{Environment, Proposer}; use sp_inherents::InherentDataProvider; @@ -271,7 +271,11 @@ impl sc_transaction_pool_api::TransactionPool for Transactions { unimplemented!() } - fn remove_invalid(&self, _hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> { + fn report_invalid( + &self, + _at: Option<Self::Hash>, + _invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>, + ) -> Vec<Arc<Self::InPoolTransaction>> { Default::default() } diff --git a/substrate/client/basic-authorship/src/basic_authorship.rs b/substrate/client/basic-authorship/src/basic_authorship.rs index 2096af1c25b..b3519f47a15 100644 --- a/substrate/client/basic-authorship/src/basic_authorship.rs +++ b/substrate/client/basic-authorship/src/basic_authorship.rs @@ -29,7 +29,7 @@ use futures::{ use log::{debug, error, info, trace, warn}; use sc_block_builder::{BlockBuilderApi, BlockBuilderBuilder}; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO}; -use sc_transaction_pool_api::{InPoolTransaction, TransactionPool}; +use sc_transaction_pool_api::{InPoolTransaction, TransactionPool, TxInvalidityReportMap}; use sp_api::{ApiExt, CallApiAt, ProvideRuntimeApi}; use sp_blockchain::{ApplyExtrinsicFailed::Validity, Error::ApplyExtrinsicFailed, HeaderBackend}; use sp_consensus::{DisableProofRecording, EnableProofRecording, ProofRecording, Proposal}; @@ -413,7 +413,7 @@ where let soft_deadline = now + time::Duration::from_micros(self.soft_deadline_percent.mul_floor(left_micros)); let mut skipped = 0; - let mut unqueue_invalid = Vec::new(); + let mut unqueue_invalid = TxInvalidityReportMap::new(); let delay = deadline.saturating_duration_since((self.now)()) / 8; let mut pending_iterator = @@ -512,7 +512,13 @@ where target: LOG_TARGET, "[{:?}] Invalid transaction: {} at: {}", pending_tx_hash, e, self.parent_hash ); - unqueue_invalid.push(pending_tx_hash); + + let error_to_report = match e { + ApplyExtrinsicFailed(Validity(e)) => Some(e), + _ => None, + }; + + unqueue_invalid.insert(pending_tx_hash, error_to_report); }, } }; @@ -524,7 +530,7 @@ where ); } - self.transaction_pool.remove_invalid(&unqueue_invalid); + self.transaction_pool.report_invalid(Some(self.parent_hash), unqueue_invalid); Ok(end_reason) } diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs index a543969a89b..b06201564c2 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs @@ -21,7 +21,7 @@ use codec::Encode; use sc_transaction_pool::BasicPool; use sc_transaction_pool_api::{ ImportNotificationStream, PoolStatus, ReadyTransactions, TransactionFor, TransactionPool, - TransactionSource, TransactionStatusStreamFor, TxHash, + TransactionSource, TransactionStatusStreamFor, TxHash, TxInvalidityReportMap, }; use crate::hex_string; @@ -137,8 +137,12 @@ impl TransactionPool for MiddlewarePool { Ok(watcher.boxed()) } - fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> { - self.inner_pool.remove_invalid(hashes) + fn report_invalid( + &self, + at: Option<<Self::Block as BlockT>::Hash>, + invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>, + ) -> Vec<Arc<Self::InPoolTransaction>> { + self.inner_pool.report_invalid(at, invalid_tx_errors) } fn status(&self) -> PoolStatus { diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs index 2fd4ce24545..66b0a06bfe3 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -228,7 +228,7 @@ where } // Best effort pool removal (tx can already be finalized). - pool.remove_invalid(&[broadcast_state.tx_hash]); + pool.report_invalid(None, [(broadcast_state.tx_hash, None)].into()); }); // Keep track of this entry and the abortable handle. diff --git a/substrate/client/rpc/src/author/mod.rs b/substrate/client/rpc/src/author/mod.rs index 6afc871e565..0c99da106ba 100644 --- a/substrate/client/rpc/src/author/mod.rs +++ b/substrate/client/rpc/src/author/mod.rs @@ -21,19 +21,17 @@ #[cfg(test)] mod tests; -use std::sync::Arc; - +use self::error::{Error, Result}; use crate::{ utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription}, SubscriptionTaskExecutor, }; - use codec::{Decode, Encode}; use jsonrpsee::{core::async_trait, types::ErrorObject, Extensions, PendingSubscriptionSink}; use sc_rpc_api::check_if_safe; use sc_transaction_pool_api::{ error::IntoPoolError, BlockHash, InPoolTransaction, TransactionFor, TransactionPool, - TransactionSource, TxHash, + TransactionSource, TxHash, TxInvalidityReportMap, }; use sp_api::{ApiExt, ProvideRuntimeApi}; use sp_blockchain::HeaderBackend; @@ -41,8 +39,8 @@ use sp_core::Bytes; use sp_keystore::{KeystoreExt, KeystorePtr}; use sp_runtime::traits::Block as BlockT; use sp_session::SessionKeys; +use std::sync::Arc; -use self::error::{Error, Result}; /// Re-export the API for backward compatibility. pub use sc_rpc_api::author::*; @@ -164,17 +162,17 @@ where let hashes = bytes_or_hash .into_iter() .map(|x| match x { - hash::ExtrinsicOrHash::Hash(h) => Ok(h), + hash::ExtrinsicOrHash::Hash(h) => Ok((h, None)), hash::ExtrinsicOrHash::Extrinsic(bytes) => { let xt = Decode::decode(&mut &bytes[..])?; - Ok(self.pool.hash_of(&xt)) + Ok((self.pool.hash_of(&xt), None)) }, }) - .collect::<Result<Vec<_>>>()?; + .collect::<Result<TxInvalidityReportMap<TxHash<P>>>>()?; Ok(self .pool - .remove_invalid(&hashes) + .report_invalid(None, hashes) .into_iter() .map(|tx| tx.hash().clone()) .collect()) diff --git a/substrate/client/transaction-pool/api/Cargo.toml b/substrate/client/transaction-pool/api/Cargo.toml index 6671492a4e9..d3ea499beec 100644 --- a/substrate/client/transaction-pool/api/Cargo.toml +++ b/substrate/client/transaction-pool/api/Cargo.toml @@ -15,6 +15,7 @@ workspace = true async-trait = { workspace = true } codec = { workspace = true, default-features = true } futures = { workspace = true } +indexmap = { workspace = true } log = { workspace = true, default-features = true } serde = { features = ["derive"], workspace = true, default-features = true } sp-blockchain = { workspace = true, default-features = true } diff --git a/substrate/client/transaction-pool/api/src/lib.rs b/substrate/client/transaction-pool/api/src/lib.rs index 6f771e9479b..2bbcc6035f4 100644 --- a/substrate/client/transaction-pool/api/src/lib.rs +++ b/substrate/client/transaction-pool/api/src/lib.rs @@ -33,6 +33,7 @@ const LOG_TARGET: &str = "txpool::api"; pub use sp_runtime::transaction_validity::{ TransactionLongevity, TransactionPriority, TransactionSource, TransactionTag, + TransactionValidityError, }; /// Transaction pool status. @@ -207,6 +208,9 @@ pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, Bloc pub type LocalTransactionFor<P> = <<P as LocalTransactionPool>::Block as BlockT>::Extrinsic; /// Transaction's index within the block in which it was included. pub type TxIndex = usize; +/// Map containing validity errors associated with transaction hashes. Used to report invalid +/// transactions to the pool. +pub type TxInvalidityReportMap<H> = indexmap::IndexMap<H, Option<TransactionValidityError>>; /// In-pool transaction interface. /// @@ -290,8 +294,28 @@ pub trait TransactionPool: Send + Sync { fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>; // *** Block production - /// Remove transactions identified by given hashes (and dependent transactions) from the pool. - fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>>; + /// Reports invalid transactions to the transaction pool. + /// + /// This function takes a map where the key is a transaction hash and the value is an + /// optional error encountered during the transaction execution, possibly within a specific + /// block. + /// + /// The transaction pool implementation decides which transactions to remove. Transactions + /// removed from the pool will be notified with `TransactionStatus::Invalid` event (if + /// `submit_and_watch` was used for submission). + /// + /// If the error associated to transaction is `None`, the transaction will be forcibly removed + /// from the pool. + /// + /// The optional `at` parameter provides additional context regarding the block where the error + /// occurred. + /// + /// Function returns the transactions actually removed from the pool. + fn report_invalid( + &self, + at: Option<<Self::Block as BlockT>::Hash>, + invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>, + ) -> Vec<Arc<Self::InPoolTransaction>>; // *** logging /// Get futures transaction list. diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs index be20a160896..e04c826a1d5 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs @@ -62,6 +62,11 @@ impl<Hash> DroppedTransaction<Hash> { pub fn new_enforced_by_limts(tx_hash: Hash) -> Self { Self { reason: DroppedReason::LimitsEnforced, tx_hash } } + + /// Creates a new instance with reason set to `DroppedReason::Invalid`. + pub fn new_invalid(tx_hash: Hash) -> Self { + Self { reason: DroppedReason::Invalid, tx_hash } + } } /// Provides reason of why transactions was dropped. @@ -71,6 +76,8 @@ pub enum DroppedReason<Hash> { Usurped(Hash), /// Transaction was dropped because of internal pool limits being enforced. LimitsEnforced, + /// Transaction was dropped because of being invalid. + Invalid, } /// Dropped-logic related event from the single view. @@ -255,7 +262,12 @@ where let (tx_hash, status) = event; match status { TransactionStatus::Future => { - self.future_transaction_views.entry(tx_hash).or_default().insert(block_hash); + // see note below: + if let Some(mut views_keeping_tx_valid) = self.transaction_views(tx_hash) { + views_keeping_tx_valid.get_mut().insert(block_hash); + } else { + self.future_transaction_views.entry(tx_hash).or_default().insert(block_hash); + } }, TransactionStatus::Ready | TransactionStatus::InBlock(..) => { // note: if future transaction was once seen as the ready we may want to treat it @@ -279,12 +291,23 @@ where return Some(DroppedTransaction::new_enforced_by_limts(tx_hash)) } } else { - debug!(target: LOG_TARGET, ?tx_hash, "dropped_watcher: removing (non-tracked) tx"); + debug!(target: LOG_TARGET, ?tx_hash, "dropped_watcher: removing (non-tracked dropped) tx"); return Some(DroppedTransaction::new_enforced_by_limts(tx_hash)) } }, TransactionStatus::Usurped(by) => return Some(DroppedTransaction::new_usurped(tx_hash, by)), + TransactionStatus::Invalid => { + if let Some(mut views_keeping_tx_valid) = self.transaction_views(tx_hash) { + views_keeping_tx_valid.get_mut().remove(&block_hash); + if views_keeping_tx_valid.get().is_empty() { + return Some(DroppedTransaction::new_invalid(tx_hash)) + } + } else { + debug!(target: LOG_TARGET, ?tx_hash, "dropped_watcher: removing (non-tracked invalid) tx"); + return Some(DroppedTransaction::new_invalid(tx_hash)) + } + }, _ => {}, }; None diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index 6195cf53b60..5b43d900848 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -54,7 +54,7 @@ use prometheus_endpoint::Registry as PrometheusRegistry; use sc_transaction_pool_api::{ error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionPriority, - TransactionSource, TransactionStatusStreamFor, TxHash, + TransactionSource, TransactionStatusStreamFor, TxHash, TxInvalidityReportMap, }; use sp_blockchain::{HashAndNumber, TreeRoute}; use sp_core::traits::SpawnEssentialNamed; @@ -293,14 +293,16 @@ where tx_hash = ?new_tx_hash, "error: dropped_monitor_task: no entry in mempool for new transaction" ); - } + }; + }, + DroppedReason::LimitsEnforced | DroppedReason::Invalid => { + view_store.remove_transaction_subtree(tx_hash, |_, _| {}); }, - DroppedReason::LimitsEnforced => {}, }; - mempool.remove_transaction(&tx_hash); - view_store.listener.transaction_dropped(dropped); + mempool.remove_transactions(&[tx_hash]); import_notification_sink.clean_notified_items(&[tx_hash]); + view_store.listener.transaction_dropped(dropped); } } @@ -763,26 +765,24 @@ where // // Finally, it collects the hashes of updated transactions or submission errors (either // from the mempool or view_store) into a returned vector. + const RESULTS_ASSUMPTION : &str = + "The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed."; Ok(mempool_results - .into_iter() - .map(|result| { - result - .map_err(Into::into) - .and_then(|insertion| { - submission_results - .next() - .expect("The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.") - .inspect_err(|_|{ - mempool.remove_transaction(&insertion.hash); - }) + .into_iter() + .map(|result| { + result.map_err(Into::into).and_then(|insertion| { + submission_results.next().expect(RESULTS_ASSUMPTION).inspect_err(|_| { + mempool.remove_transactions(&[insertion.hash]); }) - }) - .map(|r| r.map(|r| { + }) + .map(|r| { + r.map(|r| { mempool.update_transaction_priority(&r); r.hash() - })) - .collect::<Vec<_>>()) + }) + }) + .collect::<Vec<_>>()) } /// Submits a single transaction and returns a future resolving to the submission results. @@ -839,7 +839,7 @@ where .submit_and_watch(at, insertion.source, xt) .await .inspect_err(|_| { - self.mempool.remove_transaction(&insertion.hash); + self.mempool.remove_transactions(&[insertion.hash]); }) .map(|mut outcome| { self.mempool.update_transaction_priority(&outcome); @@ -847,18 +847,34 @@ where }) } - /// Intended to remove transactions identified by the given hashes, and any dependent - /// transactions, from the pool. In current implementation this function only outputs the error. - /// Seems that API change is needed here to make this call reasonable. - // todo [#5491]: api change? we need block hash here (assuming we need it at all - could be - // useful for verification for debugging purposes). - fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> { - if !hashes.is_empty() { - log_xt_trace!(target:LOG_TARGET, hashes, "fatp::remove_invalid"); - self.metrics - .report(|metrics| metrics.removed_invalid_txs.inc_by(hashes.len() as _)); - } - Default::default() + /// Reports invalid transactions to the transaction pool. + /// + /// This function takes an array of tuples, each consisting of a transaction hash and the + /// corresponding error that occurred during transaction execution at given block. + /// + /// The transaction pool implementation will determine which transactions should be + /// removed from the pool. Transactions that depend on invalid transactions will also + /// be removed. + fn report_invalid( + &self, + at: Option<<Self::Block as BlockT>::Hash>, + invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>, + ) -> Vec<Arc<Self::InPoolTransaction>> { + debug!(target: LOG_TARGET, len = ?invalid_tx_errors.len(), "fatp::report_invalid"); + log_xt_trace!(data: tuple, target:LOG_TARGET, invalid_tx_errors.iter(), "fatp::report_invalid {:?}"); + self.metrics + .report(|metrics| metrics.reported_invalid_txs.inc_by(invalid_tx_errors.len() as _)); + + let removed = self.view_store.report_invalid(at, invalid_tx_errors); + + let removed_hashes = removed.iter().map(|tx| tx.hash).collect::<Vec<_>>(); + self.mempool.remove_transactions(&removed_hashes); + self.import_notification_sink.clean_notified_items(&removed_hashes); + + self.metrics + .report(|metrics| metrics.removed_invalid_txs.inc_by(removed_hashes.len() as _)); + + removed } // todo [#5491]: api change? @@ -987,7 +1003,7 @@ where self.view_store .submit_local(xt) .inspect_err(|_| { - self.mempool.remove_transaction(&insertion.hash); + self.mempool.remove_transactions(&[insertion.hash]); }) .map(|outcome| { self.mempool.update_transaction_priority(&outcome); @@ -1245,7 +1261,7 @@ where for result in results { if let Err(tx_hash) = result { self.view_store.listener.transactions_invalidated(&[tx_hash]); - self.mempool.remove_transaction(&tx_hash); + self.mempool.remove_transactions(&[tx_hash]); } } } @@ -1382,6 +1398,7 @@ where self.revalidation_queue .revalidate_mempool( self.mempool.clone(), + self.view_store.clone(), HashAndNumber { hash: finalized_hash, number: finalized_number }, ) .await; @@ -1487,7 +1504,12 @@ where self.mempool.try_insert_with_replacement(xt, priority, source, watched)?; for worst_hash in &insertion_info.removed { - log::trace!(target: LOG_TARGET, "removed: {worst_hash:?} replaced by {tx_hash:?}"); + trace!( + target: LOG_TARGET, + tx_hash = ?worst_hash, + new_tx_hash = ?tx_hash, + "removed: replaced by" + ); self.view_store .listener .transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash)); diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs index c04741e1c1d..8ddc9f588cc 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs @@ -29,6 +29,8 @@ use prometheus_endpoint::{ exponential_buckets, histogram_opts, linear_buckets, register, Counter, Gauge, Histogram, PrometheusError, Registry, U64, }; +#[cfg(doc)] +use sc_transaction_pool_api::TransactionPool; use sc_transaction_pool_api::TransactionStatus; use sc_utils::mpsc; use std::{ @@ -55,6 +57,11 @@ pub struct Metrics { /// Total number of unwatched transactions in txpool. pub unwatched_txs: Gauge<U64>, /// Total number of transactions reported as invalid. + /// + /// This only includes transaction reported as invalid by the + /// [`TransactionPool::report_invalid`] method. + pub reported_invalid_txs: Counter<U64>, + /// Total number of transactions removed as invalid. pub removed_invalid_txs: Counter<U64>, /// Total number of transactions from imported blocks that are unknown to the pool. pub unknown_from_block_import_txs: Counter<U64>, @@ -255,10 +262,17 @@ impl MetricsRegistrant for Metrics { )?, registry, )?, + reported_invalid_txs: register( + Counter::new( + "substrate_sub_txpool_reported_invalid_txs_total", + "Total number of transactions reported as invalid by external entities using TxPool API.", + )?, + registry, + )?, removed_invalid_txs: register( Counter::new( "substrate_sub_txpool_removed_invalid_txs_total", - "Total number of transactions reported as invalid.", + "Total number of transactions removed as invalid.", )?, registry, )?, diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs index 959df2ffe97..62c4320e5d3 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs @@ -132,6 +132,8 @@ where TransactionStatus::Usurped(*by), TransactionStatusUpdate::TransactionDropped(_, DroppedReason::LimitsEnforced) => TransactionStatus::Dropped, + TransactionStatusUpdate::TransactionDropped(_, DroppedReason::Invalid) => + TransactionStatus::Invalid, } } } @@ -311,33 +313,9 @@ where &mut self, request: TransactionStatusUpdate<ChainApi>, ) -> Option<TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>> { - match request { - TransactionStatusUpdate::TransactionInvalidated(..) => - if self.handle_invalidate_transaction() { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Invalid", self.tx_hash); - return Some(TransactionStatus::Invalid) - }, - TransactionStatusUpdate::TransactionFinalized(_, block, index) => { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Finalized", self.tx_hash); - self.terminate = true; - return Some(TransactionStatus::Finalized((block, index))) - }, - TransactionStatusUpdate::TransactionBroadcasted(_, peers) => { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Broadcasted", self.tx_hash); - return Some(TransactionStatus::Broadcast(peers)) - }, - TransactionStatusUpdate::TransactionDropped(_, DroppedReason::LimitsEnforced) => { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Dropped", self.tx_hash); - self.terminate = true; - return Some(TransactionStatus::Dropped) - }, - TransactionStatusUpdate::TransactionDropped(_, DroppedReason::Usurped(by)) => { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Usurped({:?})", self.tx_hash, by); - self.terminate = true; - return Some(TransactionStatus::Usurped(by)) - }, - }; - None + let status = Into::<TransactionStatus<_, _>>::into(&request); + status.is_final().then(|| self.terminate = true); + return Some(status); } /// Handles various transaction status updates from individual views and manages internal states @@ -401,34 +379,6 @@ where } } - /// Handles transaction invalidation sent via side channel. - /// - /// Function may set the context termination flag, which will close the stream. - /// - /// Returns true if the event should be sent out, and false if the invalidation request should - /// be skipped. - fn handle_invalidate_transaction(&mut self) -> bool { - let keys = self.known_views.clone(); - trace!( - target: LOG_TARGET, - tx_hash = ?self.tx_hash, - views = ?self.known_views.iter().collect::<Vec<_>>(), - "got invalidate_transaction" - ); - if self.views_keeping_tx_valid.is_disjoint(&keys) { - self.terminate = true; - true - } else { - //todo [#5477] - // - handle corner case: this may happen when tx is invalid for mempool, but somehow - // some view still sees it as ready/future. In that case we don't send the invalid - // event, as transaction can still be included. Probably we should set some flag here - // and allow for invalid sent from the view. - // - add debug / metrics, - false - } - } - /// Adds a new aggragted transaction status stream. /// /// Inserts a new view's transaction status stream into the stream map. The view is represented @@ -493,32 +443,31 @@ where Some((view_hash, (tx_hash, status))) = next_event(&mut aggregated_streams_map) => { events_metrics_collector.report_status(tx_hash, status.clone()); if let Entry::Occupied(mut ctrl) = external_watchers_tx_hash_map.write().entry(tx_hash) { - log::trace!( + trace!( target: LOG_TARGET, - "[{:?}] aggregated_stream_map event: view:{} status:{:?}", - tx_hash, - view_hash, - status + ?tx_hash, + ?view_hash, + ?status, + "aggregated_stream_map event", ); - if let Err(e) = ctrl + if let Err(error) = ctrl .get_mut() .unbounded_send(ExternalWatcherCommand::ViewTransactionStatus(view_hash, status)) { - trace!(target: LOG_TARGET, "[{:?}] send status failed: {:?}", tx_hash, e); + trace!(target: LOG_TARGET, ?tx_hash, ?error, "send status failed"); ctrl.remove(); } } }, cmd = command_receiver.next() => { - log::trace!(target: LOG_TARGET, "cmd {:?}", cmd); match cmd { Some(ControllerCommand::AddViewStream(h,stream)) => { aggregated_streams_map.insert(h,stream); // //todo: aysnc and join all? external_watchers_tx_hash_map.write().retain(|tx_hash, ctrl| { ctrl.unbounded_send(ExternalWatcherCommand::AddView(h)) - .inspect_err(|e| { - trace!(target: LOG_TARGET, "[{:?}] invalidate_transaction: send message failed: {:?}", tx_hash, e); + .inspect_err(|error| { + trace!(target: LOG_TARGET, ?tx_hash, ?error, "add_view: send message failed"); }) .is_ok() }) @@ -528,8 +477,8 @@ where //todo: aysnc and join all? external_watchers_tx_hash_map.write().retain(|tx_hash, ctrl| { ctrl.unbounded_send(ExternalWatcherCommand::RemoveView(h)) - .inspect_err(|e| { - trace!(target: LOG_TARGET, "[{:?}] invalidate_transaction: send message failed: {:?}", tx_hash, e); + .inspect_err(|error| { + trace!(target: LOG_TARGET, ?tx_hash, ?error, "remove_view: send message failed"); }) .is_ok() }) @@ -539,11 +488,11 @@ where let tx_hash = request.hash(); events_metrics_collector.report_status(tx_hash, (&request).into()); if let Entry::Occupied(mut ctrl) = external_watchers_tx_hash_map.write().entry(tx_hash) { - if let Err(e) = ctrl + if let Err(error) = ctrl .get_mut() .unbounded_send(ExternalWatcherCommand::PoolTransactionStatus(request)) { - trace!(target: LOG_TARGET, "[{:?}] send message failed: {:?}", tx_hash, e); + trace!(target: LOG_TARGET, ?tx_hash, ?error, "send message failed"); ctrl.remove(); } } @@ -621,7 +570,7 @@ where Some( futures::stream::unfold(external_ctx, |mut ctx| async move { if ctx.terminate { - log::trace!(target: LOG_TARGET, "[{:?}] terminate", ctx.tx_hash); + trace!(target: LOG_TARGET, tx_hash = ?ctx.tx_hash, "terminate"); return None } loop { diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs index 0025d3e9f2d..2f3d31d0e6f 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs @@ -28,7 +28,7 @@ use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnbound use sp_blockchain::HashAndNumber; use sp_runtime::traits::Block as BlockT; -use super::tx_mem_pool::TxMemPool; +use super::{tx_mem_pool::TxMemPool, view_store::ViewStore}; use futures::prelude::*; use tracing::{trace, warn}; @@ -45,7 +45,7 @@ where /// Communication channels with maintain thread are also provided. RevalidateView(Arc<View<Api>>, FinishRevalidationWorkerChannels<Api>), /// Request to revalidated the given instance of the [`TxMemPool`] at provided block hash. - RevalidateMempool(Arc<TxMemPool<Api, Block>>, HashAndNumber<Block>), + RevalidateMempool(Arc<TxMemPool<Api, Block>>, Arc<ViewStore<Api, Block>>, HashAndNumber<Block>), } /// The background revalidation worker. @@ -81,8 +81,11 @@ where match payload { WorkerPayload::RevalidateView(view, worker_channels) => view.revalidate(worker_channels).await, - WorkerPayload::RevalidateMempool(mempool, finalized_hash_and_number) => - mempool.revalidate(finalized_hash_and_number).await, + WorkerPayload::RevalidateMempool( + mempool, + view_store, + finalized_hash_and_number, + ) => mempool.revalidate(view_store, finalized_hash_and_number).await, }; } } @@ -164,6 +167,7 @@ where pub async fn revalidate_mempool( &self, mempool: Arc<TxMemPool<Api, Block>>, + view_store: Arc<ViewStore<Api, Block>>, finalized_hash: HashAndNumber<Block>, ) { trace!( @@ -173,9 +177,11 @@ where ); if let Some(ref to_worker) = self.background { - if let Err(error) = - to_worker.unbounded_send(WorkerPayload::RevalidateMempool(mempool, finalized_hash)) - { + if let Err(error) = to_worker.unbounded_send(WorkerPayload::RevalidateMempool( + mempool, + view_store, + finalized_hash, + )) { warn!( target: LOG_TARGET, ?error, @@ -183,7 +189,7 @@ where ); } } else { - mempool.revalidate(finalized_hash).await + mempool.revalidate(view_store, finalized_hash).await } } } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index d64d80d4343..559f11da4cd 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -28,7 +28,7 @@ use std::{ cmp::Ordering, - collections::HashMap, + collections::{HashMap, HashSet}, sync::{ atomic::{self, AtomicU64}, Arc, @@ -56,8 +56,9 @@ use crate::{ }; use super::{ - metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener, - view_store::ViewStoreSubmitOutcome, + metrics::MetricsLink as PrometheusMetrics, + multi_view_listener::MultiViewListener, + view_store::{ViewStore, ViewStoreSubmitOutcome}, }; /// The minimum interval between single transaction revalidations. Given in blocks. @@ -467,13 +468,13 @@ where self.transactions.clone_map() } - /// Removes a transaction with given hash from the memory pool. - pub(super) fn remove_transaction( - &self, - tx_hash: &ExtrinsicHash<ChainApi>, - ) -> Option<Arc<TxInMemPool<ChainApi, Block>>> { - debug!(target: LOG_TARGET, ?tx_hash, "mempool::remove_transaction"); - self.transactions.write().remove(tx_hash) + /// Removes transactions with given hashes from the memory pool. + pub(super) fn remove_transactions(&self, tx_hashes: &[ExtrinsicHash<ChainApi>]) { + log_xt_trace!(target: LOG_TARGET, tx_hashes, "mempool::remove_transaction"); + let mut transactions = self.transactions.write(); + for tx_hash in tx_hashes { + transactions.remove(tx_hash); + } } /// Revalidates a batch of transactions against the provided finalized block. @@ -483,7 +484,7 @@ where trace!( target: LOG_TARGET, ?finalized_block, - "mempool::revalidate" + "mempool::revalidate_inner" ); let start = Instant::now(); @@ -531,7 +532,7 @@ where target: LOG_TARGET, ?tx_hash, ?validation_result, - "Purging: invalid" + "mempool::revalidate_inner invalid" ); Some(tx_hash) }, @@ -545,7 +546,7 @@ where count, invalid_hashes = invalid_hashes.len(), ?duration, - "mempool::revalidate" + "mempool::revalidate_inner" ); invalid_hashes @@ -570,23 +571,50 @@ where /// Revalidates transactions in the memory pool against a given finalized block and removes /// invalid ones. - pub(super) async fn revalidate(&self, finalized_block: HashAndNumber<Block>) { - trace!( - target: LOG_TARGET, - ?finalized_block, - "purge_transactions" - ); - let invalid_hashes = self.revalidate_inner(finalized_block.clone()).await; + pub(super) async fn revalidate( + &self, + view_store: Arc<ViewStore<ChainApi, Block>>, + finalized_block: HashAndNumber<Block>, + ) { + let revalidated_invalid_hashes = self.revalidate_inner(finalized_block.clone()).await; + + let mut invalid_hashes_subtrees = + revalidated_invalid_hashes.clone().into_iter().collect::<HashSet<_>>(); + for tx in &revalidated_invalid_hashes { + invalid_hashes_subtrees.extend( + view_store + .remove_transaction_subtree(*tx, |_, _| {}) + .into_iter() + .map(|tx| tx.hash), + ); + } + + { + let mut transactions = self.transactions.write(); + invalid_hashes_subtrees.iter().for_each(|tx_hash| { + transactions.remove(&tx_hash); + }); + }; self.metrics.report(|metrics| { - metrics.mempool_revalidation_invalid_txs.inc_by(invalid_hashes.len() as _) + metrics + .mempool_revalidation_invalid_txs + .inc_by(invalid_hashes_subtrees.len() as _) }); - let mut transactions = self.transactions.write(); - invalid_hashes.iter().for_each(|i| { - transactions.remove(i); - }); - self.listener.transactions_invalidated(&invalid_hashes); + let revalidated_invalid_hashes_len = revalidated_invalid_hashes.len(); + let invalid_hashes_subtrees_len = invalid_hashes_subtrees.len(); + + self.listener + .transactions_invalidated(&invalid_hashes_subtrees.into_iter().collect::<Vec<_>>()); + + trace!( + target: LOG_TARGET, + ?finalized_block, + revalidated_invalid_hashes_len, + invalid_hashes_subtrees_len, + "mempool::revalidate" + ); } /// Updates the priority of transaction stored in mempool using provided view_store submission diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs index 55544495612..4fa83ccc79b 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs @@ -28,10 +28,11 @@ use crate::{ common::tracing_log_xt::log_xt_trace, graph::{ self, base_pool::TimedTransactionSource, ExtrinsicFor, ExtrinsicHash, IsValidator, - ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor, + TransactionFor, ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor, }, LOG_TARGET, }; +use indexmap::IndexMap; use parking_lot::Mutex; use sc_transaction_pool_api::{error::Error as TxPoolError, PoolStatus}; use sp_blockchain::HashAndNumber; @@ -39,11 +40,11 @@ use sp_runtime::{ generic::BlockId, traits::Block as BlockT, transaction_validity::TransactionValidityError, SaturatedConversion, }; -use std::{collections::HashMap, sync::Arc, time::Instant}; +use std::{sync::Arc, time::Instant}; use tracing::{debug, trace}; pub(super) struct RevalidationResult<ChainApi: graph::ChainApi> { - revalidated: HashMap<ExtrinsicHash<ChainApi>, ValidatedTransactionFor<ChainApi>>, + revalidated: IndexMap<ExtrinsicHash<ChainApi>, ValidatedTransactionFor<ChainApi>>, invalid_hashes: Vec<ExtrinsicHash<ChainApi>>, } @@ -274,7 +275,7 @@ where //todo: revalidate future, remove if invalid [#5496] let mut invalid_hashes = Vec::new(); - let mut revalidated = HashMap::new(); + let mut revalidated = IndexMap::new(); let mut validation_results = vec![]; let mut batch_iter = batch.into_iter(); @@ -317,7 +318,12 @@ where duration = ?revalidation_duration, "view::revalidate" ); - log_xt_trace!(data:tuple, target:LOG_TARGET, validation_results.iter().map(|x| (x.1, &x.0)), "view::revalidate result: {:?}"); + log_xt_trace!( + data:tuple, + target:LOG_TARGET, + validation_results.iter().map(|x| (x.1, &x.0)), + "view::revalidate result: {:?}" + ); for (validation_result, tx_hash, tx) in validation_results { match validation_result { Ok(Err(TransactionValidityError::Invalid(_))) => { @@ -494,12 +500,12 @@ where /// Refer to [`crate::graph::ValidatedPool::remove_subtree`] for more details. pub fn remove_subtree<F>( &self, - tx_hash: ExtrinsicHash<ChainApi>, + hashes: &[ExtrinsicHash<ChainApi>], listener_action: F, - ) -> Vec<ExtrinsicHash<ChainApi>> + ) -> Vec<TransactionFor<ChainApi>> where F: Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>), { - self.pool.validated_pool().remove_subtree(tx_hash, listener_action) + self.pool.validated_pool().remove_subtree(hashes, listener_action) } } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs index e534decf9b1..96cf9f71068 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs @@ -33,9 +33,13 @@ use crate::{ }; use itertools::Itertools; use parking_lot::RwLock; -use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus}; +use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus, TxInvalidityReportMap}; use sp_blockchain::TreeRoute; -use sp_runtime::{generic::BlockId, traits::Block as BlockT}; +use sp_runtime::{ + generic::BlockId, + traits::Block as BlockT, + transaction_validity::{InvalidTransaction, TransactionValidityError}, +}; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, sync::Arc, @@ -257,7 +261,7 @@ where target: LOG_TARGET, ?tx_hash, %error, - "submit_local: err" + "submit_local failed" ); Err(error) }, @@ -306,7 +310,7 @@ where target: LOG_TARGET, ?tx_hash, %error, - "submit_and_watch: err" + "submit_and_watch failed" ); return Err(error); }, @@ -669,6 +673,72 @@ where ); } + /// Reports invalid transactions to the view store. + /// + /// This function accepts an array of tuples, each containing a transaction hash and an + /// optional error encountered during the transaction execution at a specific (also optional) + /// block. + /// + /// Removal operation applies to provided transactions. Their descendants can be removed from + /// the view, but will not be invalidated or banned. + /// + /// Invalid future and stale transaction will be removed only from given `at` view, and will be + /// kept in the view_store. Such transaction will not be reported in returned vector. They + /// also will not be banned from re-entering the pool (however can be rejected from re-entring + /// the view). No event will be triggered. + /// + /// For other errors, the transaction will be removed from the view_store, and it will be + /// included in the returned vector. Additionally, transactions provided as input will be banned + /// from re-entering the pool. + /// + /// If the tuple's error is None, the transaction will be forcibly removed from the view_store, + /// banned and included into the returned vector. + /// + /// For every transaction removed from the view_store (excluding descendants) an Invalid event + /// is triggered. + /// + /// Returns the list of actually removed transactions from the mempool, which were included in + /// the provided input list. + pub(crate) fn report_invalid( + &self, + at: Option<Block::Hash>, + invalid_tx_errors: TxInvalidityReportMap<ExtrinsicHash<ChainApi>>, + ) -> Vec<TransactionFor<ChainApi>> { + let mut remove_from_view = vec![]; + let mut remove_from_pool = vec![]; + + invalid_tx_errors.into_iter().for_each(|(hash, e)| match e { + Some(TransactionValidityError::Invalid( + InvalidTransaction::Future | InvalidTransaction::Stale, + )) => { + remove_from_view.push(hash); + }, + _ => { + remove_from_pool.push(hash); + }, + }); + + // transaction removed from view, won't be included into the final result, as they may still + // be in the pool. + at.map(|at| { + self.get_view_at(at, true) + .map(|(view, _)| view.remove_subtree(&remove_from_view, |_, _| {})) + }); + + let mut removed = vec![]; + for tx_hash in &remove_from_pool { + let removed_from_pool = self.remove_transaction_subtree(*tx_hash, |_, _| {}); + removed_from_pool + .iter() + .find(|tx| tx.hash == *tx_hash) + .map(|tx| removed.push(tx.clone())); + } + + self.listener.transactions_invalidated(&remove_from_pool); + + removed + } + /// Replaces an existing transaction in the view_store with a new one. /// /// Attempts to replace a transaction identified by `replaced` with a new transaction `xt`. @@ -724,7 +794,7 @@ where )); }, PreInsertAction::RemoveSubtree(ref removal) => { - view.remove_subtree(removal.xt_hash, &*removal.listener_action); + view.remove_subtree(&[removal.xt_hash], &*removal.listener_action); }, } } @@ -804,7 +874,7 @@ where &self, xt_hash: ExtrinsicHash<ChainApi>, listener_action: F, - ) -> Vec<ExtrinsicHash<ChainApi>> + ) -> Vec<TransactionFor<ChainApi>> where F: Fn(&mut crate::graph::Listener<ChainApi>, ExtrinsicHash<ChainApi>) + Clone @@ -827,8 +897,8 @@ where .iter() .chain(self.inactive_views.read().iter()) .filter(|(_, view)| view.is_imported(&xt_hash)) - .flat_map(|(_, view)| view.remove_subtree(xt_hash, &listener_action)) - .filter(|xt_hash| seen.insert(*xt_hash)) + .flat_map(|(_, view)| view.remove_subtree(&[xt_hash], &listener_action)) + .filter_map(|xt| seen.insert(xt.hash).then(|| xt.clone())) .collect(); if let Some(removal_action) = self.pending_txs_tasks.write().get_mut(&xt_hash) { diff --git a/substrate/client/transaction-pool/src/graph/listener.rs b/substrate/client/transaction-pool/src/graph/listener.rs index 0e70334ea0e..340b6d429ae 100644 --- a/substrate/client/transaction-pool/src/graph/listener.rs +++ b/substrate/client/transaction-pool/src/graph/listener.rs @@ -43,7 +43,8 @@ pub struct Listener<H: hash::Hash + Eq, C: ChainApi> { watchers: HashMap<H, watcher::Sender<H, BlockHash<C>>>, finality_watchers: LinkedHashMap<ExtrinsicHash<C>, Vec<H>>, - /// The sink used to notify dropped by enforcing limits or by being usurped transactions. + /// The sink used to notify dropped by enforcing limits or by being usurped, or invalid + /// transactions. /// /// Note: Ready and future statuses are alse communicated through this channel, enabling the /// stream consumer to track views that reference the transaction. @@ -195,6 +196,8 @@ impl<H: hash::Hash + traits::Member + Serialize + Clone, C: ChainApi> Listener<H pub fn invalid(&mut self, tx: &H) { trace!(target: LOG_TARGET, "[{:?}] Extrinsic invalid", tx); self.fire(tx, |watcher| watcher.invalid()); + + self.send_to_dropped_stream_sink(tx, TransactionStatus::Invalid); } /// Transaction was pruned from the pool. diff --git a/substrate/client/transaction-pool/src/graph/pool.rs b/substrate/client/transaction-pool/src/graph/pool.rs index 52b12e3faba..d938e9bf06e 100644 --- a/substrate/client/transaction-pool/src/graph/pool.rs +++ b/substrate/client/transaction-pool/src/graph/pool.rs @@ -29,7 +29,6 @@ use sp_runtime::{ }, }; use std::{ - collections::HashMap, sync::Arc, time::{Duration, Instant}, }; @@ -248,7 +247,7 @@ impl<B: ChainApi> Pool<B> { /// Resubmit some transaction that were validated elsewhere. pub fn resubmit( &self, - revalidated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>, + revalidated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>, ) { let now = Instant::now(); self.validated_pool.resubmit(revalidated_transactions); diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs index 9631a27ead9..174b69da761 100644 --- a/substrate/client/transaction-pool/src/graph/validated_pool.rs +++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs @@ -23,6 +23,7 @@ use std::{ use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET}; use futures::channel::mpsc::{channel, Sender}; +use indexmap::IndexMap; use parking_lot::{Mutex, RwLock}; use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions, TransactionPriority}; use sp_blockchain::HashAndNumber; @@ -411,7 +412,7 @@ impl<B: ChainApi> ValidatedPool<B> { /// Transactions that are missing from the pool are not submitted. pub fn resubmit( &self, - mut updated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>, + mut updated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>, ) { #[derive(Debug, Clone, Copy, PartialEq)] enum Status { @@ -446,7 +447,7 @@ impl<B: ChainApi> ValidatedPool<B> { let removed = pool.remove_subtree(&[hash]); for removed_tx in removed { let removed_hash = removed_tx.hash; - let updated_transaction = updated_transactions.remove(&removed_hash); + let updated_transaction = updated_transactions.shift_remove(&removed_hash); let tx_to_resubmit = if let Some(updated_tx) = updated_transaction { updated_tx } else { @@ -463,7 +464,7 @@ impl<B: ChainApi> ValidatedPool<B> { txs_to_resubmit.push((removed_hash, tx_to_resubmit)); } // make sure to remove the hash even if it's not present in the pool anymore. - updated_transactions.remove(&hash); + updated_transactions.shift_remove(&hash); } // if we're rejecting future transactions, then insertion order matters here: @@ -692,27 +693,24 @@ impl<B: ChainApi> ValidatedPool<B> { /// to prevent them from entering the pool right away. /// Note this is not the case for the dependent transactions - those may /// still be valid so we want to be able to re-import them. + /// + /// For every removed transaction an Invalid event is triggered. + /// + /// Returns the list of actually removed transactions, which may include transactions dependent + /// on provided set. pub fn remove_invalid(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<TransactionFor<B>> { // early exit in case there is no invalid transactions. if hashes.is_empty() { return vec![] } - log::trace!(target: LOG_TARGET, "Removing invalid transactions: {:?}", hashes.len()); - - // temporarily ban invalid transactions - self.rotator.ban(&Instant::now(), hashes.iter().cloned()); - - let invalid = self.pool.write().remove_subtree(hashes); + let invalid = self.remove_subtree(hashes, |listener, removed_tx_hash| { + listener.invalid(&removed_tx_hash); + }); - log::trace!(target: LOG_TARGET, "Removed invalid transactions: {:?}", invalid.len()); + log::trace!(target: LOG_TARGET, "Removed invalid transactions: {:?}/{:?}", hashes.len(), invalid.len()); log_xt_trace!(target: LOG_TARGET, invalid.iter().map(|t| t.hash), "Removed invalid transaction"); - let mut listener = self.listener.write(); - for tx in &invalid { - listener.invalid(&tx.hash); - } - invalid } @@ -781,6 +779,9 @@ impl<B: ChainApi> ValidatedPool<B> { /// This function traverses the dependency graph of transactions and removes the specified /// transaction along with all its descendant transactions from the pool. /// + /// The root transaction will be banned from re-entrering the pool. Descendant transactions may + /// be re-submitted to the pool if required. + /// /// A `listener_action` callback function is invoked for every transaction that is removed, /// providing a reference to the pool's listener and the hash of the removed transaction. This /// allows to trigger the required events. @@ -789,21 +790,23 @@ impl<B: ChainApi> ValidatedPool<B> { /// transaction specified by `tx_hash`. pub fn remove_subtree<F>( &self, - tx_hash: ExtrinsicHash<B>, + hashes: &[ExtrinsicHash<B>], listener_action: F, - ) -> Vec<ExtrinsicHash<B>> + ) -> Vec<TransactionFor<B>> where F: Fn(&mut Listener<B>, ExtrinsicHash<B>), { - self.pool - .write() - .remove_subtree(&[tx_hash]) + // temporarily ban invalid transactions + self.rotator.ban(&Instant::now(), hashes.iter().cloned()); + let removed = self.pool.write().remove_subtree(hashes); + + removed .into_iter() .map(|tx| { let removed_tx_hash = tx.hash; let mut listener = self.listener.write(); listener_action(&mut *listener, removed_tx_hash); - removed_tx_hash + tx.clone() }) .collect::<Vec<_>>() } diff --git a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs index 2a691ae35ea..ffcade08591 100644 --- a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs +++ b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs @@ -18,17 +18,17 @@ //! Pool periodic revalidation. -use std::{ - collections::{BTreeMap, HashMap, HashSet}, - pin::Pin, - sync::Arc, -}; - use crate::graph::{BlockHash, ChainApi, ExtrinsicHash, Pool, ValidatedTransaction}; +use indexmap::IndexMap; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_runtime::{ generic::BlockId, traits::SaturatedConversion, transaction_validity::TransactionValidityError, }; +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + pin::Pin, + sync::Arc, +}; use futures::prelude::*; use std::time::Duration; @@ -84,7 +84,7 @@ async fn batch_revalidate<Api: ChainApi>( }; let mut invalid_hashes = Vec::new(); - let mut revalidated = HashMap::new(); + let mut revalidated = IndexMap::new(); let validation_results = futures::future::join_all(batch.into_iter().filter_map(|ext_hash| { pool.validated_pool().ready_by_hash(&ext_hash).map(|ext| { diff --git a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs index 3598f9dbc2a..9f4d63f3ba3 100644 --- a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs +++ b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs @@ -39,13 +39,16 @@ use prometheus_endpoint::Registry as PrometheusRegistry; use sc_transaction_pool_api::{ error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor, - TxHash, + TxHash, TxInvalidityReportMap, }; use sp_blockchain::{HashAndNumber, TreeRoute}; use sp_core::traits::SpawnEssentialNamed; use sp_runtime::{ generic::BlockId, - traits::{AtLeast32Bit, Block as BlockT, Header as HeaderT, NumberFor, Zero}, + traits::{ + AtLeast32Bit, Block as BlockT, Header as HeaderT, NumberFor, SaturatedConversion, Zero, + }, + transaction_validity::TransactionValidityError, }; use std::{ collections::{HashMap, HashSet}, @@ -323,8 +326,13 @@ where .map(|mut outcome| outcome.expect_watcher().into_stream().boxed()) } - fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> { - let removed = self.pool.validated_pool().remove_invalid(hashes); + fn report_invalid( + &self, + _at: Option<<Self::Block as BlockT>::Hash>, + invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>, + ) -> Vec<Arc<Self::InPoolTransaction>> { + let hashes = invalid_tx_errors.keys().map(|h| *h).collect::<Vec<_>>(); + let removed = self.pool.validated_pool().remove_invalid(&hashes); self.metrics .report(|metrics| metrics.validations_invalid.inc_by(removed.len() as u64)); removed @@ -459,10 +467,6 @@ where at: Block::Hash, xt: sc_transaction_pool_api::LocalTransactionFor<Self>, ) -> Result<Self::Hash, Self::Error> { - use sp_runtime::{ - traits::SaturatedConversion, transaction_validity::TransactionValidityError, - }; - let validity = self .api .validate_transaction_blocking(at, TransactionSource::Local, Arc::from(xt.clone()))? diff --git a/substrate/client/transaction-pool/src/transaction_pool_wrapper.rs b/substrate/client/transaction-pool/src/transaction_pool_wrapper.rs index e373c0278d8..b86fde73fda 100644 --- a/substrate/client/transaction-pool/src/transaction_pool_wrapper.rs +++ b/substrate/client/transaction-pool/src/transaction_pool_wrapper.rs @@ -28,7 +28,7 @@ use async_trait::async_trait; use sc_transaction_pool_api::{ ChainEvent, ImportNotificationStream, LocalTransactionFor, LocalTransactionPool, MaintainedTransactionPool, PoolStatus, ReadyTransactions, TransactionFor, TransactionPool, - TransactionSource, TransactionStatusStreamFor, TxHash, + TransactionSource, TransactionStatusStreamFor, TxHash, TxInvalidityReportMap, }; use sp_runtime::traits::Block as BlockT; use std::{collections::HashMap, pin::Pin, sync::Arc}; @@ -107,8 +107,12 @@ where self.0.ready() } - fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> { - self.0.remove_invalid(hashes) + fn report_invalid( + &self, + at: Option<<Self::Block as BlockT>::Hash>, + invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>, + ) -> Vec<Arc<Self::InPoolTransaction>> { + self.0.report_invalid(at, invalid_tx_errors) } fn futures(&self) -> Vec<Self::InPoolTransaction> { diff --git a/substrate/client/transaction-pool/tests/fatp.rs b/substrate/client/transaction-pool/tests/fatp.rs index dd82c52a604..a4a932dd853 100644 --- a/substrate/client/transaction-pool/tests/fatp.rs +++ b/substrate/client/transaction-pool/tests/fatp.rs @@ -25,8 +25,8 @@ use fatp_common::{ use futures::{executor::block_on, task::Poll, FutureExt, StreamExt}; use sc_transaction_pool::ChainApi; use sc_transaction_pool_api::{ - error::{Error as TxPoolError, IntoPoolError}, - ChainEvent, MaintainedTransactionPool, TransactionPool, TransactionStatus, + error::Error as TxPoolError, ChainEvent, MaintainedTransactionPool, TransactionPool, + TransactionStatus, }; use sp_runtime::transaction_validity::InvalidTransaction; use std::{sync::Arc, time::Duration}; @@ -788,11 +788,12 @@ fn fatp_linear_progress_finalization() { let f00 = forks[0][0].hash(); let f12 = forks[1][2].hash(); let f14 = forks[1][4].hash(); + let f15 = forks[1][5].hash(); let event = new_best_block_event(&pool, None, f00); block_on(pool.maintain(event)); - let xt0 = uxt(Bob, 204); + let xt0 = uxt(Bob, 205); let submissions = vec![pool.submit_one(invalid_hash(), SOURCE, xt0.clone())]; block_on(futures::future::join_all(submissions)); @@ -803,13 +804,13 @@ fn fatp_linear_progress_finalization() { log::debug!(target:LOG_TARGET, "stats: {:#?}", pool.status_all()); - let event = ChainEvent::Finalized { hash: f14, tree_route: Arc::from(vec![]) }; - block_on(pool.maintain(event)); + block_on(pool.maintain(new_best_block_event(&pool, Some(f12), f15))); + block_on(pool.maintain(finalized_block_event(&pool, f00, f14))); log::debug!(target:LOG_TARGET, "stats: {:#?}", pool.status_all()); assert_eq!(pool.active_views_count(), 1); - assert_pool_status!(f14, &pool, 1, 0); + assert_pool_status!(f15, &pool, 1, 0); } #[test] @@ -854,112 +855,6 @@ fn fatp_fork_finalization_removes_stale_views() { assert_eq!(pool.active_views_count(), 1); } -#[test] -fn fatp_watcher_invalid_fails_on_submission() { - sp_tracing::try_init_simple(); - - let (pool, api, _) = pool(); - - let header01 = api.push_block(1, vec![], true); - - let event = new_best_block_event(&pool, None, header01.hash()); - block_on(pool.maintain(event)); - - let xt0 = uxt(Alice, 150); - api.add_invalid(&xt0); - let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())); - let xt0_watcher = xt0_watcher.map(|_| ()); - - assert_pool_status!(header01.hash(), &pool, 0, 0); - assert!(matches!( - xt0_watcher.unwrap_err().into_pool_error(), - Ok(TxPoolError::InvalidTransaction(InvalidTransaction::Stale)) - )); -} - -#[test] -fn fatp_watcher_invalid_single_revalidation() { - sp_tracing::try_init_simple(); - - let (pool, api, _) = pool(); - - let header01 = api.push_block(1, vec![], true); - let event = new_best_block_event(&pool, Some(api.genesis_hash()), header01.hash()); - block_on(pool.maintain(event)); - - let xt0 = uxt(Alice, 200); - let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); - - api.add_invalid(&xt0); - - let header02 = api.push_block_with_parent(header01.hash(), vec![], true); - let event = finalized_block_event(&pool, header01.hash(), header02.hash()); - block_on(pool.maintain(event)); - - // wait 10 blocks for revalidation - let mut prev_header = header02; - for n in 3..=11 { - let header = api.push_block(n, vec![], true); - let event = finalized_block_event(&pool, prev_header.hash(), header.hash()); - block_on(pool.maintain(event)); - prev_header = header; - } - - let xt0_events = futures::executor::block_on_stream(xt0_watcher).collect::<Vec<_>>(); - log::debug!("xt0_events: {:#?}", xt0_events); - assert_eq!(xt0_events, vec![TransactionStatus::Ready, TransactionStatus::Invalid]); -} - -#[test] -fn fatp_watcher_invalid_single_revalidation2() { - sp_tracing::try_init_simple(); - - let (pool, api, _) = pool(); - - let xt0 = uxt(Alice, 200); - let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); - assert_eq!(pool.mempool_len(), (0, 1)); - api.add_invalid(&xt0); - - let header01 = api.push_block(1, vec![], true); - let event = new_best_block_event(&pool, None, header01.hash()); - block_on(pool.maintain(event)); - - let xt0_events = futures::executor::block_on_stream(xt0_watcher).collect::<Vec<_>>(); - log::debug!("xt0_events: {:#?}", xt0_events); - assert_eq!(xt0_events, vec![TransactionStatus::Invalid]); - assert_eq!(pool.mempool_len(), (0, 0)); -} - -#[test] -fn fatp_watcher_invalid_single_revalidation3() { - sp_tracing::try_init_simple(); - - let (pool, api, _) = pool(); - - let xt0 = uxt(Alice, 150); - let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); - assert_eq!(pool.mempool_len(), (0, 1)); - - let header01 = api.push_block(1, vec![], true); - let event = finalized_block_event(&pool, api.genesis_hash(), header01.hash()); - block_on(pool.maintain(event)); - - // wait 10 blocks for revalidation - let mut prev_header = header01; - for n in 2..=11 { - let header = api.push_block(n, vec![], true); - let event = finalized_block_event(&pool, prev_header.hash(), header.hash()); - block_on(pool.maintain(event)); - prev_header = header; - } - - let xt0_events = futures::executor::block_on_stream(xt0_watcher).collect::<Vec<_>>(); - log::debug!("xt0_events: {:#?}", xt0_events); - assert_eq!(xt0_events, vec![TransactionStatus::Invalid]); - assert_eq!(pool.mempool_len(), (0, 0)); -} - #[test] fn fatp_watcher_future() { sp_tracing::try_init_simple(); @@ -976,14 +871,12 @@ fn fatp_watcher_future() { assert_pool_status!(header01.hash(), &pool, 0, 1); - let header02 = api.push_block(2, vec![], true); - let event = ChainEvent::Finalized { - hash: header02.hash(), - tree_route: Arc::from(vec![header01.hash()]), - }; - block_on(pool.maintain(event)); + let header02 = api.push_block_with_parent(header01.hash(), vec![], true); + let header03 = api.push_block_with_parent(header02.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header03.hash()))); + block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash()))); - assert_pool_status!(header02.hash(), &pool, 0, 1); + assert_pool_status!(header03.hash(), &pool, 0, 1); let xt0_events = block_on(xt0_watcher.take(1).collect::<Vec<_>>()); assert_eq!(xt0_events, vec![TransactionStatus::Future]); @@ -1107,15 +1000,12 @@ fn fatp_watcher_future_and_finalized() { assert_pool_status!(header01.hash(), &pool, 1, 1); - let header02 = api.push_block(2, vec![xt0], true); - let event = ChainEvent::Finalized { - hash: header02.hash(), - tree_route: Arc::from(vec![header01.hash()]), - }; - // let event = new_best_block_event(&pool, Some(header01.hash()), header02.hash()); - block_on(pool.maintain(event)); + let header02 = api.push_block_with_parent(header01.hash(), vec![xt0], true); + let header03 = api.push_block_with_parent(header02.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header03.hash()))); + block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash()))); - assert_pool_status!(header02.hash(), &pool, 0, 1); + assert_pool_status!(header03.hash(), &pool, 0, 1); let xt1_status = block_on(xt1_watcher.take(1).collect::<Vec<_>>()); assert_eq!(xt1_status, vec![TransactionStatus::Future]); @@ -1835,180 +1725,6 @@ fn fatp_watcher_best_block_after_finalization_does_not_retract() { ); } -#[test] -fn fatp_watcher_invalid_many_revalidation() { - sp_tracing::try_init_simple(); - - let (pool, api, _) = pool(); - - let header01 = api.push_block(1, vec![], true); - block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); - - let xt0 = uxt(Alice, 200); - let xt1 = uxt(Alice, 201); - let xt2 = uxt(Alice, 202); - let xt3 = uxt(Alice, 203); - let xt4 = uxt(Alice, 204); - - let submissions = vec![ - pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()), - pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone()), - pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone()), - pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone()), - pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone()), - ]; - - let submissions = block_on(futures::future::join_all(submissions)); - assert_eq!(pool.status_all()[&header01.hash()].ready, 5); - - let mut watchers = submissions.into_iter().map(Result::unwrap).collect::<Vec<_>>(); - let xt4_watcher = watchers.remove(4); - let xt3_watcher = watchers.remove(3); - let xt2_watcher = watchers.remove(2); - let xt1_watcher = watchers.remove(1); - let xt0_watcher = watchers.remove(0); - - api.add_invalid(&xt3); - api.add_invalid(&xt4); - - let header02 = api.push_block(2, vec![], true); - block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash()))); - - //todo: shall revalidation check finalized (fork's tip) view? - assert_eq!(pool.status_all()[&header02.hash()].ready, 5); - - let header03 = api.push_block(3, vec![xt0.clone(), xt1.clone(), xt2.clone()], true); - block_on(pool.maintain(finalized_block_event(&pool, header02.hash(), header03.hash()))); - - // wait 10 blocks for revalidation - let mut prev_header = header03.clone(); - for n in 4..=11 { - let header = api.push_block(n, vec![], true); - let event = finalized_block_event(&pool, prev_header.hash(), header.hash()); - block_on(pool.maintain(event)); - prev_header = header; - } - - let xt0_events = futures::executor::block_on_stream(xt0_watcher).collect::<Vec<_>>(); - let xt1_events = futures::executor::block_on_stream(xt1_watcher).collect::<Vec<_>>(); - let xt2_events = futures::executor::block_on_stream(xt2_watcher).collect::<Vec<_>>(); - let xt3_events = futures::executor::block_on_stream(xt3_watcher).collect::<Vec<_>>(); - let xt4_events = futures::executor::block_on_stream(xt4_watcher).collect::<Vec<_>>(); - - log::debug!("xt0_events: {:#?}", xt0_events); - log::debug!("xt1_events: {:#?}", xt1_events); - log::debug!("xt2_events: {:#?}", xt2_events); - log::debug!("xt3_events: {:#?}", xt3_events); - log::debug!("xt4_events: {:#?}", xt4_events); - - assert_eq!( - xt0_events, - vec![ - TransactionStatus::Ready, - TransactionStatus::InBlock((header03.hash(), 0)), - TransactionStatus::Finalized((header03.hash(), 0)) - ], - ); - assert_eq!( - xt1_events, - vec![ - TransactionStatus::Ready, - TransactionStatus::InBlock((header03.hash(), 1)), - TransactionStatus::Finalized((header03.hash(), 1)) - ], - ); - assert_eq!( - xt2_events, - vec![ - TransactionStatus::Ready, - TransactionStatus::InBlock((header03.hash(), 2)), - TransactionStatus::Finalized((header03.hash(), 2)) - ], - ); - assert_eq!(xt3_events, vec![TransactionStatus::Ready, TransactionStatus::Invalid],); - assert_eq!(xt4_events, vec![TransactionStatus::Ready, TransactionStatus::Invalid],); -} - -#[test] -fn should_not_retain_invalid_hashes_from_retracted() { - sp_tracing::try_init_simple(); - - let (pool, api, _) = pool(); - let xt = uxt(Alice, 200); - - let header01 = api.push_block(1, vec![], true); - block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); - let watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt.clone())).unwrap(); - - let header02a = api.push_block_with_parent(header01.hash(), vec![xt.clone()], true); - - block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02a.hash()))); - assert_eq!(pool.status_all()[&header02a.hash()].ready, 0); - - api.add_invalid(&xt); - let header02b = api.push_block_with_parent(header01.hash(), vec![], true); - block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02b.hash()))); - - // wait 10 blocks for revalidation - let mut prev_header = header02b.clone(); - for _ in 3..=11 { - let header = api.push_block_with_parent(prev_header.hash(), vec![], true); - let event = finalized_block_event(&pool, prev_header.hash(), header.hash()); - block_on(pool.maintain(event)); - prev_header = header; - } - - assert_eq!( - futures::executor::block_on_stream(watcher).collect::<Vec<_>>(), - vec![ - TransactionStatus::Ready, - TransactionStatus::InBlock((header02a.hash(), 0)), - TransactionStatus::Invalid - ], - ); - - //todo: shall revalidation check finalized (fork's tip) view? - assert_eq!(pool.status_all()[&prev_header.hash()].ready, 0); -} - -#[test] -fn should_revalidate_during_maintenance() { - sp_tracing::try_init_simple(); - - let (pool, api, _) = pool(); - let xt1 = uxt(Alice, 200); - let xt2 = uxt(Alice, 201); - - let header01 = api.push_block(1, vec![], true); - block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); - - block_on(pool.submit_one(invalid_hash(), SOURCE, xt1.clone())).unwrap(); - let watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); - assert_eq!(pool.status_all()[&header01.hash()].ready, 2); - assert_eq!(api.validation_requests().len(), 2); - - let header02 = api.push_block(2, vec![xt1.clone()], true); - api.add_invalid(&xt2); - block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02.hash()))); - - //todo: shall revalidation check finalized (fork's tip) view? - assert_eq!(pool.status_all()[&header02.hash()].ready, 1); - - // wait 10 blocks for revalidation - let mut prev_header = header02.clone(); - for _ in 3..=11 { - let header = api.push_block_with_parent(prev_header.hash(), vec![], true); - let event = finalized_block_event(&pool, prev_header.hash(), header.hash()); - block_on(pool.maintain(event)); - prev_header = header; - } - - assert_eq!( - futures::executor::block_on_stream(watcher).collect::<Vec<_>>(), - vec![TransactionStatus::Ready, TransactionStatus::Invalid], - ); -} - #[test] fn fatp_transactions_purging_stale_on_finalization_works() { sp_tracing::try_init_simple(); @@ -2057,53 +1773,6 @@ fn fatp_transactions_purging_stale_on_finalization_works() { ); } -#[test] -fn fatp_transactions_purging_invalid_on_finalization_works() { - sp_tracing::try_init_simple(); - - let (pool, api, _) = pool(); - - let xt1 = uxt(Alice, 200); - let xt2 = uxt(Alice, 201); - let xt3 = uxt(Alice, 202); - - let header01 = api.push_block(1, vec![], true); - block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); - - let watcher1 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); - let watcher2 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); - block_on(pool.submit_one(invalid_hash(), SOURCE, xt3.clone())).unwrap(); - - assert_eq!(api.validation_requests().len(), 3); - assert_eq!(pool.status_all()[&header01.hash()].ready, 3); - assert_eq!(pool.mempool_len(), (1, 2)); - - let header02 = api.push_block(2, vec![], true); - api.add_invalid(&xt1); - api.add_invalid(&xt2); - api.add_invalid(&xt3); - block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash()))); - - // wait 10 blocks for revalidation - let mut prev_header = header02; - for n in 3..=13 { - let header = api.push_block(n, vec![], true); - let event = finalized_block_event(&pool, prev_header.hash(), header.hash()); - block_on(pool.maintain(event)); - prev_header = header; - } - - //todo: should it work at all? (it requires better revalidation: mempool keeping validated txs) - //additionally it also requires revalidation of finalized view. - // assert_eq!(pool.status_all()[&header02.hash()].ready, 0); - assert_eq!(pool.mempool_len(), (0, 0)); - - let xt1_events = futures::executor::block_on_stream(watcher1).collect::<Vec<_>>(); - let xt2_events = futures::executor::block_on_stream(watcher2).collect::<Vec<_>>(); - assert_eq!(xt1_events, vec![TransactionStatus::Ready, TransactionStatus::Invalid]); - assert_eq!(xt2_events, vec![TransactionStatus::Ready, TransactionStatus::Invalid]); -} - #[test] fn import_sink_works() { sp_tracing::try_init_simple(); diff --git a/substrate/client/transaction-pool/tests/fatp_invalid.rs b/substrate/client/transaction-pool/tests/fatp_invalid.rs new file mode 100644 index 00000000000..0076d2a7cbb --- /dev/null +++ b/substrate/client/transaction-pool/tests/fatp_invalid.rs @@ -0,0 +1,690 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Tests of invalid transactions handling for fork-aware transaction pool. + +pub mod fatp_common; + +use fatp_common::{ + finalized_block_event, invalid_hash, new_best_block_event, pool, TestPoolBuilder, LOG_TARGET, + SOURCE, +}; +use futures::{executor::block_on, FutureExt}; +use sc_transaction_pool::ChainApi; +use sc_transaction_pool_api::{ + error::{Error as TxPoolError, IntoPoolError}, + MaintainedTransactionPool, TransactionPool, TransactionStatus, +}; +use sp_runtime::transaction_validity::{InvalidTransaction, TransactionValidityError}; +use substrate_test_runtime_client::Sr25519Keyring::*; +use substrate_test_runtime_transaction_pool::uxt; + +#[test] +fn fatp_invalid_three_views_stale_gets_rejected() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = pool(); + + let header01 = api.push_block(1, vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Alice, 200); + + let header02a = api.push_block_with_parent(header01.hash(), vec![], true); + let header02b = api.push_block_with_parent(header01.hash(), vec![], true); + let header02c = api.push_block_with_parent(header01.hash(), vec![], true); + api.set_nonce(header02a.hash(), Alice.into(), 201); + api.set_nonce(header02b.hash(), Alice.into(), 201); + api.set_nonce(header02c.hash(), Alice.into(), 201); + + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02a.hash()))); + block_on(pool.maintain(new_best_block_event(&pool, Some(header02a.hash()), header02b.hash()))); + block_on(pool.maintain(new_best_block_event(&pool, Some(header02b.hash()), header02c.hash()))); + + let result0 = block_on(pool.submit_one(invalid_hash(), SOURCE, xt0.clone())); + let result1 = block_on(pool.submit_one(invalid_hash(), SOURCE, xt1.clone())); + + assert!(matches!( + result0.as_ref().unwrap_err().0, + TxPoolError::InvalidTransaction(InvalidTransaction::Stale) + )); + assert!(matches!( + result1.as_ref().unwrap_err().0, + TxPoolError::InvalidTransaction(InvalidTransaction::Stale) + )); +} + +#[test] +fn fatp_invalid_three_views_invalid_gets_rejected() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = pool(); + + let header01 = api.push_block(1, vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Alice, 200); + let header02a = api.push_block_with_parent(header01.hash(), vec![], true); + let header02b = api.push_block_with_parent(header01.hash(), vec![], true); + let header02c = api.push_block_with_parent(header01.hash(), vec![], true); + + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02a.hash()))); + block_on(pool.maintain(new_best_block_event(&pool, Some(header02a.hash()), header02b.hash()))); + block_on(pool.maintain(new_best_block_event(&pool, Some(header02b.hash()), header02c.hash()))); + + api.add_invalid(&xt0); + api.add_invalid(&xt1); + + let result0 = block_on(pool.submit_one(invalid_hash(), SOURCE, xt0.clone())); + let result1 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).map(|_| ()); + + assert!(matches!( + result0.as_ref().unwrap_err().0, + TxPoolError::InvalidTransaction(InvalidTransaction::Custom(_)) + )); + assert!(matches!( + result1.as_ref().unwrap_err().0, + TxPoolError::InvalidTransaction(InvalidTransaction::Custom(_)) + )); +} + +#[test] +fn fatp_transactions_purging_invalid_on_finalization_works() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = pool(); + + let xt1 = uxt(Alice, 200); + let xt2 = uxt(Alice, 201); + let xt3 = uxt(Alice, 202); + + let header01 = api.push_block(1, vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); + + let watcher1 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let watcher2 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + let watcher3 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); + + assert_eq!(api.validation_requests().len(), 3); + assert_eq!(pool.status_all()[&header01.hash()].ready, 3); + assert_eq!(pool.mempool_len(), (0, 3)); + + let header02 = api.push_block(2, vec![], true); + api.add_invalid(&xt1); + api.add_invalid(&xt2); + api.add_invalid(&xt3); + block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash()))); + + // wait 10 blocks for revalidation + let mut prev_header = header02.clone(); + for n in 3..=11 { + let header = api.push_block(n, vec![], true); + let event = finalized_block_event(&pool, prev_header.hash(), header.hash()); + block_on(pool.maintain(event)); + prev_header = header; + } + + assert_eq!(pool.mempool_len(), (0, 0)); + + assert_watcher_stream!(watcher1, [TransactionStatus::Ready, TransactionStatus::Invalid]); + assert_watcher_stream!(watcher2, [TransactionStatus::Ready, TransactionStatus::Invalid]); + assert_watcher_stream!(watcher3, [TransactionStatus::Ready, TransactionStatus::Invalid]); +} + +#[test] +fn fatp_transactions_purging_invalid_on_finalization_works2() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = pool(); + let xt1 = uxt(Alice, 200); + let xt2 = uxt(Alice, 201); + + let header01 = api.push_block(1, vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); + block_on(pool.submit_one(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + + let watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + assert_eq!(pool.status_all()[&header01.hash()].ready, 2); + assert_eq!(api.validation_requests().len(), 2); + + let header02 = api.push_block(2, vec![xt1.clone()], true); + api.add_invalid(&xt2); + block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02.hash()))); + + assert_eq!(pool.status_all()[&header02.hash()].ready, 1); + + // wait 10 blocks for revalidation + let mut prev_header = header02.clone(); + for _ in 3..=11 { + let header = api.push_block_with_parent(prev_header.hash(), vec![], true); + let event = finalized_block_event(&pool, prev_header.hash(), header.hash()); + block_on(pool.maintain(event)); + prev_header = header; + } + + assert_watcher_stream!(watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]); + assert_eq!(pool.status_all()[&prev_header.hash()].ready, 0); +} + +#[test] +fn should_not_retain_invalid_hashes_from_retracted() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = pool(); + let xt = uxt(Alice, 200); + + let header01 = api.push_block(1, vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); + let watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt.clone())).unwrap(); + + let header02a = api.push_block_with_parent(header01.hash(), vec![xt.clone()], true); + + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02a.hash()))); + assert_eq!(pool.status_all()[&header02a.hash()].ready, 0); + + api.add_invalid(&xt); + let header02b = api.push_block_with_parent(header01.hash(), vec![], true); + block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header02b.hash()))); + + // wait 10 blocks for revalidation + let mut prev_header = header02b.clone(); + for _ in 3..=11 { + let header = api.push_block_with_parent(prev_header.hash(), vec![], true); + let event = finalized_block_event(&pool, prev_header.hash(), header.hash()); + block_on(pool.maintain(event)); + prev_header = header; + } + + assert_watcher_stream!( + watcher, + [ + TransactionStatus::Ready, + TransactionStatus::InBlock((header02a.hash(), 0)), + TransactionStatus::Invalid + ] + ); + + //todo: shall revalidation check finalized (fork's tip) view? + assert_eq!(pool.status_all()[&prev_header.hash()].ready, 0); +} + +#[test] +fn fatp_watcher_invalid_many_revalidation() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = pool(); + + let header01 = api.push_block(1, vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Alice, 201); + let xt2 = uxt(Alice, 202); + let xt3 = uxt(Alice, 203); + let xt4 = uxt(Alice, 204); + + let submissions = vec![ + pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone()), + pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone()), + pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone()), + pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone()), + pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone()), + ]; + + let submissions = block_on(futures::future::join_all(submissions)); + assert_eq!(pool.status_all()[&header01.hash()].ready, 5); + + let mut watchers = submissions.into_iter().map(Result::unwrap).collect::<Vec<_>>(); + let xt4_watcher = watchers.remove(4); + let xt3_watcher = watchers.remove(3); + let xt2_watcher = watchers.remove(2); + let xt1_watcher = watchers.remove(1); + let xt0_watcher = watchers.remove(0); + + api.add_invalid(&xt3); + api.add_invalid(&xt4); + + let header02 = api.push_block(2, vec![], true); + block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash()))); + + //todo: shall revalidation check finalized (fork's tip) view? + assert_eq!(pool.status_all()[&header02.hash()].ready, 5); + + let header03 = api.push_block(3, vec![xt0.clone(), xt1.clone(), xt2.clone()], true); + block_on(pool.maintain(finalized_block_event(&pool, header02.hash(), header03.hash()))); + + // wait 10 blocks for revalidation + let mut prev_header = header03.clone(); + for n in 4..=11 { + let header = api.push_block(n, vec![], true); + let event = finalized_block_event(&pool, prev_header.hash(), header.hash()); + block_on(pool.maintain(event)); + prev_header = header; + } + + assert_watcher_stream!( + xt0_watcher, + [ + TransactionStatus::Ready, + TransactionStatus::InBlock((header03.hash(), 0)), + TransactionStatus::Finalized((header03.hash(), 0)) + ] + ); + assert_watcher_stream!( + xt1_watcher, + [ + TransactionStatus::Ready, + TransactionStatus::InBlock((header03.hash(), 1)), + TransactionStatus::Finalized((header03.hash(), 1)) + ] + ); + assert_watcher_stream!( + xt2_watcher, + [ + TransactionStatus::Ready, + TransactionStatus::InBlock((header03.hash(), 2)), + TransactionStatus::Finalized((header03.hash(), 2)) + ] + ); + assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]); + assert_watcher_stream!(xt4_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]); +} + +#[test] +fn fatp_watcher_invalid_fails_on_submission() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = pool(); + + let header01 = api.push_block(1, vec![], true); + + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + let xt0 = uxt(Alice, 150); + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())); + let xt0_watcher = xt0_watcher.map(|_| ()); + + assert_pool_status!(header01.hash(), &pool, 0, 0); + // Alice's nonce in state is 200, tx is 150. + assert!(matches!( + xt0_watcher.unwrap_err().into_pool_error(), + Ok(TxPoolError::InvalidTransaction(InvalidTransaction::Stale)) + )); +} + +#[test] +fn fatp_watcher_invalid_single_revalidation() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = pool(); + + let header01 = api.push_block(1, vec![], true); + let event = new_best_block_event(&pool, Some(api.genesis_hash()), header01.hash()); + block_on(pool.maintain(event)); + + let xt0 = uxt(Alice, 200); + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + + api.add_invalid(&xt0); + + let header02 = api.push_block_with_parent(header01.hash(), vec![], true); + let event = finalized_block_event(&pool, header01.hash(), header02.hash()); + block_on(pool.maintain(event)); + + // wait 10 blocks for revalidation + let mut prev_header = header02; + for n in 3..=11 { + let header = api.push_block(n, vec![], true); + let event = finalized_block_event(&pool, prev_header.hash(), header.hash()); + block_on(pool.maintain(event)); + prev_header = header; + } + + let xt0_events = futures::executor::block_on_stream(xt0_watcher).collect::<Vec<_>>(); + log::debug!("xt0_events: {:#?}", xt0_events); + assert_eq!(xt0_events, vec![TransactionStatus::Ready, TransactionStatus::Invalid]); +} + +#[test] +fn fatp_watcher_invalid_single_revalidation2() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = pool(); + + let xt0 = uxt(Alice, 200); + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + assert_eq!(pool.mempool_len(), (0, 1)); + api.add_invalid(&xt0); + + let header01 = api.push_block(1, vec![], true); + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + let xt0_events = futures::executor::block_on_stream(xt0_watcher).collect::<Vec<_>>(); + log::debug!("xt0_events: {:#?}", xt0_events); + assert_eq!(xt0_events, vec![TransactionStatus::Invalid]); + assert_eq!(pool.mempool_len(), (0, 0)); +} + +#[test] +fn fatp_watcher_invalid_single_revalidation3() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = pool(); + + let xt0 = uxt(Alice, 150); + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + assert_eq!(pool.mempool_len(), (0, 1)); + + let header01 = api.push_block(1, vec![], true); + let event = finalized_block_event(&pool, api.genesis_hash(), header01.hash()); + block_on(pool.maintain(event)); + + // wait 10 blocks for revalidation + let mut prev_header = header01; + for n in 2..=11 { + let header = api.push_block(n, vec![], true); + let event = finalized_block_event(&pool, prev_header.hash(), header.hash()); + block_on(pool.maintain(event)); + prev_header = header; + } + + let xt0_events = futures::executor::block_on_stream(xt0_watcher).collect::<Vec<_>>(); + log::debug!("xt0_events: {:#?}", xt0_events); + assert_eq!(xt0_events, vec![TransactionStatus::Invalid]); + assert_eq!(pool.mempool_len(), (0, 0)); +} + +#[test] +fn fatp_invalid_report_stale_or_future_works_as_expected() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = TestPoolBuilder::new().build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 300); + api.set_nonce(api.genesis_hash(), Charlie.into(), 400); + api.set_nonce(api.genesis_hash(), Dave.into(), 500); + + let header01 = api.push_block(1, vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Bob, 300); + let xt2 = uxt(Charlie, 400); + let xt3 = uxt(Dave, 500); + + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); + + assert_pool_status!(header01.hash(), &pool, 4, 0); + assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2, xt3]); + + // future/stale are ignored when at is None + let xt0_report = ( + pool.api().hash_and_length(&xt0).0, + Some(TransactionValidityError::Invalid(InvalidTransaction::Future)), + ); + let invalid_txs = [xt0_report].into(); + let result = pool.report_invalid(None, invalid_txs); + assert!(result.is_empty()); + assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2, xt3]); + + // future/stale are applied when at is provided + let xt0_report = ( + pool.api().hash_and_length(&xt0).0, + Some(TransactionValidityError::Invalid(InvalidTransaction::Future)), + ); + let xt1_report = ( + pool.api().hash_and_length(&xt1).0, + Some(TransactionValidityError::Invalid(InvalidTransaction::Stale)), + ); + let invalid_txs = [xt0_report, xt1_report].into(); + let result = pool.report_invalid(Some(header01.hash()), invalid_txs); + // stale/future does not cause tx to be removed from the pool + assert!(result.is_empty()); + // assert_eq!(result[0].hash, pool.api().hash_and_length(&xt0).0); + assert_ready_iterator!(header01.hash(), pool, [xt2, xt3]); + + // None error means force removal + // todo + + assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready]); + assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready]); + assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready]); + assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]); +} + +#[test] +fn fatp_invalid_report_future_dont_remove_from_pool() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = TestPoolBuilder::new().build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 300); + api.set_nonce(api.genesis_hash(), Charlie.into(), 400); + api.set_nonce(api.genesis_hash(), Dave.into(), 500); + api.set_nonce(api.genesis_hash(), Eve.into(), 600); + + let header01 = api.push_block(1, vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Bob, 300); + let xt2 = uxt(Charlie, 400); + let xt3 = uxt(Dave, 500); + let xt4 = uxt(Eve, 600); + + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); + let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap(); + + assert_pool_status!(header01.hash(), &pool, 5, 0); + assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2, xt3, xt4]); + + let header02 = api.push_block_with_parent(header01.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash()))); + + assert_pool_status!(header02.hash(), &pool, 5, 0); + assert_ready_iterator!(header02.hash(), pool, [xt0, xt1, xt2, xt3, xt4]); + + let xt0_report = ( + pool.api().hash_and_length(&xt0).0, + Some(TransactionValidityError::Invalid(InvalidTransaction::Stale)), + ); + let xt1_report = ( + pool.api().hash_and_length(&xt1).0, + Some(TransactionValidityError::Invalid(InvalidTransaction::Future)), + ); + let xt4_report = ( + pool.api().hash_and_length(&xt4).0, + Some(TransactionValidityError::Invalid(InvalidTransaction::BadProof)), + ); + let invalid_txs = [xt0_report, xt1_report, xt4_report].into(); + let result = pool.report_invalid(Some(header01.hash()), invalid_txs); + + assert_watcher_stream!(xt4_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]); + + // future does not cause tx to be removed from the pool + assert!(result.len() == 1); + assert!(result[0].hash == pool.api().hash_and_length(&xt4).0); + assert_ready_iterator!(header01.hash(), pool, [xt2, xt3]); + + assert_pool_status!(header02.hash(), &pool, 4, 0); + assert_ready_iterator!(header02.hash(), pool, [xt0, xt1, xt2, xt3]); + + assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready]); + assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready]); + assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready]); + assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]); +} + +#[test] +fn fatp_invalid_tx_is_removed_from_the_pool() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = TestPoolBuilder::new().build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 300); + api.set_nonce(api.genesis_hash(), Charlie.into(), 400); + api.set_nonce(api.genesis_hash(), Dave.into(), 500); + + let header01 = api.push_block(1, vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Bob, 300); + let xt2 = uxt(Charlie, 400); + let xt3 = uxt(Dave, 500); + + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); + + assert_pool_status!(header01.hash(), &pool, 4, 0); + assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2, xt3]); + + let xt0_report = ( + pool.api().hash_and_length(&xt0).0, + Some(TransactionValidityError::Invalid(InvalidTransaction::BadProof)), + ); + let xt1_report = (pool.api().hash_and_length(&xt1).0, None); + let invalid_txs = [xt0_report, xt1_report].into(); + let result = pool.report_invalid(Some(header01.hash()), invalid_txs); + assert!(result.iter().any(|tx| tx.hash == pool.api().hash_and_length(&xt0).0)); + assert_pool_status!(header01.hash(), &pool, 2, 0); + assert_ready_iterator!(header01.hash(), pool, [xt2, xt3]); + + let header02 = api.push_block_with_parent(header01.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash()))); + assert_pool_status!(header02.hash(), &pool, 2, 0); + assert_ready_iterator!(header02.hash(), pool, [xt2, xt3]); + + assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]); + assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]); + assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready]); + assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]); +} + +#[test] +fn fatp_invalid_tx_is_removed_from_the_pool_future_subtree_stays() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = TestPoolBuilder::new().build(); + + let header01 = api.push_block(1, vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Alice, 201); + let xt2 = uxt(Alice, 202); + let xt3 = uxt(Alice, 203); + + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); + + assert_pool_status!(header01.hash(), &pool, 4, 0); + assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2, xt3]); + + let xt0_report = ( + pool.api().hash_and_length(&xt0).0, + Some(TransactionValidityError::Invalid(InvalidTransaction::BadProof)), + ); + let invalid_txs = [xt0_report].into(); + let result = pool.report_invalid(Some(header01.hash()), invalid_txs); + assert_eq!(result[0].hash, pool.api().hash_and_length(&xt0).0); + assert_pool_status!(header01.hash(), &pool, 0, 0); + assert_ready_iterator!(header01.hash(), pool, []); + + let header02 = api.push_block_with_parent(header01.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash()))); + assert_pool_status!(header02.hash(), &pool, 0, 3); + assert_future_iterator!(header02.hash(), pool, [xt1, xt2, xt3]); + + assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]); + assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready]); + assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready]); + assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]); +} + +#[test] +fn fatp_invalid_tx_is_removed_from_the_pool2() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = TestPoolBuilder::new().build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 300); + api.set_nonce(api.genesis_hash(), Charlie.into(), 400); + api.set_nonce(api.genesis_hash(), Dave.into(), 500); + + let header01 = api.push_block(1, vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, None, header01.hash()))); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Bob, 300); + let xt2 = uxt(Charlie, 400); + let xt3 = uxt(Dave, 500); + + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); + + assert_pool_status!(header01.hash(), &pool, 4, 0); + assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2, xt3]); + + let header02a = api.push_block_with_parent(header01.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02a.hash()))); + assert_pool_status!(header02a.hash(), &pool, 4, 0); + assert_ready_iterator!(header02a.hash(), pool, [xt0, xt1, xt2, xt3]); + + let header02b = api.push_block_with_parent(header01.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header02a.hash()), header02b.hash()))); + + assert_pool_status!(header02b.hash(), &pool, 4, 0); + assert_ready_iterator!(header02b.hash(), pool, [xt0, xt1, xt2, xt3]); + + let xt0_report = ( + pool.api().hash_and_length(&xt0).0, + Some(TransactionValidityError::Invalid(InvalidTransaction::BadProof)), + ); + let xt1_report = (pool.api().hash_and_length(&xt1).0, None); + let invalid_txs = [xt0_report, xt1_report].into(); + let result = pool.report_invalid(Some(header01.hash()), invalid_txs); + assert!(result.iter().any(|tx| tx.hash == pool.api().hash_and_length(&xt0).0)); + assert_ready_iterator!(header01.hash(), pool, [xt2, xt3]); + assert_pool_status!(header02a.hash(), &pool, 2, 0); + assert_ready_iterator!(header02a.hash(), pool, [xt2, xt3]); + assert_pool_status!(header02b.hash(), &pool, 2, 0); + assert_ready_iterator!(header02b.hash(), pool, [xt2, xt3]); + + let header03 = api.push_block_with_parent(header02b.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header02b.hash()), header03.hash()))); + assert_pool_status!(header03.hash(), &pool, 2, 0); + assert_ready_iterator!(header03.hash(), pool, [xt2, xt3]); + + assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]); + assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]); + assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready]); + assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]); +} -- GitLab