diff --git a/substrate/client/consensus/manual-seal/src/lib.rs b/substrate/client/consensus/manual-seal/src/lib.rs index 45628e90a6f9c7b761b018d5fcbcee9d0966655b..2473ac848ca3227ecf89f3c15741063bb35f32ac 100644 --- a/substrate/client/consensus/manual-seal/src/lib.rs +++ b/substrate/client/consensus/manual-seal/src/lib.rs @@ -296,7 +296,13 @@ mod tests { let client = Arc::new(client); let spawner = sp_core::testing::TaskExecutor::new(); let pool = Arc::new(BasicPool::with_revalidation_type( - Options::default(), true.into(), api(), None, RevalidationType::Full, spawner.clone(), + Options::default(), + true.into(), + api(), + None, + RevalidationType::Full, + spawner.clone(), + 0, )); let env = ProposerFactory::new( spawner.clone(), @@ -373,6 +379,7 @@ mod tests { None, RevalidationType::Full, spawner.clone(), + 0, )); let env = ProposerFactory::new( spawner.clone(), @@ -453,6 +460,7 @@ mod tests { None, RevalidationType::Full, spawner.clone(), + 0, )); let env = ProposerFactory::new( spawner.clone(), diff --git a/substrate/client/transaction-pool/src/lib.rs b/substrate/client/transaction-pool/src/lib.rs index bc5f6e367ff86c8feeee241ffe316282e60507d1..32bea107d8acc6102abc8012480dab9995fda4fa 100644 --- a/substrate/client/transaction-pool/src/lib.rs +++ b/substrate/client/transaction-pool/src/lib.rs @@ -98,6 +98,13 @@ impl<T, Block: BlockT> Default for ReadyPoll<T, Block> { } impl<T, Block: BlockT> ReadyPoll<T, Block> { + fn new(best_block_number: NumberFor<Block>) -> Self { + Self { + updated_at: best_block_number, + pollers: Default::default(), + } + } + fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) { self.updated_at = number; @@ -189,6 +196,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block> prometheus: Option<&PrometheusRegistry>, revalidation_type: RevalidationType, spawner: impl SpawnNamed, + best_block_number: NumberFor<Block>, ) -> Self { let pool = Arc::new(sc_transaction_graph::Pool::new(options, is_validator, pool_api.clone())); let (revalidation_queue, background_task) = match revalidation_type { @@ -213,7 +221,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block> RevalidationType::Full => RevalidationStrategy::Always, } )), - ready_poll: Default::default(), + ready_poll: Arc::new(Mutex::new(ReadyPoll::new(best_block_number))), metrics: PrometheusMetrics::new(prometheus), } } @@ -309,21 +317,29 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block> } fn ready_at(&self, at: NumberFor<Self::Block>) -> PolledIterator<PoolApi> { + let status = self.status(); + // If there are no transactions in the pool, it is fine to return early. + // + // There could be transaction being added because of some re-org happening at the relevant + // block, but this is relative unlikely. + if status.ready == 0 && status.future == 0 { + return async { Box::new(std::iter::empty()) as Box<_> }.boxed() + } + if self.ready_poll.lock().updated_at() >= at { log::trace!(target: "txpool", "Transaction pool already processed block #{}", at); let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready()); - return Box::pin(futures::future::ready(iterator)); + return async move { iterator }.boxed(); } - Box::pin( - self.ready_poll - .lock() - .add(at) - .map(|received| received.unwrap_or_else(|e| { - log::warn!("Error receiving pending set: {:?}", e); - Box::new(vec![].into_iter()) - })) - ) + self.ready_poll + .lock() + .add(at) + .map(|received| received.unwrap_or_else(|e| { + log::warn!("Error receiving pending set: {:?}", e); + Box::new(std::iter::empty()) + })) + .boxed() } fn ready(&self) -> ReadyIteratorFor<PoolApi> { @@ -334,7 +350,7 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block> impl<Block, Client, Fetcher> LightPool<Block, Client, Fetcher> where Block: BlockT, - Client: sp_blockchain::HeaderBackend<Block> + 'static, + Client: sp_blockchain::HeaderBackend<Block> + sc_client_api::UsageProvider<Block> + 'static, Fetcher: sc_client_api::Fetcher<Block> + 'static, { /// Create new basic transaction pool for a light node with the provided api. @@ -345,9 +361,15 @@ where client: Arc<Client>, fetcher: Arc<Fetcher>, ) -> Self { - let pool_api = Arc::new(LightChainApi::new(client, fetcher)); + let pool_api = Arc::new(LightChainApi::new(client.clone(), fetcher)); Self::with_revalidation_type( - options, false.into(), pool_api, prometheus, RevalidationType::Light, spawner, + options, + false.into(), + pool_api, + prometheus, + RevalidationType::Light, + spawner, + client.usage_info().chain.best_number, ) } } @@ -357,8 +379,12 @@ where Block: BlockT, Client: sp_api::ProvideRuntimeApi<Block> + sc_client_api::BlockBackend<Block> - + sp_runtime::traits::BlockIdTo<Block>, - Client: sc_client_api::ExecutorProvider<Block> + Send + Sync + 'static, + + sp_runtime::traits::BlockIdTo<Block> + + sc_client_api::ExecutorProvider<Block> + + sc_client_api::UsageProvider<Block> + + Send + + Sync + + 'static, Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>, { /// Create new basic transaction pool for a full node with the provided api. @@ -371,7 +397,13 @@ where ) -> Arc<Self> { let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus)); let pool = Arc::new(Self::with_revalidation_type( - options, is_validator, pool_api, prometheus, RevalidationType::Full, spawner + options, + is_validator, + pool_api, + prometheus, + RevalidationType::Full, + spawner, + client.usage_info().chain.best_number, )); // make transaction pool available for off-chain runtime calls.