From 8ae1aa4c289213f7f6c33111e9049874898f5b59 Mon Sep 17 00:00:00 2001 From: Wei Tang <wei@that.world> Date: Fri, 17 Jul 2020 12:31:47 +0200 Subject: [PATCH] Switch to bounded mpsc for txpool import notification stream (#6640) * Switch to bounded mpsc for txpool import notification stream * Update client/transaction-pool/graph/src/validated_pool.rs Co-authored-by: Nikolay Volf <nikvolf@gmail.com> Co-authored-by: Nikolay Volf <nikvolf@gmail.com> --- substrate/Cargo.lock | 8 ++++++- .../client/transaction-pool/graph/Cargo.toml | 1 + .../client/transaction-pool/graph/src/pool.rs | 4 ++-- .../graph/src/validated_pool.rs | 23 +++++++++++++++---- .../primitives/transaction-pool/Cargo.toml | 2 -- .../primitives/transaction-pool/src/pool.rs | 3 +-- 6 files changed, 30 insertions(+), 11 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 6cdc764d9d0..6a148091bb6 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -5832,6 +5832,12 @@ dependencies = [ "syn 1.0.33", ] +[[package]] +name = "retain_mut" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e005d658ad26eacc2b6c506dfde519f4e277e328d0eb3379ca61647d70a8f531" + [[package]] name = "ring" version = "0.16.12" @@ -7044,6 +7050,7 @@ dependencies = [ "parity-scale-codec", "parity-util-mem 0.7.0", "parking_lot 0.10.2", + "retain_mut", "serde", "sp-blockchain", "sp-core", @@ -8114,7 +8121,6 @@ dependencies = [ "sp-api", "sp-blockchain", "sp-runtime", - "sp-utils", ] [[package]] diff --git a/substrate/client/transaction-pool/graph/Cargo.toml b/substrate/client/transaction-pool/graph/Cargo.toml index ecce54505dc..d90d13ac196 100644 --- a/substrate/client/transaction-pool/graph/Cargo.toml +++ b/substrate/client/transaction-pool/graph/Cargo.toml @@ -25,6 +25,7 @@ sp-runtime = { version = "2.0.0-rc4", path = "../../../primitives/runtime" } sp-transaction-pool = { version = "2.0.0-rc4", path = "../../../primitives/transaction-pool" } parity-util-mem = { version = "0.7.0", default-features = false, features = ["primitive-types"] } linked-hash-map = "0.5.2" +retain_mut = "0.1.1" [dev-dependencies] assert_matches = "1.3.0" diff --git a/substrate/client/transaction-pool/graph/src/pool.rs b/substrate/client/transaction-pool/graph/src/pool.rs index 750d5f5d10e..56ff550d775 100644 --- a/substrate/client/transaction-pool/graph/src/pool.rs +++ b/substrate/client/transaction-pool/graph/src/pool.rs @@ -33,13 +33,13 @@ use sp_runtime::{ }; use sp_transaction_pool::error; use wasm_timer::Instant; -use sp_utils::mpsc::TracingUnboundedReceiver; +use futures::channel::mpsc::Receiver; use crate::validated_pool::ValidatedPool; pub use crate::validated_pool::ValidatedTransaction; /// Modification notification event stream type; -pub type EventStream<H> = TracingUnboundedReceiver<H>; +pub type EventStream<H> = Receiver<H>; /// Block hash type for a pool. pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash; diff --git a/substrate/client/transaction-pool/graph/src/validated_pool.rs b/substrate/client/transaction-pool/graph/src/validated_pool.rs index bde76196ec4..86c2e75832f 100644 --- a/substrate/client/transaction-pool/graph/src/validated_pool.rs +++ b/substrate/client/transaction-pool/graph/src/validated_pool.rs @@ -36,7 +36,8 @@ use sp_runtime::{ }; use sp_transaction_pool::{error, PoolStatus}; use wasm_timer::Instant; -use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; +use futures::channel::mpsc::{channel, Sender}; +use retain_mut::RetainMut; use crate::base_pool::PruneStatus; use crate::pool::{ @@ -98,7 +99,7 @@ pub struct ValidatedPool<B: ChainApi> { ExtrinsicHash<B>, ExtrinsicFor<B>, >>, - import_notification_sinks: Mutex<Vec<TracingUnboundedSender<ExtrinsicHash<B>>>>, + import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>, rotator: PoolRotator<ExtrinsicHash<B>>, } @@ -186,7 +187,19 @@ impl<B: ChainApi> ValidatedPool<B> { if let base::Imported::Ready { ref hash, .. } = imported { self.import_notification_sinks.lock() - .retain(|sink| sink.unbounded_send(hash.clone()).is_ok()); + .retain_mut(|sink| { + match sink.try_send(hash.clone()) { + Ok(()) => true, + Err(e) => { + if e.is_full() { + log::warn!(target: "txpool", "[{:?}] Trying to notify an import but the channel is full", hash); + true + } else { + false + } + }, + } + }); } let mut listener = self.listener.write(); @@ -529,7 +542,9 @@ impl<B: ChainApi> ValidatedPool<B> { /// Consumers of this stream should use the `ready` method to actually get the /// pending transactions in the right order. pub fn import_notification_stream(&self) -> EventStream<ExtrinsicHash<B>> { - let (sink, stream) = tracing_unbounded("mpsc_import_notifications"); + const CHANNEL_BUFFER_SIZE: usize = 1024; + + let (sink, stream) = channel(CHANNEL_BUFFER_SIZE); self.import_notification_sinks.lock().push(sink); stream } diff --git a/substrate/primitives/transaction-pool/Cargo.toml b/substrate/primitives/transaction-pool/Cargo.toml index a217bdef4a9..c82592e9bef 100644 --- a/substrate/primitives/transaction-pool/Cargo.toml +++ b/substrate/primitives/transaction-pool/Cargo.toml @@ -12,7 +12,6 @@ documentation = "https://docs.rs/sp-transaction-pool" [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] - [dependencies] codec = { package = "parity-scale-codec", version = "1.3.1", optional = true } derive_more = { version = "0.99.2", optional = true } @@ -22,7 +21,6 @@ serde = { version = "1.0.101", features = ["derive"], optional = true} sp-api = { version = "2.0.0-rc4", default-features = false, path = "../api" } sp-blockchain = { version = "2.0.0-rc4", optional = true, path = "../blockchain" } sp-runtime = { version = "2.0.0-rc4", default-features = false, path = "../runtime" } -sp-utils = { version = "2.0.0-rc4", default-features = false, path = "../utils" } [features] default = [ "std" ] diff --git a/substrate/primitives/transaction-pool/src/pool.rs b/substrate/primitives/transaction-pool/src/pool.rs index 848c6f9e178..7d1d5537dc9 100644 --- a/substrate/primitives/transaction-pool/src/pool.rs +++ b/substrate/primitives/transaction-pool/src/pool.rs @@ -25,7 +25,6 @@ use std::{ }; use futures::{Future, Stream}; use serde::{Deserialize, Serialize}; -use sp_utils::mpsc; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, Member, NumberFor}, @@ -131,7 +130,7 @@ pub enum TransactionStatus<Hash, BlockHash> { pub type TransactionStatusStream<Hash, BlockHash> = dyn Stream<Item=TransactionStatus<Hash, BlockHash>> + Send + Unpin; /// The import notification event stream. -pub type ImportNotificationStream<H> = mpsc::TracingUnboundedReceiver<H>; +pub type ImportNotificationStream<H> = futures::channel::mpsc::Receiver<H>; /// Transaction hash type for a pool. pub type TxHash<P> = <P as TransactionPool>::Hash; -- GitLab