diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 7a44b0ed7701a88666caed077cbfee01d0330a57..733f15b9cbe3e5cc561d2fdb8fe0c90a57d01f8d 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -6263,6 +6263,7 @@ dependencies = [ "serde_json", "sp-core", "sp-rpc", + "sp-runtime", "sp-transaction-pool", "sp-version", ] @@ -6424,11 +6425,13 @@ dependencies = [ "criterion 0.3.1", "derive_more", "futures 0.3.4", + "linked-hash-map", "log 0.4.8", "parity-scale-codec", "parity-util-mem", "parking_lot 0.10.0", "serde", + "sp-blockchain", "sp-core", "sp-runtime", "sp-transaction-pool", @@ -7644,6 +7647,7 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.10.0", "sc-transaction-graph", + "sp-blockchain", "sp-runtime", "sp-transaction-pool", "substrate-test-runtime-client", diff --git a/substrate/client/consensus/manual-seal/src/lib.rs b/substrate/client/consensus/manual-seal/src/lib.rs index 029f746e625e4172bf6f7e8621856602de03afc7..c820407e748c0d08c2fae96ba402897ca5682975 100644 --- a/substrate/client/consensus/manual-seal/src/lib.rs +++ b/substrate/client/consensus/manual-seal/src/lib.rs @@ -198,7 +198,7 @@ pub async fn run_instant_seal<B, CB, E, A, C, T>( { // instant-seal creates blocks as soon as transactions are imported // into the transaction pool. - let seal_block_channel = pool.import_notification_stream() + let seal_block_channel = pool.validated_pool().import_notification_stream() .map(|_| { EngineCommand::SealNewBlock { create_empty: false, @@ -260,7 +260,7 @@ mod tests { // this test checks that blocks are created as soon as transactions are imported into the pool. let (sender, receiver) = futures::channel::oneshot::channel(); let mut sender = Arc::new(Some(sender)); - let stream = pool.pool().import_notification_stream() + let stream = pool.pool().validated_pool().import_notification_stream() .map(move |_| { // we're only going to submit one tx so this fn will only be called once. let mut_sender = Arc::get_mut(&mut sender).unwrap(); diff --git a/substrate/client/consensus/manual-seal/src/seal_new_block.rs b/substrate/client/consensus/manual-seal/src/seal_new_block.rs index 53dc82d353e673a82c6159707e2e2bc3c14a712a..2b8d867ce3ca6e74f957dd97792e2f5fd6eb835d 100644 --- a/substrate/client/consensus/manual-seal/src/seal_new_block.rs +++ b/substrate/client/consensus/manual-seal/src/seal_new_block.rs @@ -92,7 +92,7 @@ pub async fn seal_new_block<B, SC, CB, E, T, P>( SC: SelectChain<B>, { let future = async { - if pool.status().ready == 0 && !create_empty { + if pool.validated_pool().status().ready == 0 && !create_empty { return Err(Error::EmptyTransactionPool) } diff --git a/substrate/client/rpc-api/Cargo.toml b/substrate/client/rpc-api/Cargo.toml index 4781c9d35096ca02e68f0e6d59b2ca963bed6e24..b9f4cbb0159ce5e059499c38829860aa7cef5599 100644 --- a/substrate/client/rpc-api/Cargo.toml +++ b/substrate/client/rpc-api/Cargo.toml @@ -17,6 +17,7 @@ log = "0.4.8" parking_lot = "0.10.0" sp-core = { version = "2.0.0", path = "../../primitives/core" } sp-version = { version = "2.0.0", path = "../../primitives/version" } +sp-runtime = { path = "../../primitives/runtime" } serde = { version = "1.0.101", features = ["derive"] } serde_json = "1.0.41" sp-transaction-pool = { version = "2.0.0", path = "../../primitives/transaction-pool" } diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index e29c503402b2e59da34e213c1817e53bb02f56b8..5ca39856dc4132a04c68b25083bd58f995bff5ef 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -51,7 +51,7 @@ use std::{ use wasm_timer::SystemTime; use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use sc_telemetry::{telemetry, SUBSTRATE_INFO}; -use sp_transaction_pool::MaintainedTransactionPool; +use sp_transaction_pool::{MaintainedTransactionPool, ChainEvent}; use sp_blockchain; use grafana_data_source::{self, record_metrics}; @@ -882,42 +882,52 @@ ServiceBuilder< let network_state_info: Arc<dyn NetworkStateInfo + Send + Sync> = network.clone(); let is_validator = config.roles.is_authority(); - let events = client.import_notification_stream() - .for_each(move |notification| { - let txpool = txpool.upgrade(); + let (import_stream, finality_stream) = ( + client.import_notification_stream().map(|n| ChainEvent::NewBlock { + id: BlockId::Hash(n.hash), + header: n.header, + retracted: n.retracted, + is_new_best: n.is_new_best, + }), + client.finality_notification_stream().map(|n| ChainEvent::Finalized { + hash: n.hash + }) + ); + let events = futures::stream::select(import_stream, finality_stream) + .for_each(move |event| { + // offchain worker is only interested in block import events + if let ChainEvent::NewBlock { ref header, is_new_best, .. } = event { + let offchain = offchain.as_ref().and_then(|o| o.upgrade()); + match offchain { + Some(offchain) if is_new_best => { + let future = offchain.on_block_imported( + &header, + network_state_info.clone(), + is_validator, + ); + let _ = to_spawn_tx_.unbounded_send(( + Box::pin(future), + From::from("offchain-on-block"), + )); + }, + Some(_) => log::debug!( + target: "sc_offchain", + "Skipping offchain workers for non-canon block: {:?}", + header, + ), + _ => {}, + } + }; + let txpool = txpool.upgrade(); if let Some(txpool) = txpool.as_ref() { - let future = txpool.maintain( - &BlockId::hash(notification.hash), - ¬ification.retracted, - ); + let future = txpool.maintain(event); let _ = to_spawn_tx_.unbounded_send(( Box::pin(future), From::from("txpool-maintain") )); } - let offchain = offchain.as_ref().and_then(|o| o.upgrade()); - match offchain { - Some(offchain) if notification.is_new_best => { - let future = offchain.on_block_imported( - ¬ification.header, - network_state_info.clone(), - is_validator, - ); - let _ = to_spawn_tx_.unbounded_send(( - Box::pin(future), - From::from("offchain-on-block"), - )); - }, - Some(_) => log::debug!( - target: "sc_offchain", - "Skipping offchain workers for non-canon block: {:?}", - notification.header, - ), - _ => {}, - } - ready(()) }); let _ = to_spawn_tx.unbounded_send(( diff --git a/substrate/client/transaction-pool/graph/Cargo.toml b/substrate/client/transaction-pool/graph/Cargo.toml index 3bbe46bc50427a31696b498c81d3ff3b0dd6bd5f..daec970a69493a5e9f49272c5a3bd3cae0bd9954 100644 --- a/substrate/client/transaction-pool/graph/Cargo.toml +++ b/substrate/client/transaction-pool/graph/Cargo.toml @@ -12,10 +12,12 @@ log = "0.4.8" parking_lot = "0.10.0" serde = { version = "1.0.101", features = ["derive"] } wasm-timer = "0.2" +sp-blockchain = { version = "2.0.0", path = "../../../primitives/blockchain" } sp-core = { version = "2.0.0", path = "../../../primitives/core" } sp-runtime = { version = "2.0.0", path = "../../../primitives/runtime" } sp-transaction-pool = { version = "2.0.0", path = "../../../primitives/transaction-pool" } parity-util-mem = { version = "0.5.1", default-features = false, features = ["primitive-types"] } +linked-hash-map = "0.5.2" [dev-dependencies] assert_matches = "1.3.0" diff --git a/substrate/client/transaction-pool/graph/benches/basics.rs b/substrate/client/transaction-pool/graph/benches/basics.rs index f3f67f1446f5cea37904f82382544a83062c6630..54bbe930b393bd37a94cb648272f1a66199cb691 100644 --- a/substrate/client/transaction-pool/graph/benches/basics.rs +++ b/substrate/client/transaction-pool/graph/benches/basics.rs @@ -135,8 +135,8 @@ fn bench_configured(pool: Pool<TestApi>, number: u64) { let res = block_on(futures::future::join_all(futures.into_iter())); assert!(res.iter().all(Result::is_ok)); - assert_eq!(pool.status().future, 0); - assert_eq!(pool.status().ready, number as usize); + assert_eq!(pool.validated_pool().status().future, 0); + assert_eq!(pool.validated_pool().status().ready, number as usize); // Prune all transactions. let block_num = 6; @@ -147,8 +147,8 @@ fn bench_configured(pool: Pool<TestApi>, number: u64) { )).expect("Prune failed"); // pool is empty - assert_eq!(pool.status().ready, 0); - assert_eq!(pool.status().future, 0); + assert_eq!(pool.validated_pool().status().ready, 0); + assert_eq!(pool.validated_pool().status().future, 0); } fn benchmark_main(c: &mut Criterion) { diff --git a/substrate/client/transaction-pool/graph/src/listener.rs b/substrate/client/transaction-pool/graph/src/listener.rs index dab2a6f5aae3f5021b8cb4d71357db9257f32b64..be6fb5c990585ec4d74919b9d9b2f258f698e4c7 100644 --- a/substrate/client/transaction-pool/graph/src/listener.rs +++ b/substrate/client/transaction-pool/graph/src/listener.rs @@ -16,30 +16,34 @@ // along with Substrate. If not, see <http://www.gnu.org/licenses/>. use std::{ - collections::HashMap, - fmt, - hash, + collections::HashMap, hash, }; +use linked_hash_map::LinkedHashMap; use serde::Serialize; -use crate::watcher; -use sp_runtime::traits; +use crate::{watcher, ChainApi, BlockHash}; use log::{debug, trace, warn}; +use sp_runtime::traits; /// Extrinsic pool default listener. -pub struct Listener<H: hash::Hash + Eq, H2> { - watchers: HashMap<H, watcher::Sender<H, H2>> +pub struct Listener<H: hash::Hash + Eq, C: ChainApi> { + watchers: HashMap<H, watcher::Sender<H, BlockHash<C>>>, + finality_watchers: LinkedHashMap<BlockHash<C>, Vec<H>>, } -impl<H: hash::Hash + Eq, H2> Default for Listener<H, H2> { +/// Maximum number of blocks awaiting finality at any time. +const MAX_FINALITY_WATCHERS: usize = 512; + +impl<H: hash::Hash + Eq, C: ChainApi> Default for Listener<H, C> { fn default() -> Self { Listener { watchers: Default::default(), + finality_watchers: Default::default(), } } } -impl<H: hash::Hash + traits::Member + Serialize, H2: Clone + fmt::Debug> Listener<H, H2> { - fn fire<F>(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender<H, H2>) { +impl<H: hash::Hash + traits::Member + Serialize, C: ChainApi> Listener<H, C> { + fn fire<F>(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender<H, BlockHash<C>>) { let clean = if let Some(h) = self.watchers.get_mut(hash) { fun(h); h.is_done() @@ -55,7 +59,7 @@ impl<H: hash::Hash + traits::Member + Serialize, H2: Clone + fmt::Debug> Listene /// Creates a new watcher for given verified extrinsic. /// /// The watcher can be used to subscribe to lifecycle events of that extrinsic. - pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher<H, H2> { + pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher<H, BlockHash<C>> { let sender = self.watchers.entry(hash.clone()).or_insert_with(watcher::Sender::default); sender.new_watcher(hash) } @@ -101,8 +105,34 @@ impl<H: hash::Hash + traits::Member + Serialize, H2: Clone + fmt::Debug> Listene } /// Transaction was pruned from the pool. - pub fn pruned(&mut self, header_hash: H2, tx: &H) { - debug!(target: "txpool", "[{:?}] Pruned at {:?}", tx, header_hash); - self.fire(tx, |watcher| watcher.in_block(header_hash)) + pub fn pruned(&mut self, block_hash: BlockHash<C>, tx: &H) { + debug!(target: "txpool", "[{:?}] Pruned at {:?}", tx, block_hash); + self.fire(tx, |s| s.in_block(block_hash)); + self.finality_watchers.entry(block_hash).or_insert(vec![]).push(tx.clone()); + + while self.finality_watchers.len() > MAX_FINALITY_WATCHERS { + if let Some((hash, txs)) = self.finality_watchers.pop_front() { + for tx in txs { + self.fire(&tx, |s| s.finality_timeout(hash.clone())); + } + } + } + } + + /// The block this transaction was included in has been retracted. + pub fn retracted(&mut self, block_hash: BlockHash<C>) { + if let Some(hashes) = self.finality_watchers.remove(&block_hash) { + for hash in hashes { + self.fire(&hash, |s| s.retracted(block_hash)) + } + } + } + + /// Notify all watchers that transactions have been finalized + pub fn finalized(&mut self, block_hash: BlockHash<C>, txs: Vec<H>) { + self.finality_watchers.remove(&block_hash); + for h in txs { + self.fire(&h, |s| s.finalized(block_hash.clone())) + } } } diff --git a/substrate/client/transaction-pool/graph/src/pool.rs b/substrate/client/transaction-pool/graph/src/pool.rs index edcd211df9f36ea88a39741683e19b4e17fdf51d..392abdca4899c70f36ea0341bd1e310e7fb9b891 100644 --- a/substrate/client/transaction-pool/graph/src/pool.rs +++ b/substrate/client/transaction-pool/graph/src/pool.rs @@ -33,7 +33,7 @@ use sp_runtime::{ traits::{self, SaturatedConversion}, transaction_validity::{TransactionValidity, TransactionTag as Tag, TransactionValidityError}, }; -use sp_transaction_pool::{error, PoolStatus}; +use sp_transaction_pool::error; use wasm_timer::Instant; use crate::validated_pool::{ValidatedPool, ValidatedTransaction}; @@ -338,34 +338,6 @@ impl<B: ChainApi> Pool<B> { ) } - /// Return an event stream of notifications for when transactions are imported to the pool. - /// - /// Consumers of this stream should use the `ready` method to actually get the - /// pending transactions in the right order. - pub fn import_notification_stream(&self) -> EventStream<ExHash<B>> { - self.validated_pool.import_notification_stream() - } - - /// Invoked when extrinsics are broadcasted. - pub fn on_broadcasted(&self, propagated: HashMap<ExHash<B>, Vec<String>>) { - self.validated_pool.on_broadcasted(propagated) - } - - /// Remove invalid transactions from the pool. - pub fn remove_invalid(&self, hashes: &[ExHash<B>]) -> Vec<TransactionFor<B>> { - self.validated_pool.remove_invalid(hashes) - } - - /// Get an iterator for ready transactions ordered by priority - pub fn ready(&self) -> impl Iterator<Item=TransactionFor<B>> { - self.validated_pool.ready() - } - - /// Returns pool status. - pub fn status(&self) -> PoolStatus { - self.validated_pool.status() - } - /// Returns transaction hash pub fn hash_of(&self, xt: &ExtrinsicFor<B>) -> ExHash<B> { self.validated_pool.api().hash_and_length(xt).0 @@ -454,9 +426,9 @@ impl<B: ChainApi> Pool<B> { (hash, validity) } - /// Get ready transaction by hash, if it present in the pool. - pub fn ready_transaction(&self, hash: &ExHash<B>) -> Option<TransactionFor<B>> { - self.validated_pool.ready_by_hash(hash) + /// get a reference to the underlying validated pool. + pub fn validated_pool(&self) -> &ValidatedPool<B> { + &self.validated_pool } } @@ -598,7 +570,7 @@ mod tests { }))).unwrap(); // then - assert_eq!(pool.ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]); + assert_eq!(pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]); } #[test] @@ -615,8 +587,8 @@ mod tests { // when pool.validated_pool.rotator().ban(&Instant::now(), vec![pool.hash_of(&uxt)]); let res = block_on(pool.submit_one(&BlockId::Number(0), uxt)); - assert_eq!(pool.status().ready, 0); - assert_eq!(pool.status().future, 0); + assert_eq!(pool.validated_pool().status().ready, 0); + assert_eq!(pool.validated_pool().status().future, 0); // then assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned); @@ -627,7 +599,7 @@ mod tests { let stream = { // given let pool = pool(); - let stream = pool.import_notification_stream(); + let stream = pool.validated_pool().import_notification_stream(); // when let _hash = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { @@ -650,8 +622,8 @@ mod tests { nonce: 3, }))).unwrap(); - assert_eq!(pool.status().ready, 2); - assert_eq!(pool.status().future, 1); + assert_eq!(pool.validated_pool().status().ready, 2); + assert_eq!(pool.validated_pool().status().future, 1); stream }; @@ -689,9 +661,9 @@ mod tests { pool.validated_pool.clear_stale(&BlockId::Number(5)).unwrap(); // then - assert_eq!(pool.ready().count(), 0); - assert_eq!(pool.status().future, 0); - assert_eq!(pool.status().ready, 0); + assert_eq!(pool.validated_pool().ready().count(), 0); + assert_eq!(pool.validated_pool().status().future, 0); + assert_eq!(pool.validated_pool().status().ready, 0); // make sure they are temporarily banned as well assert!(pool.validated_pool.rotator().is_banned(&hash1)); assert!(pool.validated_pool.rotator().is_banned(&hash2)); @@ -735,7 +707,7 @@ mod tests { amount: 5, nonce: 1, }))).unwrap(); - assert_eq!(pool.status().future, 1); + assert_eq!(pool.validated_pool().status().future, 1); // when let hash2 = block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { @@ -746,7 +718,7 @@ mod tests { }))).unwrap(); // then - assert_eq!(pool.status().future, 1); + assert_eq!(pool.validated_pool().status().future, 1); assert!(pool.validated_pool.rotator().is_banned(&hash1)); assert!(!pool.validated_pool.rotator().is_banned(&hash2)); } @@ -773,8 +745,8 @@ mod tests { }))).unwrap_err(); // then - assert_eq!(pool.status().ready, 0); - assert_eq!(pool.status().future, 0); + assert_eq!(pool.validated_pool().status().ready, 0); + assert_eq!(pool.validated_pool().status().future, 0); } #[test] @@ -791,8 +763,8 @@ mod tests { }))).unwrap_err(); // then - assert_eq!(pool.status().ready, 0); - assert_eq!(pool.status().future, 0); + assert_eq!(pool.validated_pool().status().ready, 0); + assert_eq!(pool.validated_pool().status().future, 0); assert_matches!(err, error::Error::NoTagsProvided); } @@ -809,19 +781,18 @@ mod tests { amount: 5, nonce: 0, }))).unwrap(); - assert_eq!(pool.status().ready, 1); - assert_eq!(pool.status().future, 0); + assert_eq!(pool.validated_pool().status().ready, 1); + assert_eq!(pool.validated_pool().status().future, 0); // when block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![])).unwrap(); - assert_eq!(pool.status().ready, 0); - assert_eq!(pool.status().future, 0); + assert_eq!(pool.validated_pool().status().ready, 0); + assert_eq!(pool.validated_pool().status().future, 0); // then let mut stream = futures::executor::block_on_stream(watcher.into_stream()); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); assert_eq!(stream.next(), Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into()))); - assert_eq!(stream.next(), None); } #[test] @@ -834,19 +805,18 @@ mod tests { amount: 5, nonce: 0, }))).unwrap(); - assert_eq!(pool.status().ready, 1); - assert_eq!(pool.status().future, 0); + assert_eq!(pool.validated_pool().status().ready, 1); + assert_eq!(pool.validated_pool().status().future, 0); // when block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![2u64])).unwrap(); - assert_eq!(pool.status().ready, 0); - assert_eq!(pool.status().future, 0); + assert_eq!(pool.validated_pool().status().ready, 0); + assert_eq!(pool.validated_pool().status().future, 0); // then let mut stream = futures::executor::block_on_stream(watcher.into_stream()); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); assert_eq!(stream.next(), Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into()))); - assert_eq!(stream.next(), None); } #[test] @@ -859,8 +829,8 @@ mod tests { amount: 5, nonce: 1, }))).unwrap(); - assert_eq!(pool.status().ready, 0); - assert_eq!(pool.status().future, 1); + assert_eq!(pool.validated_pool().status().ready, 0); + assert_eq!(pool.validated_pool().status().future, 1); // when block_on(pool.submit_one(&BlockId::Number(0), uxt(Transfer { @@ -869,7 +839,7 @@ mod tests { amount: 5, nonce: 0, }))).unwrap(); - assert_eq!(pool.status().ready, 2); + assert_eq!(pool.validated_pool().status().ready, 2); // then let mut stream = futures::executor::block_on_stream(watcher.into_stream()); @@ -888,7 +858,7 @@ mod tests { nonce: 0, }); let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), uxt)).unwrap(); - assert_eq!(pool.status().ready, 1); + assert_eq!(pool.validated_pool().status().ready, 1); // when pool.validated_pool.remove_invalid(&[*watcher.hash()]); @@ -912,13 +882,13 @@ mod tests { nonce: 0, }); let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), uxt)).unwrap(); - assert_eq!(pool.status().ready, 1); + assert_eq!(pool.validated_pool().status().ready, 1); // when let mut map = HashMap::new(); let peers = vec!["a".into(), "b".into(), "c".into()]; map.insert(*watcher.hash(), peers.clone()); - pool.on_broadcasted(map); + pool.validated_pool().on_broadcasted(map); // then @@ -947,7 +917,7 @@ mod tests { nonce: 0, }); let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), xt)).unwrap(); - assert_eq!(pool.status().ready, 1); + assert_eq!(pool.validated_pool().status().ready, 1); // when let xt = uxt(Transfer { @@ -957,7 +927,7 @@ mod tests { nonce: 1, }); block_on(pool.submit_one(&BlockId::Number(1), xt)).unwrap(); - assert_eq!(pool.status().ready, 1); + assert_eq!(pool.validated_pool().status().ready, 1); // then let mut stream = futures::executor::block_on_stream(watcher.into_stream()); @@ -1000,11 +970,11 @@ mod tests { // The tag the above transaction provides (TestApi is using just nonce as u8) let provides = vec![0_u8]; block_on(pool.submit_one(&BlockId::Number(0), xt)).unwrap(); - assert_eq!(pool.status().ready, 1); + assert_eq!(pool.validated_pool().status().ready, 1); // Now block import happens before the second transaction is able to finish verification. block_on(pool.prune_tags(&BlockId::Number(1), vec![provides], vec![])).unwrap(); - assert_eq!(pool.status().ready, 0); + assert_eq!(pool.validated_pool().status().ready, 0); // so when we release the verification of the previous one it will have @@ -1014,8 +984,8 @@ mod tests { // then is_ready.recv().unwrap(); // wait for finish - assert_eq!(pool.status().ready, 1); - assert_eq!(pool.status().future, 0); + assert_eq!(pool.validated_pool().status().ready, 1); + assert_eq!(pool.validated_pool().status().future, 0); } } @@ -1047,7 +1017,7 @@ mod tests { let tx4 = transfer(4); let hash4 = pool.validated_pool.api().hash_and_length(&tx4).0; let watcher4 = block_on(pool.submit_and_watch(&BlockId::Number(0), tx4)).unwrap(); - assert_eq!(pool.status().ready, 5); + assert_eq!(pool.validated_pool().status().ready, 5); // when pool.validated_pool.api().invalidate.lock().insert(hash3); @@ -1064,7 +1034,7 @@ mod tests { // // events for hash3 are: Ready, Invalid // events for hash4 are: Ready, Invalid - assert_eq!(pool.status().ready, 2); + assert_eq!(pool.validated_pool().status().ready, 2); assert_eq!( futures::executor::block_on_stream(watcher3.into_stream()).collect::<Vec<_>>(), vec![TransactionStatus::Ready, TransactionStatus::Invalid], @@ -1095,4 +1065,3 @@ mod tests { ); } } - diff --git a/substrate/client/transaction-pool/graph/src/validated_pool.rs b/substrate/client/transaction-pool/graph/src/validated_pool.rs index d02bc0ec7236d4eede749965ecd9b615775463a1..875241463753c2ab2f4f9e3d6c17d1d8f24d5b1b 100644 --- a/substrate/client/transaction-pool/graph/src/validated_pool.rs +++ b/substrate/client/transaction-pool/graph/src/validated_pool.rs @@ -16,12 +16,11 @@ use std::{ collections::{HashSet, HashMap}, - fmt, hash, sync::Arc, }; -use crate::base_pool as base; +use crate::{base_pool as base, BlockHash}; use crate::listener::Listener; use crate::rotator::PoolRotator; use crate::watcher::Watcher; @@ -39,7 +38,7 @@ use sp_transaction_pool::{error, PoolStatus}; use wasm_timer::Instant; use crate::base_pool::PruneStatus; -use crate::pool::{EventStream, Options, ChainApi, BlockHash, ExHash, ExtrinsicFor, TransactionFor}; +use crate::pool::{EventStream, Options, ChainApi, ExHash, ExtrinsicFor, TransactionFor}; /// Pre-validated transaction. Validated pool only accepts transactions wrapped in this enum. #[derive(Debug)] @@ -62,10 +61,10 @@ pub type ValidatedTransactionFor<B> = ValidatedTransaction< >; /// Pool that deals with validated transactions. -pub(crate) struct ValidatedPool<B: ChainApi> { +pub struct ValidatedPool<B: ChainApi> { api: Arc<B>, options: Options, - listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>, + listener: RwLock<Listener<ExHash<B>, B>>, pool: RwLock<base::BasePool< ExHash<B>, ExtrinsicFor<B>, @@ -91,9 +90,9 @@ impl<B: ChainApi> ValidatedPool<B> { pub fn new(options: Options, api: Arc<B>) -> Self { let base_pool = base::BasePool::new(options.reject_future_transactions); ValidatedPool { - api, options, listener: Default::default(), + api, pool: RwLock::new(base_pool), import_notification_sinks: Default::default(), rotator: Default::default(), @@ -138,13 +137,14 @@ impl<B: ChainApi> ValidatedPool<B> { let imported = self.pool.write().import(tx)?; if let base::Imported::Ready { ref hash, .. } = imported { - self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(hash.clone()).is_ok()); + self.import_notification_sinks.lock() + .retain(|sink| sink.unbounded_send(hash.clone()).is_ok()); } let mut listener = self.listener.write(); fire_events(&mut *listener, &imported); Ok(imported.hash().clone()) - } + }, ValidatedTransaction::Invalid(hash, err) => { self.rotator.ban(&Instant::now(), std::iter::once(hash)); Err(err.into()) @@ -152,7 +152,7 @@ impl<B: ChainApi> ValidatedPool<B> { ValidatedTransaction::Unknown(hash, err) => { self.listener.write().invalid(&hash, false); Err(err.into()) - } + }, } } @@ -343,8 +343,7 @@ impl<B: ChainApi> ValidatedPool<B> { self.pool.read().by_hashes(&hashes) .into_iter() .map(|existing_in_pool| existing_in_pool - .map(|transaction| transaction.provides.iter().cloned() - .collect())) + .map(|transaction| transaction.provides.iter().cloned().collect())) .collect() } @@ -416,8 +415,14 @@ impl<B: ChainApi> ValidatedPool<B> { let header_hash = self.api.block_id_to_hash(at)? .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?; let mut listener = self.listener.write(); + let mut set = HashSet::with_capacity(hashes.size_hint().0); for h in hashes { - listener.pruned(header_hash, &h); + // `hashes` has possibly duplicate hashes. + // we'd like to send out the `InBlock` notification only once. + if !set.contains(&h) { + listener.pruned(header_hash, &h); + set.insert(h); + } } Ok(()) } @@ -468,7 +473,10 @@ impl<B: ChainApi> ValidatedPool<B> { &self.api } - /// Return an event stream of transactions imported to the pool. + /// Return an event stream of notifications for when transactions are imported to the pool. + /// + /// Consumers of this stream should use the `ready` method to actually get the + /// pending transactions in the right order. pub fn import_notification_stream(&self) -> EventStream<ExHash<B>> { let (sink, stream) = mpsc::unbounded(); self.import_notification_sinks.lock().push(sink); @@ -492,7 +500,7 @@ impl<B: ChainApi> ValidatedPool<B> { pub fn remove_invalid(&self, hashes: &[ExHash<B>]) -> Vec<TransactionFor<B>> { // early exit in case there is no invalid transactions. if hashes.is_empty() { - return vec![] + return vec![]; } debug!(target: "txpool", "Removing invalid transactions: {:?}", hashes); @@ -521,14 +529,34 @@ impl<B: ChainApi> ValidatedPool<B> { pub fn status(&self) -> PoolStatus { self.pool.read().status() } + + /// Notify all watchers that transactions in the block with hash have been finalized + pub async fn on_block_finalized(&self, block_hash: BlockHash<B>) -> Result<(), B::Error> { + debug!(target: "txpool", "Attempting to notify watchers of finalization for {}", block_hash); + // fetch all extrinsic hashes + if let Some(txs) = self.api.block_body(&BlockId::Hash(block_hash.clone())).await? { + let tx_hashes = txs.into_iter() + .map(|tx| self.api.hash_and_length(&tx).0) + .collect::<Vec<_>>(); + // notify the watcher that these extrinsics have been finalized + self.listener.write().finalized(block_hash, tx_hashes); + } + + Ok(()) + } + + /// Notify the listener of retracted blocks + pub fn on_block_retracted(&self, block_hash: BlockHash<B>) { + self.listener.write().retracted(block_hash) + } } -fn fire_events<H, H2, Ex>( - listener: &mut Listener<H, H2>, +fn fire_events<H, B, Ex>( + listener: &mut Listener<H, B>, imported: &base::Imported<H, Ex>, ) where H: hash::Hash + Eq + traits::Member + Serialize, - H2: Clone + fmt::Debug, + B: ChainApi, { match *imported { base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => { diff --git a/substrate/client/transaction-pool/graph/src/watcher.rs b/substrate/client/transaction-pool/graph/src/watcher.rs index f9c234f73c3b8aad3fa86618750e9cab06b70676..d28f6814e455213a443b583646ee2af6a3abb335 100644 --- a/substrate/client/transaction-pool/graph/src/watcher.rs +++ b/substrate/client/transaction-pool/graph/src/watcher.rs @@ -26,12 +26,12 @@ use sp_transaction_pool::TransactionStatus; /// /// Represents a stream of status updates for particular extrinsic. #[derive(Debug)] -pub struct Watcher<H, H2> { - receiver: mpsc::UnboundedReceiver<TransactionStatus<H, H2>>, +pub struct Watcher<H, BH> { + receiver: mpsc::UnboundedReceiver<TransactionStatus<H, BH>>, hash: H, } -impl<H, H2> Watcher<H, H2> { +impl<H, BH> Watcher<H, BH> { /// Returns the transaction hash. pub fn hash(&self) -> &H { &self.hash @@ -40,30 +40,30 @@ impl<H, H2> Watcher<H, H2> { /// Pipe the notifications to given sink. /// /// Make sure to drive the future to completion. - pub fn into_stream(self) -> impl Stream<Item=TransactionStatus<H, H2>> { + pub fn into_stream(self) -> impl Stream<Item=TransactionStatus<H, BH>> { self.receiver } } /// Sender part of the watcher. Exposed only for testing purposes. #[derive(Debug)] -pub struct Sender<H, H2> { - receivers: Vec<mpsc::UnboundedSender<TransactionStatus<H, H2>>>, - finalized: bool, +pub struct Sender<H, BH> { + receivers: Vec<mpsc::UnboundedSender<TransactionStatus<H, BH>>>, + is_finalized: bool, } -impl<H, H2> Default for Sender<H, H2> { +impl<H, BH> Default for Sender<H, BH> { fn default() -> Self { Sender { receivers: Default::default(), - finalized: false, + is_finalized: false, } } } -impl<H: Clone, H2: Clone> Sender<H, H2> { +impl<H: Clone, BH: Clone> Sender<H, BH> { /// Add a new watcher to this sender object. - pub fn new_watcher(&mut self, hash: H) -> Watcher<H, H2> { + pub fn new_watcher(&mut self, hash: H) -> Watcher<H, BH> { let (tx, receiver) = mpsc::unbounded(); self.receivers.push(tx); Watcher { @@ -85,26 +85,42 @@ impl<H: Clone, H2: Clone> Sender<H, H2> { /// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid. pub fn usurped(&mut self, hash: H) { self.send(TransactionStatus::Usurped(hash)); - self.finalized = true; + self.is_finalized = true; } /// Extrinsic has been included in block with given hash. - pub fn in_block(&mut self, hash: H2) { + pub fn in_block(&mut self, hash: BH) { self.send(TransactionStatus::InBlock(hash)); - self.finalized = true; + } + + /// Extrinsic has been finalized by a finality gadget. + pub fn finalized(&mut self, hash: BH) { + self.send(TransactionStatus::Finalized(hash)); + self.is_finalized = true; + } + + /// The block this extrinsic was included in has been retracted + pub fn finality_timeout(&mut self, hash: BH) { + self.send(TransactionStatus::FinalityTimeout(hash)); + self.is_finalized = true; + } + + /// The block this extrinsic was included in has been retracted + pub fn retracted(&mut self, hash: BH) { + self.send(TransactionStatus::Retracted(hash)); } /// Extrinsic has been marked as invalid by the block builder. pub fn invalid(&mut self) { self.send(TransactionStatus::Invalid); // we mark as finalized as there are no more notifications - self.finalized = true; + self.is_finalized = true; } /// Transaction has been dropped from the pool because of the limit. pub fn dropped(&mut self) { self.send(TransactionStatus::Dropped); - self.finalized = true; + self.is_finalized = true; } /// The extrinsic has been broadcast to the given peers. @@ -114,10 +130,10 @@ impl<H: Clone, H2: Clone> Sender<H, H2> { /// Returns true if the are no more listeners for this extrinsic or it was finalized. pub fn is_done(&self) -> bool { - self.finalized || self.receivers.is_empty() + self.is_finalized || self.receivers.is_empty() } - fn send(&mut self, status: TransactionStatus<H, H2>) { + fn send(&mut self, status: TransactionStatus<H, BH>) { self.receivers.retain(|sender| sender.unbounded_send(status.clone()).is_ok()) } } diff --git a/substrate/client/transaction-pool/src/api.rs b/substrate/client/transaction-pool/src/api.rs index bfc13c01fdf53b0992f711fa6c58215aab12ba59..84e06cc33e4f5c83e001dce41d68e882fd2657c7 100644 --- a/substrate/client/transaction-pool/src/api.rs +++ b/substrate/client/transaction-pool/src/api.rs @@ -64,7 +64,8 @@ impl<Client, Block> FullChainApi<Client, Block> where impl<Client, Block> sc_transaction_graph::ChainApi for FullChainApi<Client, Block> where Block: BlockT, - Client: ProvideRuntimeApi<Block> + BlockBody<Block> + BlockIdTo<Block> + 'static + Send + Sync, + Client: ProvideRuntimeApi<Block> + BlockBody<Block> + BlockIdTo<Block>, + Client: Send + Sync + 'static, Client::Api: TaggedTransactionQueue<Block>, sp_api::ApiErrorFor<Client, Block>: Send, { diff --git a/substrate/client/transaction-pool/src/lib.rs b/substrate/client/transaction-pool/src/lib.rs index f4895d9a11e4011f0a2b28baa089b6962a16362a..fbfc6a24e6e35b7fb920dc8d7a70116b6d5f7912 100644 --- a/substrate/client/transaction-pool/src/lib.rs +++ b/substrate/client/transaction-pool/src/lib.rs @@ -37,9 +37,8 @@ use sp_runtime::{ traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic}, }; use sp_transaction_pool::{ - TransactionPool, PoolStatus, ImportNotificationStream, - TxHash, TransactionFor, TransactionStatusStreamFor, BlockHash, - MaintainedTransactionPool, PoolFuture, + TransactionPool, PoolStatus, ImportNotificationStream, TxHash, TransactionFor, + TransactionStatusStreamFor, MaintainedTransactionPool, PoolFuture, ChainEvent, }; use wasm_timer::Instant; @@ -115,7 +114,6 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block> } )), } - } /// Gets shared reference to the underlying pool. @@ -174,19 +172,19 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block> } fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> { - self.pool.remove_invalid(hashes) + self.pool.validated_pool().remove_invalid(hashes) } fn status(&self) -> PoolStatus { - self.pool.status() + self.pool.validated_pool().status() } fn ready(&self) -> Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>>> { - Box::new(self.pool.ready()) + Box::new(self.pool.validated_pool().ready()) } fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> { - self.pool.import_notification_stream() + self.pool.validated_pool().import_notification_stream() } fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> { @@ -194,11 +192,11 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block> } fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) { - self.pool.on_broadcasted(propagations) + self.pool.validated_pool().on_broadcasted(propagations) } fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> { - self.pool.ready_transaction(hash) + self.pool.validated_pool().ready_by_hash(hash) } } @@ -214,7 +212,7 @@ enum RevalidationStatus<N> { enum RevalidationStrategy<N> { Always, - Light(RevalidationStatus<N>) + Light(RevalidationStatus<N>), } struct RevalidationAction { @@ -241,7 +239,7 @@ impl<N: Clone + Copy + AtLeast32Bit> RevalidationStrategy<N> { revalidate: status.next_required( block, revalidate_time_period, - revalidate_block_period + revalidate_block_period, ), resubmit: false, revalidate_amount: None, @@ -275,7 +273,7 @@ impl<N: Clone + Copy + AtLeast32Bit> RevalidationStatus<N> { revalidate_block_period.map(|period| block + period), ); false - }, + } Self::Scheduled(revalidate_at_time, revalidate_at_block) => { let is_required = revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false) || revalidate_at_block.map(|at| block >= at).unwrap_or(false); @@ -283,87 +281,105 @@ impl<N: Clone + Copy + AtLeast32Bit> RevalidationStatus<N> { *self = Self::InProgress; } is_required - }, + } Self::InProgress => false, } } } impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block> -where - Block: BlockT, - PoolApi: 'static + sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>, + where + Block: BlockT, + PoolApi: 'static + sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>, { - fn maintain(&self, id: &BlockId<Self::Block>, retracted: &[BlockHash<Self>]) - -> Pin<Box<dyn Future<Output=()> + Send>> - { - let id = id.clone(); - let pool = self.pool.clone(); - let api = self.api.clone(); - - let block_number = match api.block_id_to_number(&id) { - Ok(Some(number)) => number, - _ => { - log::trace!(target: "txqueue", "Skipping chain event - no number for that block {:?}", id); - return Box::pin(ready(())); - } - }; - - let next_action = self.revalidation_strategy.lock().next( - block_number, - Some(std::time::Duration::from_secs(60)), - Some(20.into()), - ); - let revalidation_strategy = self.revalidation_strategy.clone(); - let retracted = retracted.to_vec(); - - async move { - // We don't query block if we won't prune anything - if !pool.status().is_empty() { - let hashes = api.block_body(&id).await - .unwrap_or_else(|e| { - log::warn!("Prune known transactions: error request {:?}!", e); - None - }) - .unwrap_or_default() - .into_iter() - .map(|tx| pool.hash_of(&tx)) - .collect::<Vec<_>>(); - - if let Err(e) = pool.prune_known(&id, &hashes) { - log::error!("Cannot prune known in the pool {:?}!", e); - } - } - - if next_action.resubmit { - let mut resubmit_transactions = Vec::new(); - - for retracted_hash in retracted { - let block_transactions = api.block_body(&BlockId::hash(retracted_hash.clone())).await - .unwrap_or_else(|e| { - log::warn!("Failed to fetch block body {:?}!", e); - None - }) - .unwrap_or_default() - .into_iter() - .filter(|tx| tx.is_signed().unwrap_or(true)); - - resubmit_transactions.extend(block_transactions); - } - if let Err(e) = pool.submit_at(&id, resubmit_transactions, true).await { - log::debug!(target: "txpool", - "[{:?}] Error re-submitting transactions: {:?}", id, e - ) - } + fn maintain(&self, event: ChainEvent<Self::Block>) -> Pin<Box<dyn Future<Output=()> + Send>> { + match event { + ChainEvent::NewBlock { id, retracted, .. } => { + let id = id.clone(); + let pool = self.pool.clone(); + let api = self.api.clone(); + + let block_number = match api.block_id_to_number(&id) { + Ok(Some(number)) => number, + _ => { + log::trace!(target: "txqueue", "Skipping chain event - no number for that block {:?}", id); + return Box::pin(ready(())); + } + }; + + let next_action = self.revalidation_strategy.lock().next( + block_number, + Some(std::time::Duration::from_secs(60)), + Some(20.into()), + ); + let revalidation_strategy = self.revalidation_strategy.clone(); + let retracted = retracted.clone(); + + async move { + // We don't query block if we won't prune anything + if !pool.validated_pool().status().is_empty() { + let hashes = api.block_body(&id).await + .unwrap_or_else(|e| { + log::warn!("Prune known transactions: error request {:?}!", e); + None + }) + .unwrap_or_default() + .into_iter() + .map(|tx| pool.hash_of(&tx)) + .collect::<Vec<_>>(); + + if let Err(e) = pool.prune_known(&id, &hashes) { + log::error!("Cannot prune known in the pool {:?}!", e); + } + } + + if next_action.resubmit { + let mut resubmit_transactions = Vec::new(); + + for retracted_hash in retracted { + // notify txs awaiting finality that it has been retracted + pool.validated_pool().on_block_retracted(retracted_hash.clone()); + + let block_transactions = api.block_body(&BlockId::hash(retracted_hash.clone())).await + .unwrap_or_else(|e| { + log::warn!("Failed to fetch block body {:?}!", e); + None + }) + .unwrap_or_default() + .into_iter() + .filter(|tx| tx.is_signed().unwrap_or(true)); + + resubmit_transactions.extend(block_transactions); + } + if let Err(e) = pool.submit_at(&id, resubmit_transactions, true).await { + log::debug!( + target: "txpool", + "[{:?}] Error re-submitting transactions: {:?}", id, e + ) + } + } + + if next_action.revalidate { + if let Err(e) = pool.revalidate_ready(&id, next_action.revalidate_amount).await { + log::warn!("Revalidate ready failed {:?}", e); + } + } + + revalidation_strategy.lock().clear(); + }.boxed() } - - if next_action.revalidate { - if let Err(e) = pool.revalidate_ready(&id, next_action.revalidate_amount).await { - log::warn!("Revalidate ready failed {:?}", e); - } + ChainEvent::Finalized { hash } => { + let pool = self.pool.clone(); + async move { + if let Err(e) = pool.validated_pool().on_block_finalized(hash).await { + log::warn!( + target: "txpool", + "Error [{}] occurred while attempting to notify watchers of finalization {}", + e, hash + ) + } + }.boxed() } - - revalidation_strategy.lock().clear(); - }.boxed() + } } } diff --git a/substrate/client/transaction-pool/src/testing/pool.rs b/substrate/client/transaction-pool/src/testing/pool.rs index fed02067b184aa0321fe84f7c3c14ee4b772e4bd..4a4f4638df504ebe5cd02ba1db87feee23ab444c 100644 --- a/substrate/client/transaction-pool/src/testing/pool.rs +++ b/substrate/client/transaction-pool/src/testing/pool.rs @@ -15,17 +15,18 @@ // along with Substrate. If not, see <http://www.gnu.org/licenses/>. use crate::*; -use sc_transaction_graph::Pool; use futures::executor::block_on; +use txpool::{self, Pool}; use sp_runtime::{ generic::BlockId, transaction_validity::ValidTransaction, }; use substrate_test_runtime_client::{ - runtime::{Block, Hash, Index}, + runtime::{Block, Hash, Index, Header}, AccountKeyring::*, }; use substrate_test_runtime_transaction_pool::{TestApi, uxt}; +use sp_transaction_pool::TransactionStatus; fn pool() -> Pool<TestApi> { Pool::new(Default::default(), TestApi::with_alice_nonce(209).into()) @@ -35,12 +36,22 @@ fn maintained_pool() -> BasicPool<TestApi, Block> { BasicPool::new(Default::default(), std::sync::Arc::new(TestApi::with_alice_nonce(209))) } +fn header(number: u64) -> Header { + Header { + number, + digest: Default::default(), + extrinsics_root: Default::default(), + parent_hash: Default::default(), + state_root: Default::default(), + } +} + #[test] fn submission_should_work() { let pool = pool(); block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209]); } @@ -50,7 +61,7 @@ fn multiple_submission_should_work() { block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209, 210]); } @@ -59,7 +70,7 @@ fn early_nonce_should_be_culled() { let pool = pool(); block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 208))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, Vec::<Index>::new()); } @@ -68,11 +79,11 @@ fn late_nonce_should_be_queued() { let pool = pool(); block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, Vec::<Index>::new()); block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209, 210]); } @@ -82,7 +93,7 @@ fn prune_tags_should_work() { let hash209 = block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 209))).unwrap(); block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 210))).unwrap(); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209, 210]); block_on( @@ -93,7 +104,7 @@ fn prune_tags_should_work() { ) ).expect("Prune tags"); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![210]); } @@ -102,11 +113,11 @@ fn should_ban_invalid_transactions() { let pool = pool(); let uxt = uxt(Alice, 209); let hash = block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap(); - pool.remove_invalid(&[hash]); + pool.validated_pool().remove_invalid(&[hash]); block_on(pool.submit_one(&BlockId::number(0), uxt.clone())).unwrap_err(); // when - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, Vec::<Index>::new()); // then @@ -122,29 +133,29 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() { let pool = Pool::new(Default::default(), api.clone()); let xt = uxt(Alice, 209); block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported"); - assert_eq!(pool.status().ready, 1); + assert_eq!(pool.validated_pool().status().ready, 1); // remove the transaction that just got imported. api.increment_nonce(Alice.into()); block_on(pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![])).expect("1. Pruned"); - assert_eq!(pool.status().ready, 0); + assert_eq!(pool.validated_pool().status().ready, 0); // it's re-imported to future - assert_eq!(pool.status().future, 1); + assert_eq!(pool.validated_pool().status().future, 1); // so now let's insert another transaction that also provides the 155 api.increment_nonce(Alice.into()); let xt = uxt(Alice, 211); block_on(pool.submit_one(&BlockId::number(2), xt.clone())).expect("2. Imported"); - assert_eq!(pool.status().ready, 1); - assert_eq!(pool.status().future, 1); - let pending: Vec<_> = pool.ready().map(|a| a.data.transfer().nonce).collect(); + assert_eq!(pool.validated_pool().status().ready, 1); + assert_eq!(pool.validated_pool().status().future, 1); + let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![211]); // prune it and make sure the pool is empty api.increment_nonce(Alice.into()); block_on(pool.prune_tags(&BlockId::number(3), vec![vec![155]], vec![])).expect("2. Pruned"); - assert_eq!(pool.status().ready, 0); - assert_eq!(pool.status().future, 2); + assert_eq!(pool.validated_pool().status().ready, 0); + assert_eq!(pool.validated_pool().status().future, 2); } #[test] @@ -158,7 +169,14 @@ fn should_prune_old_during_maintenance() { pool.api.push_block(1, vec![xt.clone()]); - block_on(pool.maintain(&BlockId::number(1), &[])); + let event = ChainEvent::NewBlock { + id: BlockId::number(1), + is_new_best: true, + retracted: vec![], + header: header(1), + }; + + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); } @@ -174,8 +192,14 @@ fn should_revalidate_during_maintenance() { assert_eq!(pool.api.validation_requests().len(), 2); pool.api.push_block(1, vec![xt1.clone()]); - - block_on(pool.maintain(&BlockId::number(1), &[])); + let event = ChainEvent::NewBlock { + id: BlockId::number(1), + is_new_best: true, + retracted: vec![], + header: header(1), + }; + + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 1); // test that pool revalidated transaction that left ready and not included in the block assert_eq!(pool.api.validation_requests().len(), 3); @@ -193,8 +217,14 @@ fn should_resubmit_from_retracted_during_maintaince() { pool.api.push_block(1, vec![]); pool.api.push_fork_block(retracted_hash, vec![xt.clone()]); - - block_on(pool.maintain(&BlockId::number(1), &[retracted_hash])); + let event = ChainEvent::NewBlock { + id: BlockId::Number(1), + is_new_best: true, + header: header(1), + retracted: vec![retracted_hash] + }; + + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 1); } @@ -212,7 +242,14 @@ fn should_not_retain_invalid_hashes_from_retracted() { pool.api.push_fork_block(retracted_hash, vec![xt.clone()]); pool.api.add_invalid(&xt); - block_on(pool.maintain(&BlockId::number(1), &[retracted_hash])); + let event = ChainEvent::NewBlock { + id: BlockId::Number(1), + is_new_best: true, + header: header(1), + retracted: vec![retracted_hash] + }; + + block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); } @@ -225,4 +262,188 @@ fn can_track_heap_size() { block_on(pool.submit_one(&BlockId::number(0), uxt(Alice, 212))).expect("1. Imported"); assert!(parity_util_mem::malloc_size(&pool) > 3000); -} \ No newline at end of file +} + +#[test] +fn finalization() { + let xt = uxt(Alice, 209); + let api = TestApi::with_alice_nonce(209); + api.push_block(1, vec![]); + let pool = BasicPool::new(Default::default(), api.into()); + let watcher = block_on(pool.submit_and_watch(&BlockId::number(1), xt.clone())).expect("1. Imported"); + pool.api.push_block(2, vec![xt.clone()]); + + let header = pool.api.chain().read().header_by_number.get(&2).cloned().unwrap(); + let event = ChainEvent::NewBlock { + id: BlockId::Hash(header.hash()), + is_new_best: true, + header: header.clone(), + retracted: vec![] + }; + block_on(pool.maintain(event)); + + let event = ChainEvent::Finalized { hash: header.hash() }; + block_on(pool.maintain(event)); + + let mut stream = futures::executor::block_on_stream(watcher); + assert_eq!(stream.next(), Some(TransactionStatus::Ready)); + assert_eq!(stream.next(), Some(TransactionStatus::InBlock(header.hash()))); + assert_eq!(stream.next(), Some(TransactionStatus::Finalized(header.hash()))); + assert_eq!(stream.next(), None); +} + +#[test] +fn fork_aware_finalization() { + let api = TestApi::empty(); + // starting block A1 (last finalized.) + api.push_block(1, vec![]); + + let pool = BasicPool::new(Default::default(), api.into()); + let mut canon_watchers = vec![]; + + let from_alice = uxt(Alice, 1); + let from_dave = uxt(Dave, 1); + let from_bob = uxt(Bob, 1); + let from_charlie = uxt(Charlie, 1); + pool.api.increment_nonce(Alice.into()); + pool.api.increment_nonce(Dave.into()); + pool.api.increment_nonce(Charlie.into()); + pool.api.increment_nonce(Bob.into()); + + let from_dave_watcher; + let from_bob_watcher; + let b1; + let d1; + let c2; + let d2; + + + // block B1 + { + let watcher = block_on(pool.submit_and_watch(&BlockId::number(1), from_alice.clone())).expect("1. Imported"); + let header = pool.api.push_block(2, vec![from_alice.clone()]); + canon_watchers.push((watcher, header.hash())); + + let event = ChainEvent::NewBlock { + id: BlockId::Number(2), + is_new_best: true, + header: header.clone(), + retracted: vec![], + }; + b1 = header.hash(); + block_on(pool.maintain(event)); + let event = ChainEvent::Finalized { hash: b1 }; + block_on(pool.maintain(event)); + } + + // block C2 + { + let header = pool.api.push_fork_block_with_parent(b1, vec![from_dave.clone()]); + from_dave_watcher = block_on(pool.submit_and_watch(&BlockId::number(1), from_dave.clone())) + .expect("1. Imported"); + let event = ChainEvent::NewBlock { + id: BlockId::Hash(header.hash()), + is_new_best: true, + header: header.clone(), + retracted: vec![] + }; + c2 = header.hash(); + block_on(pool.maintain(event)); + } + + // block D2 + { + from_bob_watcher = block_on(pool.submit_and_watch(&BlockId::number(1), from_bob.clone())).expect("1. Imported"); + let header = pool.api.push_fork_block_with_parent(c2, vec![from_bob.clone()]); + + let event = ChainEvent::NewBlock { + id: BlockId::Hash(header.hash()), + is_new_best: true, + header: header.clone(), + retracted: vec![] + }; + d2 = header.hash(); + block_on(pool.maintain(event)); + } + + // block C1 + { + let watcher = block_on(pool.submit_and_watch(&BlockId::number(1), from_charlie.clone())).expect("1.Imported"); + let header = pool.api.push_block(3, vec![from_charlie.clone()]); + + canon_watchers.push((watcher, header.hash())); + let event = ChainEvent::NewBlock { + id: BlockId::Number(3), + is_new_best: true, + header: header.clone(), + retracted: vec![c2, d2], + }; + block_on(pool.maintain(event)); + let event = ChainEvent::Finalized { hash: header.hash() }; + block_on(pool.maintain(event)); + } + + // block D1 + { + let xt = uxt(Eve, 0); + let w = block_on(pool.submit_and_watch(&BlockId::number(1), xt.clone())).expect("1. Imported"); + let header = pool.api.push_block(4, vec![xt.clone()]); + canon_watchers.push((w, header.hash())); + + let event = ChainEvent::NewBlock { + id: BlockId::Hash(header.hash()), + is_new_best: true, + header: header.clone(), + retracted: vec![] + }; + d1 = header.hash(); + block_on(pool.maintain(event)); + let event = ChainEvent::Finalized { hash: d1 }; + block_on(pool.maintain(event)); + } + + let e1; + + // block e1 + { + let header = pool.api.push_block(5, vec![from_dave]); + e1 = header.hash(); + let event = ChainEvent::NewBlock { + id: BlockId::Hash(header.hash()), + is_new_best: true, + header: header.clone(), + retracted: vec![] + }; + block_on(pool.maintain(event)); + block_on(pool.maintain(ChainEvent::Finalized { hash: e1 })); + } + + + for (canon_watcher, h) in canon_watchers { + let mut stream = futures::executor::block_on_stream(canon_watcher); + assert_eq!(stream.next(), Some(TransactionStatus::Ready)); + assert_eq!(stream.next(), Some(TransactionStatus::InBlock(h.clone()))); + assert_eq!(stream.next(), Some(TransactionStatus::Finalized(h))); + assert_eq!(stream.next(), None); + } + + + { + let mut stream= futures::executor::block_on_stream(from_dave_watcher); + assert_eq!(stream.next(), Some(TransactionStatus::Ready)); + assert_eq!(stream.next(), Some(TransactionStatus::InBlock(c2.clone()))); + assert_eq!(stream.next(), Some(TransactionStatus::Retracted(c2))); + assert_eq!(stream.next(), Some(TransactionStatus::Ready)); + assert_eq!(stream.next(), Some(TransactionStatus::InBlock(e1))); + assert_eq!(stream.next(), Some(TransactionStatus::Finalized(e1.clone()))); + assert_eq!(stream.next(), None); + } + + { + let mut stream= futures::executor::block_on_stream(from_bob_watcher); + assert_eq!(stream.next(), Some(TransactionStatus::Ready)); + assert_eq!(stream.next(), Some(TransactionStatus::InBlock(d2.clone()))); + assert_eq!(stream.next(), Some(TransactionStatus::Retracted(d2))); + } + +} diff --git a/substrate/primitives/blockchain/src/header_metadata.rs b/substrate/primitives/blockchain/src/header_metadata.rs index fcd8062d1d6d577a304c840f7d31fa69c245d2cd..32dd0bcf06e27ef94f2c6abdf7be3494550e9066 100644 --- a/substrate/primitives/blockchain/src/header_metadata.rs +++ b/substrate/primitives/blockchain/src/header_metadata.rs @@ -151,7 +151,7 @@ pub fn tree_route<Block: BlockT, T: HeaderMetadata<Block>>( } /// Hash and number of a block. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct HashAndNumber<Block: BlockT> { /// The number of the block. pub number: NumberFor<Block>, diff --git a/substrate/primitives/transaction-pool/src/pool.rs b/substrate/primitives/transaction-pool/src/pool.rs index d4e6a5caf2cbacee9cf48550be5237d09652da11..6912bc79d6ec450f9c242dbef08fff9d45e76383 100644 --- a/substrate/primitives/transaction-pool/src/pool.rs +++ b/substrate/primitives/transaction-pool/src/pool.rs @@ -67,7 +67,7 @@ impl PoolStatus { /// 2. Inside `Ready` queue: /// - `Broadcast` /// 3. Leaving the pool: -/// - `InBlock` +/// - `Finalized` /// - `Invalid` /// - `Usurped` /// - `Dropped` @@ -100,6 +100,13 @@ pub enum TransactionStatus<Hash, BlockHash> { /// Transaction has been included in block with given hash. #[serde(rename = "finalized")] // See #4438 InBlock(BlockHash), + /// The block this transaction was included in has been retracted. + Retracted(BlockHash), + /// Maximum number of finality watchers has been reached, + /// old watchers are being removed. + FinalityTimeout(BlockHash), + /// Transaction has been finalized by a finality-gadget, e.g GRANDPA + Finalized(BlockHash), /// Transaction has been replaced in the pool, by another transaction /// that provides the same tags. (e.g. same (sender, nonce)). Usurped(Hash), @@ -217,11 +224,30 @@ pub trait TransactionPool: Send + Sync { fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>>; } +/// Events that the transaction pool listens for. +pub enum ChainEvent<B: BlockT> { + /// New blocks have been added to the chain + NewBlock { + /// Is this the new best block. + is_new_best: bool, + /// Id of the just imported block. + id: BlockId<B>, + /// Header of the just imported block + header: B::Header, + /// List of retracted blocks ordered by block number. + retracted: Vec<B::Hash>, + }, + /// An existing block has been finalzied. + Finalized { + /// Hash of just finalized block + hash: B::Hash, + }, +} + /// Trait for transaction pool maintenance. -pub trait MaintainedTransactionPool : TransactionPool { +pub trait MaintainedTransactionPool: TransactionPool { /// Perform maintenance - fn maintain(&self, block: &BlockId<Self::Block>, retracted: &[BlockHash<Self>]) - -> Pin<Box<dyn Future<Output=()> + Send>>; + fn maintain(&self, event: ChainEvent<Self::Block>) -> Pin<Box<dyn Future<Output=()> + Send>>; } /// An abstraction for transaction pool. diff --git a/substrate/test-utils/runtime/transaction-pool/Cargo.toml b/substrate/test-utils/runtime/transaction-pool/Cargo.toml index 615886d987ffd8f7df5c5a880693b22fd9156355..72e81f5f19afa891e5e805b5057be41ac5cf05ee 100644 --- a/substrate/test-utils/runtime/transaction-pool/Cargo.toml +++ b/substrate/test-utils/runtime/transaction-pool/Cargo.toml @@ -9,6 +9,7 @@ license = "GPL-3.0" substrate-test-runtime-client = { version = "2.0.0", path = "../client" } parking_lot = "0.10.0" codec = { package = "parity-scale-codec", version = "1.0.0" } +sp-blockchain = { version = "2.0.0", path = "../../../primitives/blockchain" } sp-runtime = { version = "2.0.0", path = "../../../primitives/runtime" } sp-transaction-pool = { version = "2.0.0", path = "../../../primitives/transaction-pool" } sc-transaction-graph = { version = "2.0.0", path = "../../../client/transaction-pool/graph" } diff --git a/substrate/test-utils/runtime/transaction-pool/src/lib.rs b/substrate/test-utils/runtime/transaction-pool/src/lib.rs index 58c801d8d66d7a4b77f0d6a34673a5fe153df350..aedc7dc4c37577cd32e345d8c37d30c36742559d 100644 --- a/substrate/test-utils/runtime/transaction-pool/src/lib.rs +++ b/substrate/test-utils/runtime/transaction-pool/src/lib.rs @@ -50,7 +50,7 @@ impl std::error::Error for Error { } #[derive(Default)] -struct ChainState { +pub struct ChainState { pub block_by_number: HashMap<BlockNumber, Vec<Extrinsic>>, pub block_by_hash: HashMap<Hash, Vec<Extrinsic>>, pub header_by_number: HashMap<BlockNumber, Header>, @@ -96,16 +96,24 @@ impl TestApi { } /// Push block as a part of canonical chain under given number. - pub fn push_block(&self, block_number: BlockNumber, xts: Vec<Extrinsic>) { + pub fn push_block(&self, block_number: BlockNumber, xts: Vec<Extrinsic>) -> Header { let mut chain = self.chain.write(); - chain.block_by_number.insert(block_number, xts); - chain.header_by_number.insert(block_number, Header { + chain.block_by_number.insert(block_number, xts.clone()); + let header = Header { number: block_number, digest: Default::default(), extrinsics_root: Default::default(), - parent_hash: Default::default(), + parent_hash: block_number + .checked_sub(1) + .and_then(|num| { + chain.header_by_number.get(&num) + .cloned().map(|h| h.hash()) + }).unwrap_or_default(), state_root: Default::default(), - }); + }; + chain.block_by_hash.insert(header.hash(), xts); + chain.header_by_number.insert(block_number, header.clone()); + header } /// Push a block without a number. @@ -116,6 +124,20 @@ impl TestApi { chain.block_by_hash.insert(block_hash, xts); } + pub fn push_fork_block_with_parent(&self, parent: Hash, xts: Vec<Extrinsic>) -> Header { + let mut chain = self.chain.write(); + let blocknum = chain.block_by_number.keys().max().expect("block_by_number shouldn't be empty"); + let header = Header { + number: *blocknum, + digest: Default::default(), + extrinsics_root: Default::default(), + parent_hash: parent, + state_root: Default::default(), + }; + chain.block_by_hash.insert(header.hash(), xts); + header + } + fn hash_and_length_inner(ex: &Extrinsic) -> (Hash, usize) { let encoded = ex.encode(); (BlakeTwo256::hash(&encoded), encoded.len()) @@ -136,6 +158,11 @@ impl TestApi { self.validation_requests.read().clone() } + /// get a reference to the chain state + pub fn chain(&self) -> &RwLock<ChainState> { + &self.chain + } + /// Increment nonce in the inner state. pub fn increment_nonce(&self, account: AccountId) { let mut chain = self.chain.write(); @@ -197,7 +224,12 @@ impl sc_transaction_graph::ChainApi for TestApi { ) -> Result<Option<sc_transaction_graph::BlockHash<Self>>, Error> { Ok(match at { generic::BlockId::Hash(x) => Some(x.clone()), - _ => Some(Default::default()), + generic::BlockId::Number(num) => { + self.chain.read() + .header_by_number.get(num) + .map(|h| h.hash()) + .or_else(|| Some(Default::default())) + }, }) } @@ -209,10 +241,9 @@ impl sc_transaction_graph::ChainApi for TestApi { } fn block_body(&self, id: &BlockId<Self::Block>) -> Self::BodyFuture { - futures::future::ready(Ok(if let BlockId::Number(num) = id { - self.chain.read().block_by_number.get(num).cloned() - } else { - None + futures::future::ready(Ok(match id { + BlockId::Number(num) => self.chain.read().block_by_number.get(num).cloned(), + BlockId::Hash(hash) => self.chain.read().block_by_hash.get(hash).cloned(), })) } }