From db86094b03aaa9067116ab931e6922aa17365747 Mon Sep 17 00:00:00 2001
From: Nikolay Volf <nikvolf@gmail.com>
Date: Tue, 17 Mar 2020 08:24:04 -0700
Subject: [PATCH] Produce block always on updated transaction pool state
 (#5227)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* make sure return ready iterator once state is updated

* update sc_basic_authorship tests

* update node tests

* fix manual seal

* actually fix service test

* add tests

* Update client/basic-authorship/src/basic_authorship.rs

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* helper function

* review suggestions

* warning and continue

* add debug log

* use futures::chennel::oneshot

* use declaration bound

* no option for updated_at

* no allocation

* ready_at / ready

* Update client/transaction-pool/src/lib.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* Update client/transaction-pool/src/lib.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* Update client/transaction-pool/src/lib.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* Update client/transaction-pool/src/lib.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* Update client/transaction-pool/src/lib.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

* Update client/transaction-pool/src/lib.rs

Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com>

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
---
 substrate/Cargo.lock                          |  2 +
 substrate/bin/node/cli/src/service.rs         | 31 ++++++-
 substrate/client/basic-authorship/Cargo.toml  |  1 +
 .../basic-authorship/src/basic_authorship.rs  | 71 ++++++++++++++--
 .../client/consensus/manual-seal/Cargo.toml   |  1 +
 .../client/consensus/manual-seal/src/lib.rs   | 16 +++-
 substrate/client/service/test/src/lib.rs      |  6 +-
 .../graph/src/validated_pool.rs               |  2 +-
 substrate/client/transaction-pool/src/lib.rs  | 83 +++++++++++++++++--
 .../transaction-pool/src/testing/pool.rs      | 52 ++++++++++++
 substrate/primitives/blockchain/src/error.rs  |  2 +
 .../primitives/transaction-pool/src/pool.rs   | 13 ++-
 12 files changed, 256 insertions(+), 24 deletions(-)

diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock
index f929a3dcbf7..6fc90c3c23e 100644
--- a/substrate/Cargo.lock
+++ b/substrate/Cargo.lock
@@ -5642,6 +5642,7 @@ name = "sc-basic-authorship"
 version = "0.8.0-alpha.3"
 dependencies = [
  "futures 0.3.4",
+ "futures-timer 3.0.2",
  "log 0.4.8",
  "parity-scale-codec",
  "parking_lot 0.10.0",
@@ -5970,6 +5971,7 @@ dependencies = [
 name = "sc-consensus-manual-seal"
 version = "0.8.0-alpha.3"
 dependencies = [
+ "assert_matches",
  "derive_more",
  "env_logger 0.7.1",
  "futures 0.3.4",
diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs
index 6d9989b274a..d81ec5f031f 100644
--- a/substrate/bin/node/cli/src/service.rs
+++ b/substrate/bin/node/cli/src/service.rs
@@ -397,6 +397,7 @@ mod tests {
 	use sc_service::AbstractService;
 	use crate::service::{new_full, new_light};
 	use sp_runtime::traits::IdentifyAccount;
+	use sp_transaction_pool::{MaintainedTransactionPool, ChainEvent};
 
 	type AccountPublic = <Signature as Verify>::Signer;
 
@@ -414,7 +415,21 @@ mod tests {
 		let dummy_runtime = ::tokio::runtime::Runtime::new().unwrap();
 		let block_factory = |service: &<Factory as service::ServiceFactory>::FullService| {
 			let block_id = BlockId::number(service.client().chain_info().best_number);
-			let parent_header = service.client().header(&block_id).unwrap().unwrap();
+			let parent_header = service.client().best_header(&block_id)
+				.expect("db error")
+				.expect("best block should exist");
+
+			futures::executor::block_on(
+				service.transaction_pool().maintain(
+					ChainEvent::NewBlock {
+						is_new_best: true,
+						id: block_id.clone(),
+						retracted: vec![],
+						header: parent_header,
+					},
+				)
+			);
+
 			let consensus_net = ConsensusNetwork::new(service.network(), service.client().clone());
 			let proposer_factory = consensus::ProposerFactory {
 				client: service.client().clone(),
@@ -464,6 +479,8 @@ mod tests {
 	}
 
 	#[test]
+	// It is "ignored", but the node-cli ignored tests are running on the CI.
+	// This can be run locally with `cargo test --release -p node-cli test_sync -- --ignored`.
 	#[ignore]
 	fn test_sync() {
 		let keystore_path = tempfile::tempdir().expect("Creates keystore path");
@@ -504,6 +521,18 @@ mod tests {
 				let parent_header = service.client().header(&parent_id).unwrap().unwrap();
 				let parent_hash = parent_header.hash();
 				let parent_number = *parent_header.number();
+
+				futures::executor::block_on(
+					service.transaction_pool().maintain(
+						ChainEvent::NewBlock {
+							is_new_best: true,
+							id: parent_id.clone(),
+							retracted: vec![],
+							header: parent_header.clone(),
+						},
+					)
+				);
+
 				let mut proposer_factory = sc_basic_authorship::ProposerFactory::new(
 					service.client(),
 					service.transaction_pool()
diff --git a/substrate/client/basic-authorship/Cargo.toml b/substrate/client/basic-authorship/Cargo.toml
index 7503221f655..ee2ff971bcc 100644
--- a/substrate/client/basic-authorship/Cargo.toml
+++ b/substrate/client/basic-authorship/Cargo.toml
@@ -23,6 +23,7 @@ sc-telemetry = { version = "2.0.0-alpha.2", path = "../telemetry" }
 sp-transaction-pool = { version = "2.0.0-alpha.2", path = "../../primitives/transaction-pool" }
 sc-block-builder = { version = "0.8.0-alpha.2", path = "../block-builder" }
 tokio-executor = { version = "0.2.0-alpha.6", features = ["blocking"] }
+futures-timer = "3.0.1"
 
 [dev-dependencies]
 sc-transaction-pool = { version = "2.0.0-alpha.2", path = "../../client/transaction-pool" }
diff --git a/substrate/client/basic-authorship/src/basic_authorship.rs b/substrate/client/basic-authorship/src/basic_authorship.rs
index fc9a5dec066..986d797ca73 100644
--- a/substrate/client/basic-authorship/src/basic_authorship.rs
+++ b/substrate/client/basic-authorship/src/basic_authorship.rs
@@ -33,7 +33,7 @@ use sp_transaction_pool::{TransactionPool, InPoolTransaction};
 use sc_telemetry::{telemetry, CONSENSUS_INFO};
 use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider};
 use sp_api::{ProvideRuntimeApi, ApiExt};
-use futures::prelude::*;
+use futures::{executor, future, future::Either};
 use sp_blockchain::{HeaderBackend, ApplyExtrinsicFailed};
 use std::marker::PhantomData;
 
@@ -210,7 +210,18 @@ impl<A, B, Block, C> ProposerInner<B, Block, C, A>
 		let mut is_first = true;
 		let mut skipped = 0;
 		let mut unqueue_invalid = Vec::new();
-		let pending_iterator = self.transaction_pool.ready();
+		let pending_iterator = match executor::block_on(future::select(
+			self.transaction_pool.ready_at(self.parent_number),
+			futures_timer::Delay::new((deadline - (self.now)()) / 8),
+		)) {
+			Either::Left((iterator, _)) => iterator,
+			Either::Right(_) => {
+				log::warn!(
+					"Timeout fired waiting for transaction pool to be ready. Proceeding to block production anyway.",
+				);
+				self.transaction_pool.ready()
+			}
+		};
 
 		debug!("Attempting to push transactions from the pool.");
 		debug!("Pool status: {:?}", self.transaction_pool.status());
@@ -304,10 +315,12 @@ mod tests {
 		prelude::*,
 		runtime::{Extrinsic, Transfer},
 	};
+	use sp_transaction_pool::{ChainEvent, MaintainedTransactionPool};
 	use sc_transaction_pool::{BasicPool, FullChainApi};
 	use sp_api::Core;
 	use backend::Backend;
 	use sp_blockchain::HeaderBackend;
+	use sp_runtime::traits::NumberFor;
 
 	fn extrinsic(nonce: u64) -> Extrinsic {
 		Transfer {
@@ -318,6 +331,17 @@ mod tests {
 		}.into_signed_tx()
 	}
 
+	fn chain_event<B: BlockT>(block_number: u64, header: B::Header) -> ChainEvent<B>
+		where NumberFor<B>: From<u64>
+	{
+		ChainEvent::NewBlock {
+			id: BlockId::Number(block_number.into()),
+			retracted: vec![],
+			is_new_best: true,
+			header: header,
+		}
+	}
+
 	#[test]
 	fn should_cease_building_block_when_deadline_is_reached() {
 		// given
@@ -330,16 +354,27 @@ mod tests {
 			txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)])
 		).unwrap();
 
+		futures::executor::block_on(
+			txpool.maintain(chain_event(
+				0,
+				client.header(&BlockId::Number(0u64)).expect("header get error").expect("there should be header")
+			))
+		);
+
 		let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone());
 
-		let cell = Mutex::new(time::Instant::now());
+		let cell = Mutex::new((false, time::Instant::now()));
 		let mut proposer = proposer_factory.init_with_now(
 			&client.header(&BlockId::number(0)).unwrap().unwrap(),
 			Box::new(move || {
 				let mut value = cell.lock();
-				let old = *value;
+				if !value.0 {
+					value.0 = true;
+					return value.1;
+				}
+				let old = value.1;
 				let new = old + time::Duration::from_secs(2);
-				*value = new;
+				*value = (true, new);
 				old
 			})
 		);
@@ -371,6 +406,13 @@ mod tests {
 			txpool.submit_at(&BlockId::number(0), vec![extrinsic(0)]),
 		).unwrap();
 
+		futures::executor::block_on(
+			txpool.maintain(chain_event(
+				0,
+				client.header(&BlockId::Number(0u64)).expect("header get error").expect("there should be header")
+			))
+		);
+
 		let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone());
 
 		let mut proposer = proposer_factory.init_with_now(
@@ -459,15 +501,26 @@ mod tests {
 			block
 		};
 
+		futures::executor::block_on(
+			txpool.maintain(chain_event(
+				0,
+				client.header(&BlockId::Number(0u64)).expect("header get error").expect("there should be header")
+			))
+		);
+
 		// let's create one block and import it
 		let block = propose_block(&client, 0, 2, 7);
 		client.import(BlockOrigin::Own, block).unwrap();
 
-		// now let's make sure that we can still make some progress
+		futures::executor::block_on(
+			txpool.maintain(chain_event(
+				1,
+				client.header(&BlockId::Number(1)).expect("header get error").expect("there should be header")
+			))
+		);
 
-		// This is most likely incorrect, and caused by #5139
-		let tx_remaining = 0;
-		let block = propose_block(&client, 1, 2, tx_remaining);
+		// now let's make sure that we can still make some progress
+		let block = propose_block(&client, 1, 2, 5);
 		client.import(BlockOrigin::Own, block).unwrap();
 	}
 }
diff --git a/substrate/client/consensus/manual-seal/Cargo.toml b/substrate/client/consensus/manual-seal/Cargo.toml
index 11fee0e3f9e..f0925df0893 100644
--- a/substrate/client/consensus/manual-seal/Cargo.toml
+++ b/substrate/client/consensus/manual-seal/Cargo.toml
@@ -17,6 +17,7 @@ jsonrpc-derive = "14.0.5"
 log = "0.4.8"
 parking_lot = "0.10.0"
 serde = { version = "1.0", features=["derive"] }
+assert_matches = "1.3.0"
 
 sc-client = { path = "../../../client" , version = "0.8.0-alpha.2"}
 sc-client-api = { path = "../../../client/api" , version = "2.0.0-alpha.2"}
diff --git a/substrate/client/consensus/manual-seal/src/lib.rs b/substrate/client/consensus/manual-seal/src/lib.rs
index 18dc91ad34d..36d12a00467 100644
--- a/substrate/client/consensus/manual-seal/src/lib.rs
+++ b/substrate/client/consensus/manual-seal/src/lib.rs
@@ -224,7 +224,7 @@ mod tests {
 		txpool::Options,
 	};
 	use substrate_test_runtime_transaction_pool::{TestApi, uxt};
-	use sp_transaction_pool::TransactionPool;
+	use sp_transaction_pool::{TransactionPool, MaintainedTransactionPool};
 	use sp_runtime::generic::BlockId;
 	use sp_blockchain::HeaderBackend;
 	use sp_consensus::ImportedAux;
@@ -432,14 +432,24 @@ mod tests {
 		assert!(backend.blockchain().header(BlockId::Number(0)).unwrap().is_some());
 		assert!(pool.submit_one(&BlockId::Number(1), uxt(Alice, 1)).await.is_ok());
 
+		pool.maintain(sp_transaction_pool::ChainEvent::NewBlock {
+			id: BlockId::Number(1),
+			header: backend.blockchain().header(BlockId::Number(1)).expect("db error").expect("imported above"),
+			is_new_best: true,
+			retracted: vec![],
+		}).await;
+
 		let (tx1, rx1) = futures::channel::oneshot::channel();
 		assert!(sink.send(EngineCommand::SealNewBlock {
-			parent_hash: Some(created_block.hash.clone()),
+			parent_hash: Some(created_block.hash),
 			sender: Some(tx1),
 			create_empty: false,
 			finalize: false,
 		}).await.is_ok());
-		assert!(rx1.await.unwrap().is_ok());
+		assert_matches::assert_matches!(
+			rx1.await.expect("should be no error receiving"),
+			Ok(_)
+		);
 		assert!(backend.blockchain().header(BlockId::Number(1)).unwrap().is_some());
 		pool_api.increment_nonce(Alice.into());
 
diff --git a/substrate/client/service/test/src/lib.rs b/substrate/client/service/test/src/lib.rs
index 259c25ee4d4..0a29bc7bf29 100644
--- a/substrate/client/service/test/src/lib.rs
+++ b/substrate/client/service/test/src/lib.rs
@@ -482,7 +482,11 @@ pub fn sync<G, E, Fb, F, Lb, L, B, ExF, U>(
 	let first_user_data = &network.full_nodes[0].2;
 	let best_block = BlockId::number(first_service.get().client().chain_info().best_number);
 	let extrinsic = extrinsic_factory(&first_service.get(), first_user_data);
-	futures::executor::block_on(first_service.get().transaction_pool().submit_one(&best_block, extrinsic)).unwrap();
+
+	futures::executor::block_on(
+		first_service.get().transaction_pool().submit_one(&best_block, extrinsic)
+	).expect("failed to submit extrinsic");
+
 	network.run_until_all_full(
 		|_index, service| service.get().transaction_pool().ready().count() == 1,
 		|_index, _service| true,
diff --git a/substrate/client/transaction-pool/graph/src/validated_pool.rs b/substrate/client/transaction-pool/graph/src/validated_pool.rs
index a62822a9185..3a6573d9bd6 100644
--- a/substrate/client/transaction-pool/graph/src/validated_pool.rs
+++ b/substrate/client/transaction-pool/graph/src/validated_pool.rs
@@ -545,7 +545,7 @@ impl<B: ChainApi> ValidatedPool<B> {
 	}
 
 	/// Get an iterator for ready transactions ordered by priority
-	pub fn ready(&self) -> impl Iterator<Item=TransactionFor<B>> {
+	pub fn ready(&self) -> impl Iterator<Item=TransactionFor<B>> + Send {
 		self.pool.read().ready()
 	}
 
diff --git a/substrate/client/transaction-pool/src/lib.rs b/substrate/client/transaction-pool/src/lib.rs
index 7ee73b862ad..3c240611f0b 100644
--- a/substrate/client/transaction-pool/src/lib.rs
+++ b/substrate/client/transaction-pool/src/lib.rs
@@ -31,12 +31,12 @@ pub use sc_transaction_graph as txpool;
 pub use crate::api::{FullChainApi, LightChainApi};
 
 use std::{collections::HashMap, sync::Arc, pin::Pin};
-use futures::{Future, FutureExt, future::ready};
+use futures::{Future, FutureExt, future::ready, channel::oneshot};
 use parking_lot::Mutex;
 
 use sp_runtime::{
 	generic::BlockId,
-	traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic},
+	traits::{Block as BlockT, NumberFor, AtLeast32Bit, Extrinsic, Zero},
 };
 use sp_transaction_pool::{
 	TransactionPool, PoolStatus, ImportNotificationStream, TxHash, TransactionFor,
@@ -44,6 +44,12 @@ use sp_transaction_pool::{
 };
 use wasm_timer::Instant;
 
+type BoxedReadyIterator<Hash, Data> = Box<dyn Iterator<Item=Arc<sc_transaction_graph::base_pool::Transaction<Hash, Data>>> + Send>;
+
+type ReadyIteratorFor<PoolApi> = BoxedReadyIterator<sc_transaction_graph::ExHash<PoolApi>, sc_transaction_graph::ExtrinsicFor<PoolApi>>;
+
+type PolledIterator<PoolApi> = Pin<Box<dyn Future<Output=ReadyIteratorFor<PoolApi>> + Send>>;
+
 /// Basic implementation of transaction pool that can be customized by providing PoolApi.
 pub struct BasicPool<PoolApi, Block>
 	where
@@ -54,6 +60,48 @@ pub struct BasicPool<PoolApi, Block>
 	api: Arc<PoolApi>,
 	revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
 	revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
+	ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
+}
+
+struct ReadyPoll<T, Block: BlockT> {
+	updated_at: NumberFor<Block>,
+	pollers: Vec<(NumberFor<Block>, oneshot::Sender<T>)>,
+}
+
+impl<T, Block: BlockT> Default for ReadyPoll<T, Block> {
+	fn default() -> Self {
+		Self {
+			updated_at: NumberFor::<Block>::zero(),
+			pollers: Default::default(),
+		}
+	}
+}
+
+impl<T, Block: BlockT> ReadyPoll<T, Block> {
+	fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) {
+		self.updated_at = number;
+
+		let mut idx = 0;
+		while idx < self.pollers.len() {
+			if self.pollers[idx].0 <= number {
+				let poller_sender = self.pollers.swap_remove(idx);
+				log::debug!(target: "txpool", "Sending ready signal at block {}", number);
+				let _ = poller_sender.1.send(iterator_factory());
+			} else {
+				idx += 1;
+			}
+		}
+	}
+
+	fn add(&mut self, number: NumberFor<Block>) -> oneshot::Receiver<T> {
+		let (sender, receiver) = oneshot::channel();
+		self.pollers.push((number, sender));
+		receiver
+	}
+
+	fn updated_at(&self) -> NumberFor<Block> {
+		self.updated_at
+	}
 }
 
 #[cfg(not(target_os = "unknown"))]
@@ -128,6 +176,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
 						RevalidationType::Full => RevalidationStrategy::Always,
 					}
 				)),
+				ready_poll: Default::default(),
 			},
 			background_task,
 		)
@@ -196,10 +245,6 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
 		self.pool.validated_pool().status()
 	}
 
-	fn ready(&self) -> Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>>> {
-		Box::new(self.pool.validated_pool().ready())
-	}
-
 	fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
 		self.pool.validated_pool().import_notification_stream()
 	}
@@ -215,6 +260,27 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
 	fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
 		self.pool.validated_pool().ready_by_hash(hash)
 	}
+
+	fn ready_at(&self, at: NumberFor<Self::Block>) -> PolledIterator<PoolApi> {
+		if self.ready_poll.lock().updated_at() >= at {
+			let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
+			return Box::pin(futures::future::ready(iterator));
+		}
+
+		Box::pin(
+			self.ready_poll
+				.lock()
+				.add(at)
+				.map(|received| received.unwrap_or_else(|e| {
+					log::warn!("Error receiving pending set: {:?}", e);
+					Box::new(vec![].into_iter())
+				}))
+		)
+	}
+
+	fn ready(&self) -> ReadyIteratorFor<PoolApi> {
+		Box::new(self.pool.validated_pool().ready())
+	}
 }
 
 #[cfg_attr(test, derive(Debug))]
@@ -329,6 +395,7 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
 				let revalidation_strategy = self.revalidation_strategy.clone();
 				let retracted = retracted.clone();
 				let revalidation_queue = self.revalidation_queue.clone();
+				let ready_poll = self.ready_poll.clone();
 
 				async move {
 					// We don't query block if we won't prune anything
@@ -348,6 +415,10 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
 						}
 					}
 
+					let extra_pool = pool.clone();
+					// After #5200 lands, this arguably might be moved to the handler of "all blocks notification".
+					ready_poll.lock().trigger(block_number, move || Box::new(extra_pool.validated_pool().ready()));
+
 					if next_action.resubmit {
 						let mut resubmit_transactions = Vec::new();
 
diff --git a/substrate/client/transaction-pool/src/testing/pool.rs b/substrate/client/transaction-pool/src/testing/pool.rs
index 7494ba2684f..27cb6c62a53 100644
--- a/substrate/client/transaction-pool/src/testing/pool.rs
+++ b/substrate/client/transaction-pool/src/testing/pool.rs
@@ -28,6 +28,7 @@ use substrate_test_runtime_client::{
 };
 use substrate_test_runtime_transaction_pool::{TestApi, uxt};
 use crate::revalidation::BACKGROUND_REVALIDATION_INTERVAL;
+use futures::task::Poll;
 
 fn pool() -> Pool<TestApi> {
 	Pool::new(Default::default(), TestApi::with_alice_nonce(209).into())
@@ -600,5 +601,56 @@ fn fork_aware_finalization() {
 		assert_eq!(stream.next(), Some(TransactionStatus::Finalized(e1.clone())));
 		assert_eq!(stream.next(), None);
 	}
+}
+
+#[test]
+fn ready_set_should_not_resolve_before_block_update() {
+	let (pool, _guard) = maintained_pool();
+	let xt1 = uxt(Alice, 209);
+	block_on(pool.submit_one(&BlockId::number(1), xt1.clone())).expect("1. Imported");
+
+	assert!(pool.ready_at(1).now_or_never().is_none());
+}
+
+#[test]
+fn ready_set_should_resolve_after_block_update() {
+	let (pool, _guard) = maintained_pool();
+	pool.api.push_block(1, vec![]);
+
+	let xt1 = uxt(Alice, 209);
+
+	block_on(pool.submit_one(&BlockId::number(1), xt1.clone())).expect("1. Imported");
+	block_on(pool.maintain(block_event(1)));
 
+	assert!(pool.ready_at(1).now_or_never().is_some());
 }
+
+#[test]
+fn ready_set_should_eventually_resolve_when_block_update_arrives() {
+	let (pool, _guard) = maintained_pool();
+	pool.api.push_block(1, vec![]);
+
+	let xt1 = uxt(Alice, 209);
+
+	block_on(pool.submit_one(&BlockId::number(1), xt1.clone())).expect("1. Imported");
+
+	let noop_waker = futures::task::noop_waker();
+	let mut context = futures::task::Context::from_waker(&noop_waker);
+
+	let mut ready_set_future = pool.ready_at(1);
+	if let Poll::Ready(_) = ready_set_future.poll_unpin(&mut context) {
+		panic!("Ready set should not be ready before block update!");
+	}
+
+	block_on(pool.maintain(block_event(1)));
+
+	match ready_set_future.poll_unpin(&mut context)  {
+		Poll::Pending => {
+			panic!("Ready set should become ready after block update!");
+		},
+		Poll::Ready(iterator) => {
+			let data = iterator.collect::<Vec<_>>();
+			assert_eq!(data.len(), 1);
+		}
+	}
+}
\ No newline at end of file
diff --git a/substrate/primitives/blockchain/src/error.rs b/substrate/primitives/blockchain/src/error.rs
index a4ec9c29952..e479b8abe91 100644
--- a/substrate/primitives/blockchain/src/error.rs
+++ b/substrate/primitives/blockchain/src/error.rs
@@ -127,6 +127,8 @@ pub enum Error {
 	/// Incomplete block import pipeline.
 	#[display(fmt = "Incomplete block import pipeline.")]
 	IncompletePipeline,
+	#[display(fmt = "Transaction pool not ready for block production.")]
+	TransactionPoolNotReady,
 	/// A convenience variant for String
 	#[display(fmt = "{}", _0)]
 	Msg(String),
diff --git a/substrate/primitives/transaction-pool/src/pool.rs b/substrate/primitives/transaction-pool/src/pool.rs
index 89f327a523b..cad06679647 100644
--- a/substrate/primitives/transaction-pool/src/pool.rs
+++ b/substrate/primitives/transaction-pool/src/pool.rs
@@ -29,7 +29,7 @@ use futures::{
 use serde::{Deserialize, Serialize};
 use sp_runtime::{
 	generic::BlockId,
-	traits::{Block as BlockT, Member},
+	traits::{Block as BlockT, Member, NumberFor},
 	transaction_validity::{
 		TransactionLongevity, TransactionPriority, TransactionTag,
 	},
@@ -210,8 +210,15 @@ pub trait TransactionPool: Send + Sync {
 	) -> PoolFuture<Box<TransactionStatusStreamFor<Self>>, Self::Error>;
 
 	// *** Block production / Networking
-	/// Get an iterator for ready transactions ordered by priority
-	fn ready(&self) -> Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>>>;
+	/// Get an iterator for ready transactions ordered by priority.
+	///
+	/// Guarantees to return only when transaction pool got updated at `at` block.
+	/// Guarantees to return immediately when `None` is passed.
+	fn ready_at(&self, at: NumberFor<Self::Block>)
+		-> Pin<Box<dyn Future<Output=Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>> + Send>> + Send>>;
+
+	/// Get an iterator for ready transactions ordered by priority.
+	fn ready(&self) -> Box<dyn Iterator<Item=Arc<Self::InPoolTransaction>> + Send>;
 
 	// *** Block production
 	/// Remove transactions identified by given hashes (and dependent transactions) from the pool.
-- 
GitLab