From a24e61cb294e78323dd647dd39ba35698d12101e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= <tomusdrw@users.noreply.github.com>
Date: Tue, 16 Oct 2018 10:04:19 +0200
Subject: [PATCH] Longevity handling. (#903)

---
 substrate/core/service/src/lib.rs             |   2 +-
 .../transaction-pool/graph/src/base_pool.rs   | 141 +++++++++---------
 .../core/transaction-pool/graph/src/future.rs |   5 +
 .../core/transaction-pool/graph/src/lib.rs    |   1 -
 .../core/transaction-pool/graph/src/pool.rs   |  58 ++++---
 .../core/transaction-pool/graph/src/ready.rs  |  53 +++----
 .../transaction-pool/graph/src/rotator.rs     |  47 +++---
 substrate/core/transaction-pool/src/tests.rs  |  16 +-
 substrate/node/consensus/src/lib.rs           |   4 +-
 9 files changed, 152 insertions(+), 175 deletions(-)

diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs
index 0528116694b..ba506d31d57 100644
--- a/substrate/core/service/src/lib.rs
+++ b/substrate/core/service/src/lib.rs
@@ -395,7 +395,7 @@ impl<C: Components> network::TransactionPool<ComponentExHash<C>, ComponentBlock<
 		self.pool.ready(|pending| pending
 			.map(|t| {
 				let hash = t.hash.clone();
-				let ex: ComponentExtrinsic<C> = t.data.raw.clone();
+				let ex: ComponentExtrinsic<C> = t.data.clone();
 				(hash, ex)
 			})
 			.collect())
diff --git a/substrate/core/transaction-pool/graph/src/base_pool.rs b/substrate/core/transaction-pool/graph/src/base_pool.rs
index a416efa531d..9dba89fbec4 100644
--- a/substrate/core/transaction-pool/graph/src/base_pool.rs
+++ b/substrate/core/transaction-pool/graph/src/base_pool.rs
@@ -34,9 +34,6 @@ use error;
 use future::{FutureTransactions, WaitingTransaction};
 use ready::ReadyTransactions;
 
-/// Block number type.
-pub type BlockNumber = u64;
-
 /// Successful import result.
 #[derive(Debug, PartialEq, Eq)]
 pub enum Imported<Hash, Ex> {
@@ -90,8 +87,8 @@ pub struct Transaction<Hash, Extrinsic> {
 	pub hash: Hash,
 	/// Transaction priority (higher = better)
 	pub priority: Priority,
-	/// How many blocks the transaction is valid for.
-	pub longevity: Longevity,
+	/// At which block the transaction becomes invalid?
+	pub valid_till: Longevity,
 	/// Tags required by the transaction.
 	pub requires: Vec<Tag>,
 	/// Tags that this transaction provides.
@@ -133,7 +130,6 @@ impl<Hash: hash::Hash + Member, Ex: ::std::fmt::Debug> BasePool<Hash, Ex> {
 	/// ready to be included in the block.
 	pub fn import(
 		&mut self,
-		block_number: BlockNumber,
 		tx: Transaction<Hash, Ex>,
 	) -> error::Result<Imported<Hash, Ex>> {
 		if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) {
@@ -151,13 +147,13 @@ impl<Hash: hash::Hash + Member, Ex: ::std::fmt::Debug> BasePool<Hash, Ex> {
 			return Ok(Imported::Future { hash });
 		}
 
-		self.import_to_ready(block_number, tx)
+		self.import_to_ready(tx)
 	}
 
 	/// Imports transaction to ready queue.
 	///
 	/// NOTE the transaction has to have all requirements satisfied.
-	fn import_to_ready(&mut self, block_number: BlockNumber, tx: WaitingTransaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {
+	fn import_to_ready(&mut self, tx: WaitingTransaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {
 		let hash = tx.transaction.hash.clone();
 		let mut promoted = vec![];
 		let mut failed = vec![];
@@ -178,7 +174,7 @@ impl<Hash: hash::Hash + Member, Ex: ::std::fmt::Debug> BasePool<Hash, Ex> {
 
 			// import this transaction
 			let current_hash = tx.transaction.hash.clone();
-			match self.ready.import(block_number, tx) {
+			match self.ready.import(tx) {
 				Ok(mut replaced) => {
 					if !first {
 						promoted.push(current_hash);
@@ -224,6 +220,11 @@ impl<Hash: hash::Hash + Member, Ex: ::std::fmt::Debug> BasePool<Hash, Ex> {
 		self.ready.get()
 	}
 
+	/// Returns an iterator over future transactions in the pool.
+	pub fn futures(&self) -> impl Iterator<Item=&Transaction<Hash, Ex>> {
+		self.future.all()
+	}
+
 	/// Removes all transactions represented by the hashes and all other transactions
 	/// that depend on them.
 	///
@@ -244,7 +245,7 @@ impl<Hash: hash::Hash + Member, Ex: ::std::fmt::Debug> BasePool<Hash, Ex> {
 	/// but unlike `remove_invalid`, dependent transactions are not touched.
 	/// Additional transactions from future queue might be promoted to ready if you satisfy tags
 	/// that the pool didn't previously know about.
-	pub fn prune_tags(&mut self, block_number: BlockNumber, tags: impl IntoIterator<Item=Tag>) -> PruneStatus<Hash, Ex> {
+	pub fn prune_tags(&mut self, tags: impl IntoIterator<Item=Tag>) -> PruneStatus<Hash, Ex> {
 		let mut to_import = vec![];
 		let mut pruned = vec![];
 
@@ -259,7 +260,7 @@ impl<Hash: hash::Hash + Member, Ex: ::std::fmt::Debug> BasePool<Hash, Ex> {
 		let mut failed = vec![];
 		for tx in to_import {
 			let hash = tx.transaction.hash.clone();
-			match self.import_to_ready(block_number, tx) {
+			match self.import_to_ready(tx) {
 				Ok(res) => promoted.push(res),
 				Err(e) => {
 					warn!(target: "txpool", "[{:?}] Failed to promote during pruning: {:?}", hash, e);
@@ -308,11 +309,11 @@ mod tests {
 		let mut pool = pool();
 
 		// when
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![1u8],
 			hash: 1u64,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![],
 			provides: vec![vec![1]],
 		}).unwrap();
@@ -328,19 +329,19 @@ mod tests {
 		let mut pool = pool();
 
 		// when
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![1u8],
 			hash: 1,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![],
 			provides: vec![vec![1]],
 		}).unwrap();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![1u8],
 			hash: 1,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![],
 			provides: vec![vec![1]],
 		}).unwrap_err();
@@ -357,21 +358,21 @@ mod tests {
 		let mut pool = pool();
 
 		// when
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![1u8],
 			hash: 1,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![0]],
 			provides: vec![vec![1]],
 		}).unwrap();
 		assert_eq!(pool.ready().count(), 0);
 		assert_eq!(pool.ready.len(), 0);
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![2u8],
 			hash: 2,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![],
 			provides: vec![vec![0]],
 		}).unwrap();
@@ -387,46 +388,46 @@ mod tests {
 		let mut pool = pool();
 
 		// when
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![1u8],
 			hash: 1,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![0]],
 			provides: vec![vec![1]],
 		}).unwrap();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![3u8],
 			hash: 3,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![2]],
 			provides: vec![],
 		}).unwrap();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![2u8],
 			hash: 2,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![1]],
 			provides: vec![vec![3], vec![2]],
 		}).unwrap();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![4u8],
 			hash: 4,
 			priority: 1_000u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![3], vec![4]],
 			provides: vec![],
 		}).unwrap();
 		assert_eq!(pool.ready().count(), 0);
 		assert_eq!(pool.ready.len(), 0);
 
-		let res = pool.import(1, Transaction {
+		let res = pool.import(Transaction {
 			data: vec![5u8],
 			hash: 5,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![],
 			provides: vec![vec![0], vec![4]],
 		}).unwrap();
@@ -452,19 +453,19 @@ mod tests {
 	fn should_handle_a_cycle() {
 		// given
 		let mut pool = pool();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![1u8],
 			hash: 1,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![0]],
 			provides: vec![vec![1]],
 		}).unwrap();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![3u8],
 			hash: 3,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![1]],
 			provides: vec![vec![2]],
 		}).unwrap();
@@ -472,11 +473,11 @@ mod tests {
 		assert_eq!(pool.ready.len(), 0);
 
 		// when
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![2u8],
 			hash: 2,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![2]],
 			provides: vec![vec![0]],
 		}).unwrap();
@@ -490,11 +491,11 @@ mod tests {
 		assert_eq!(pool.future.len(), 3);
 
 		// let's close the cycle with one additional transaction
-		let res = pool.import(1, Transaction {
+		let res = pool.import(Transaction {
 			data: vec![4u8],
 			hash: 4,
 			priority: 50u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![],
 			provides: vec![vec![0]],
 		}).unwrap();
@@ -517,19 +518,19 @@ mod tests {
 	fn should_handle_a_cycle_with_low_priority() {
 		// given
 		let mut pool = pool();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![1u8],
 			hash: 1,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![0]],
 			provides: vec![vec![1]],
 		}).unwrap();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![3u8],
 			hash: 3,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![1]],
 			provides: vec![vec![2]],
 		}).unwrap();
@@ -537,11 +538,11 @@ mod tests {
 		assert_eq!(pool.ready.len(), 0);
 
 		// when
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![2u8],
 			hash: 2,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![2]],
 			provides: vec![vec![0]],
 		}).unwrap();
@@ -555,11 +556,11 @@ mod tests {
 		assert_eq!(pool.future.len(), 3);
 
 		// let's close the cycle with one additional transaction
-		let err = pool.import(1, Transaction {
+		let err = pool.import(Transaction {
 			data: vec![4u8],
 			hash: 4,
 			priority: 1u64, // lower priority than Tx(2)
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![],
 			provides: vec![vec![0]],
 		}).unwrap_err();
@@ -577,52 +578,52 @@ mod tests {
 	fn should_remove_invalid_transactions() {
 		// given
 		let mut pool = pool();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![5u8],
 			hash: 5,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![],
 			provides: vec![vec![0], vec![4]],
 		}).unwrap();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![1u8],
 			hash: 1,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![0]],
 			provides: vec![vec![1]],
 		}).unwrap();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![3u8],
 			hash: 3,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![2]],
 			provides: vec![],
 		}).unwrap();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![2u8],
 			hash: 2,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![1]],
 			provides: vec![vec![3], vec![2]],
 		}).unwrap();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![4u8],
 			hash: 4,
 			priority: 1_000u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![3], vec![4]],
 			provides: vec![],
 		}).unwrap();
 		// future
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![6u8],
 			hash: 6,
 			priority: 1_000u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![11]],
 			provides: vec![],
 		}).unwrap();
@@ -643,44 +644,44 @@ mod tests {
 		// given
 		let mut pool = pool();
 		// future (waiting for 0)
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![5u8],
 			hash: 5,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![0]],
 			provides: vec![vec![100]],
 		}).unwrap();
 		// ready
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![1u8],
 			hash: 1,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![],
 			provides: vec![vec![1]],
 		}).unwrap();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![2u8],
 			hash: 2,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![2]],
 			provides: vec![vec![3]],
 		}).unwrap();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![3u8],
 			hash: 3,
 			priority: 5u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![1]],
 			provides: vec![vec![2]],
 		}).unwrap();
-		pool.import(1, Transaction {
+		pool.import(Transaction {
 			data: vec![4u8],
 			hash: 4,
 			priority: 1_000u64,
-			longevity: 64u64,
+			valid_till: 64u64,
 			requires: vec![vec![3], vec![2]],
 			provides: vec![vec![4]],
 		}).unwrap();
@@ -689,7 +690,7 @@ mod tests {
 		assert_eq!(pool.future.len(), 1);
 
 		// when
-		let result = pool.prune_tags(1, vec![vec![0], vec![2]]);
+		let result = pool.prune_tags(vec![vec![0], vec![2]]);
 
 		// then
 		assert_eq!(result.pruned.len(), 2);
diff --git a/substrate/core/transaction-pool/graph/src/future.rs b/substrate/core/transaction-pool/graph/src/future.rs
index 80743c9c89b..da6e8e55f2d 100644
--- a/substrate/core/transaction-pool/graph/src/future.rs
+++ b/substrate/core/transaction-pool/graph/src/future.rs
@@ -169,6 +169,11 @@ impl<Hash: hash::Hash + Eq + Clone, Ex> FutureTransactions<Hash, Ex> {
 		removed
 	}
 
+	/// Returns iterator over all future transactions
+	pub fn all(&self) -> impl Iterator<Item=&Transaction<Hash, Ex>> {
+		self.waiting.values().map(|waiting| &waiting.transaction)
+	}
+
 	/// Returns number of transactions in the Future queue.
 	pub fn len(&self) -> usize {
 		self.waiting.len()
diff --git a/substrate/core/transaction-pool/graph/src/lib.rs b/substrate/core/transaction-pool/graph/src/lib.rs
index 1cdd78cd28b..7e8d479c08e 100644
--- a/substrate/core/transaction-pool/graph/src/lib.rs
+++ b/substrate/core/transaction-pool/graph/src/lib.rs
@@ -23,7 +23,6 @@
 //! graph in the correct order taking into account priorities and dependencies.
 //!
 //! TODO [ToDr]
-//! - [ ] Longevity handling (remove obsolete transactions periodically)
 //! - [ ] Multi-threading (getting ready transactions should not block the pool)
 // end::description[]
 
diff --git a/substrate/core/transaction-pool/graph/src/pool.rs b/substrate/core/transaction-pool/graph/src/pool.rs
index 6e6975a23f3..6e89df581c1 100644
--- a/substrate/core/transaction-pool/graph/src/pool.rs
+++ b/substrate/core/transaction-pool/graph/src/pool.rs
@@ -47,7 +47,7 @@ pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
 /// Block number type for the ChainApi
 pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
 /// A type of transaction stored in the pool
-pub type TransactionFor<A> = Arc<base::Transaction<ExHash<A>, TxData<ExtrinsicFor<A>>>>;
+pub type TransactionFor<A> = Arc<base::Transaction<ExHash<A>, ExtrinsicFor<A>>>;
 
 /// Concrete extrinsic validation and query logic.
 pub trait ChainApi: Send + Sync {
@@ -71,22 +71,6 @@ pub trait ChainApi: Send + Sync {
 	fn hash(&self, uxt: &ExtrinsicFor<Self>) -> Self::Hash;
 }
 
-/// Maximum time the transaction will be kept in the pool.
-///
-/// Transactions that don't get included within the limit are removed from the pool.
-const POOL_TIME: time::Duration = time::Duration::from_secs(60 * 5);
-
-/// Additional transaction data
-#[derive(Debug, Serialize, Deserialize)]
-pub struct TxData<Ex> {
-	/// Raw data stored by the user.
-	pub raw: Ex,
-	/// Transaction validity deadline.
-	/// TODO [ToDr] Should we use longevity instead?
-	#[serde(skip)]
-	pub valid_till: Option<time::Instant>,
-}
-
 /// Pool configuration options.
 #[derive(Debug, Clone, Default)]
 pub struct Options;
@@ -97,7 +81,7 @@ pub struct Pool<B: ChainApi> {
 	listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>,
 	pool: RwLock<base::BasePool<
 		ExHash<B>,
-		TxData<ExtrinsicFor<B>>,
+		ExtrinsicFor<B>,
 	>>,
 	import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,
 	rotator: PoolRotator<ExHash<B>>,
@@ -121,17 +105,14 @@ impl<B: ChainApi> Pool<B> {
 				}
 
 				match self.api.validate_transaction(at, &xt)? {
-					TransactionValidity::Valid(priority, requires, provides, longevity)=> {
+					TransactionValidity::Valid(priority, requires, provides, longevity) => {
 						Ok(base::Transaction {
-							data: TxData {
-								raw: xt,
-								valid_till: Some(time::Instant::now() + POOL_TIME),
-							},
+							data:  xt,
 							hash,
 							priority,
 							requires,
 							provides,
-							longevity,
+							valid_till: block_number.as_().saturating_add(longevity),
 						})
 					},
 					TransactionValidity::Invalid => {
@@ -144,7 +125,7 @@ impl<B: ChainApi> Pool<B> {
 				}
 			})
 			.map(|tx| {
-				let imported = self.pool.write().import(block_number.as_(), tx?)?;
+				let imported = self.pool.write().import(tx?)?;
 
 				self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok());
 
@@ -168,10 +149,7 @@ impl<B: ChainApi> Pool<B> {
 
 	/// Prunes ready transactions that provide given list of tags.
 	pub fn prune_tags(&self, at: &BlockId<B::Block>, tags: impl IntoIterator<Item=Tag>) -> Result<(), B::Error> {
-		let block_number = self.api.block_id_to_number(at)?
-			.ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?;
-
-		let status = self.pool.write().prune_tags(block_number.as_(), tags);
+		let status = self.pool.write().prune_tags(tags);
 		{
 			let mut listener = self.listener.write();
 			for promoted in &status.promoted {
@@ -183,7 +161,7 @@ impl<B: ChainApi> Pool<B> {
 		}
 		// try to re-submit pruned transactions since some of them might be still valid.
 		let hashes = status.pruned.iter().map(|tx| tx.hash.clone()).collect::<Vec<_>>();
-		let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.raw.clone()))?;
+		let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()))?;
 		// Fire mined event for transactions that became invalid.
 		let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) {
 			Err(Ok(err)) => match err.kind() {
@@ -210,15 +188,29 @@ impl<B: ChainApi> Pool<B> {
 	/// Stale transactions are transaction beyond their longevity period.
 	/// Note this function does not remove transactions that are already included in the chain.
 	/// See `prune_tags` ifyou want this.
-	pub fn clear_stale(&self, _at: &BlockId<B::Block>) -> Result<(), B::Error> {
+	pub fn clear_stale(&self, at: &BlockId<B::Block>) -> Result<(), B::Error> {
+		let block_number = self.api.block_id_to_number(at)?
+				.ok_or_else(|| error::ErrorKind::Msg(format!("Invalid block id: {:?}", at)).into())?
+				.as_();
 		let now = time::Instant::now();
 		let to_remove = self.ready(|pending| pending
-			.filter(|tx| self.rotator.ban_if_stale(&now, &tx))
+			.filter(|tx| self.rotator.ban_if_stale(&now, block_number, &tx))
 			.map(|tx| tx.hash.clone())
 			.collect::<Vec<_>>()
 		);
+		let futures_to_remove: Vec<ExHash<B>> = {
+			let p = self.pool.read();
+			let mut hashes = Vec::new();
+			for tx in p.futures() {
+				if self.rotator.ban_if_stale(&now, block_number, &tx) {
+					hashes.push(tx.hash.clone());
+				}
+			}
+			hashes
+		};
 		// removing old transactions
 		self.remove_invalid(&to_remove);
+		self.remove_invalid(&futures_to_remove);
 		// clear banned transactions timeouts
 		self.rotator.clear_timeouts(&now);
 
@@ -283,7 +275,7 @@ impl<B: ChainApi> Pool<B> {
 	///
 	/// Be careful with large limit values, as querying the entire pool might be time consuming.
 	pub fn all(&self, limit: usize) -> Vec<ExtrinsicFor<B>> {
-		self.ready(|it| it.take(limit).map(|ex| ex.data.raw.clone()).collect())
+		self.ready(|it| it.take(limit).map(|ex| ex.data.clone()).collect())
 	}
 
 	/// Returns pool status.
diff --git a/substrate/core/transaction-pool/graph/src/ready.rs b/substrate/core/transaction-pool/graph/src/ready.rs
index 0da9d29930f..6df69c9a0c1 100644
--- a/substrate/core/transaction-pool/graph/src/ready.rs
+++ b/substrate/core/transaction-pool/graph/src/ready.rs
@@ -28,12 +28,11 @@ use sr_primitives::transaction_validity::{
 
 use error;
 use future::WaitingTransaction;
-use base_pool::{BlockNumber, Transaction};
+use base_pool::Transaction;
 
 #[derive(Debug)]
 pub struct TransactionRef<Hash, Ex> {
 	pub transaction: Arc<Transaction<Hash, Ex>>,
-	pub valid_till: BlockNumber,
 	pub insertion_id: u64,
 }
 
@@ -41,7 +40,6 @@ impl<Hash, Ex> Clone for TransactionRef<Hash, Ex> {
 	fn clone(&self) -> Self {
 		TransactionRef {
 			transaction: self.transaction.clone(),
-			valid_till: self.valid_till,
 			insertion_id: self.insertion_id,
 		}
 	}
@@ -50,7 +48,7 @@ impl<Hash, Ex> Clone for TransactionRef<Hash, Ex> {
 impl<Hash, Ex> Ord for TransactionRef<Hash, Ex> {
 	fn cmp(&self, other: &Self) -> cmp::Ordering {
 		self.transaction.priority.cmp(&other.transaction.priority)
-			.then(other.valid_till.cmp(&self.valid_till))
+			.then(other.transaction.valid_till.cmp(&self.transaction.valid_till))
 			.then(other.insertion_id.cmp(&self.insertion_id))
 	}
 }
@@ -143,7 +141,6 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
 	/// that are in this queue.
 	pub fn import(
 		&mut self,
-		block_number: BlockNumber,
 		tx: WaitingTransaction<Hash, Ex>,
 	) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> {
 		assert!(tx.is_ready(), "Only ready transactions can be imported.");
@@ -175,7 +172,6 @@ impl<Hash: hash::Hash + Member, Ex> ReadyTransactions<Hash, Ex> {
 
 		let transaction = TransactionRef {
 			insertion_id,
-			valid_till: block_number.saturating_add(tx.longevity),
 			transaction: Arc::new(tx),
 		};
 
@@ -446,7 +442,7 @@ mod tests {
 			data: vec![id],
 			hash: id as u64,
 			priority: 1,
-			longevity: 2,
+			valid_till: 2,
 			requires: vec![vec![1], vec![2]],
 			provides: vec![vec![3], vec![4]],
 		}
@@ -456,7 +452,6 @@ mod tests {
 	fn should_replace_transaction_that_provides_the_same_tag() {
 		// given
 		let mut ready = ReadyTransactions::default();
-		let block_number = 1;
 		let mut tx1 = tx(1);
 		tx1.requires.clear();
 		let mut tx2 = tx(2);
@@ -468,18 +463,18 @@ mod tests {
 
 		// when
 		let x = WaitingTransaction::new(tx2, &ready.provided_tags());
-		ready.import(block_number, x).unwrap();
+		ready.import(x).unwrap();
 		let x = WaitingTransaction::new(tx3, &ready.provided_tags());
-		ready.import(block_number, x).unwrap();
+		ready.import(x).unwrap();
 		assert_eq!(ready.get().count(), 2);
 
 		// too low priority
 		let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags());
-		ready.import(block_number, x).unwrap_err();
+		ready.import(x).unwrap_err();
 
 		tx1.priority = 10;
 		let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags());
-		ready.import(block_number, x).unwrap();
+		ready.import(x).unwrap();
 
 		// then
 		assert_eq!(ready.get().count(), 1);
@@ -501,27 +496,26 @@ mod tests {
 		let mut tx4 = tx(4);
 		tx4.requires = vec![tx1.provides[0].clone()];
 		tx4.provides = vec![];
-		let block_number = 1;
 		let tx5 = Transaction {
 			data: vec![5],
 			hash: 5,
 			priority: 1,
-			longevity: u64::max_value(),	// use the max_value() here for testing.
+			valid_till: u64::max_value(),	// use the max_value() here for testing.
 			requires: vec![tx1.provides[0].clone()],
 			provides: vec![],
 		};
 
 		// when
 		let x = WaitingTransaction::new(tx1, &ready.provided_tags());
-		ready.import(block_number, x).unwrap();
+		ready.import(x).unwrap();
 		let x = WaitingTransaction::new(tx2, &ready.provided_tags());
-		ready.import(block_number, x).unwrap();
+		ready.import(x).unwrap();
 		let x = WaitingTransaction::new(tx3, &ready.provided_tags());
-		ready.import(block_number, x).unwrap();
+		ready.import(x).unwrap();
 		let x = WaitingTransaction::new(tx4, &ready.provided_tags());
-		ready.import(block_number, x).unwrap();
+		ready.import(x).unwrap();
 		let x = WaitingTransaction::new(tx5, &ready.provided_tags());
-		ready.import(block_number, x).unwrap();
+		ready.import(x).unwrap();
 
 		// then
 		assert_eq!(ready.best.len(), 1);
@@ -539,40 +533,35 @@ mod tests {
 	#[test]
 	fn should_order_refs() {
 		let mut id = 1;
-		let mut with_priority = |priority| {
+		let mut with_priority = |priority, longevity| {
 			id += 1;
 			let mut tx = tx(id);
 			tx.priority = priority;
+			tx.valid_till = longevity;
 			tx
 		};
 		// higher priority = better
 		assert!(TransactionRef {
-			transaction: Arc::new(with_priority(3)),
-			valid_till: 3,
+			transaction: Arc::new(with_priority(3, 3)),
 			insertion_id: 1,
 		} > TransactionRef {
-			transaction: Arc::new(with_priority(2)),
-			valid_till: 3,
+			transaction: Arc::new(with_priority(2, 3)),
 			insertion_id: 2,
 		});
 		// lower validity = better
 		assert!(TransactionRef {
-			transaction: Arc::new(with_priority(3)),
-			valid_till: 2,
+			transaction: Arc::new(with_priority(3, 2)),
 			insertion_id: 1,
 		} > TransactionRef {
-			transaction: Arc::new(with_priority(3)),
-			valid_till: 3,
+			transaction: Arc::new(with_priority(3, 3)),
 			insertion_id: 2,
 		});
 		// lower insertion_id = better
 		assert!(TransactionRef {
-			transaction: Arc::new(with_priority(3)),
-			valid_till: 3,
+			transaction: Arc::new(with_priority(3, 3)),
 			insertion_id: 1,
 		} > TransactionRef {
-			transaction: Arc::new(with_priority(3)),
-			valid_till: 3,
+			transaction: Arc::new(with_priority(3, 3)),
 			insertion_id: 2,
 		});
 	}
diff --git a/substrate/core/transaction-pool/graph/src/rotator.rs b/substrate/core/transaction-pool/graph/src/rotator.rs
index b8fdd6863b1..e5ba7ccde65 100644
--- a/substrate/core/transaction-pool/graph/src/rotator.rs
+++ b/substrate/core/transaction-pool/graph/src/rotator.rs
@@ -27,7 +27,6 @@ use std::{
 use parking_lot::RwLock;
 
 use base_pool::Transaction;
-use pool::TxData;
 
 /// Expected size of the banned extrinsics cache.
 const EXPECTED_SIZE: usize = 2048;
@@ -79,12 +78,9 @@ impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
 	/// Bans extrinsic if it's stale.
 	///
 	/// Returns `true` if extrinsic is stale and got banned.
-	pub fn ban_if_stale<Ex>(&self, now: &Instant, xt: &Transaction<Hash, TxData<Ex>>) -> bool {
-		match xt.data.valid_till {
-			Some(ref valid_till) if valid_till > now => {
-				return false;
-			}
-			_ => {},
+	pub fn ban_if_stale<Ex>(&self, now: &Instant, current_block: u64, xt: &Transaction<Hash, Ex>) -> bool {
+		if xt.valid_till > current_block {
+			return false;
 		}
 
 		self.ban(now, &[xt.hash.clone()]);
@@ -113,16 +109,13 @@ mod tests {
 		}
 	}
 
-	fn tx() -> (Hash, Transaction<Hash, TxData<Ex>>) {
+	fn tx() -> (Hash, Transaction<Hash, Ex>) {
 		let hash = 5u64;
 		let tx = Transaction {
-			data: TxData {
-				raw: (),
-				valid_till: Some(Instant::now()),
-			},
+			data: (),
 			hash: hash.clone(),
 			priority: 5,
-			longevity: 3,
+			valid_till: 1,
 			requires: vec![],
 			provides: vec![],
 		};
@@ -136,10 +129,11 @@ mod tests {
 		let (hash, tx) = tx();
 		let rotator = rotator();
 		assert!(!rotator.is_banned(&hash));
-		let past = Instant::now() - Duration::from_millis(1000);
+		let now = Instant::now();
+		let past_block = 0;
 
 		// when
-		assert!(!rotator.ban_if_stale(&past, &tx));
+		assert!(!rotator.ban_if_stale(&now, past_block, &tx));
 
 		// then
 		assert!(!rotator.is_banned(&hash));
@@ -153,7 +147,7 @@ mod tests {
 		assert!(!rotator.is_banned(&hash));
 
 		// when
-		assert!(rotator.ban_if_stale(&Instant::now(), &tx));
+		assert!(rotator.ban_if_stale(&Instant::now(), 1, &tx));
 
 		// then
 		assert!(rotator.is_banned(&hash));
@@ -165,7 +159,7 @@ mod tests {
 		// given
 		let (hash, tx) = tx();
 		let rotator = rotator();
-		assert!(rotator.ban_if_stale(&Instant::now(), &tx));
+		assert!(rotator.ban_if_stale(&Instant::now(), 1, &tx));
 		assert!(rotator.is_banned(&hash));
 
 		// when
@@ -179,16 +173,13 @@ mod tests {
 	#[test]
 	fn should_garbage_collect() {
 		// given
-		fn tx_with(i: u64, time: Instant) -> Transaction<Hash, TxData<Ex>> {
+		fn tx_with(i: u64, valid_till: u64) -> Transaction<Hash, Ex> {
 			let hash = i;
 			Transaction {
-				data: TxData {
-					raw: (),
-					valid_till: Some(time),
-				},
+				data: (),
 				hash,
 				priority: 5,
-				longevity: 3,
+				valid_till,
 				requires: vec![],
 				provides: vec![],
 			}
@@ -197,19 +188,19 @@ mod tests {
 		let rotator = rotator();
 
 		let now = Instant::now();
-		let past = now - Duration::from_secs(1);
+		let past_block = 0;
 
 		// when
 		for i in 0..2*EXPECTED_SIZE {
-			let tx = tx_with(i as u64, past);
-			assert!(rotator.ban_if_stale(&now, &tx));
+			let tx = tx_with(i as u64, past_block);
+			assert!(rotator.ban_if_stale(&now, past_block, &tx));
 		}
 		assert_eq!(rotator.banned_until.read().len(), 2*EXPECTED_SIZE);
 
 		// then
-		let tx = tx_with(2*EXPECTED_SIZE as u64, past);
+		let tx = tx_with(2*EXPECTED_SIZE as u64, past_block);
 		// trigger a garbage collection
-		assert!(rotator.ban_if_stale(&now, &tx));
+		assert!(rotator.ban_if_stale(&now, past_block, &tx));
 		assert_eq!(rotator.banned_until.read().len(), EXPECTED_SIZE);
 	}
 }
diff --git a/substrate/core/transaction-pool/src/tests.rs b/substrate/core/transaction-pool/src/tests.rs
index e1ff23af9b3..f99ff8b9034 100644
--- a/substrate/core/transaction-pool/src/tests.rs
+++ b/substrate/core/transaction-pool/src/tests.rs
@@ -109,7 +109,7 @@ fn submission_should_work() {
 	assert_eq!(209, index(&BlockId::number(0)));
 	pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
 
-	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect());
+	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
 	assert_eq!(pending, vec![209]);
 }
 
@@ -119,7 +119,7 @@ fn multiple_submission_should_work() {
 	pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
 	pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap();
 
-	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect());
+	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
 	assert_eq!(pending, vec![209, 210]);
 }
 
@@ -128,7 +128,7 @@ fn early_nonce_should_be_culled() {
 	let pool = pool();
 	pool.submit_one(&BlockId::number(0), uxt(Alice, 208)).unwrap();
 
-	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect());
+	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
 	assert_eq!(pending, Vec::<Index>::new());
 }
 
@@ -137,11 +137,11 @@ fn late_nonce_should_be_queued() {
 	let pool = pool();
 
 	pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap();
-	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect());
+	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
 	assert_eq!(pending, Vec::<Index>::new());
 
 	pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
-	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect());
+	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
 	assert_eq!(pending, vec![209, 210]);
 }
 
@@ -151,12 +151,12 @@ fn prune_tags_should_work() {
 	pool.submit_one(&BlockId::number(0), uxt(Alice, 209)).unwrap();
 	pool.submit_one(&BlockId::number(0), uxt(Alice, 210)).unwrap();
 
-	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect());
+	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
 	assert_eq!(pending, vec![209, 210]);
 
 	pool.prune_tags(&BlockId::number(1), vec![vec![209]]).unwrap();
 
-	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect());
+	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
 	assert_eq!(pending, vec![210]);
 }
 
@@ -169,7 +169,7 @@ fn should_ban_invalid_transactions() {
 	pool.submit_one(&BlockId::number(0), uxt.clone()).unwrap_err();
 
 	// when
-	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.raw.transfer.nonce).collect());
+	let pending: Vec<_> = pool.ready(|p| p.map(|a| a.data.transfer.nonce).collect());
 	assert_eq!(pending, Vec::<Index>::new());
 
 	// then
diff --git a/substrate/node/consensus/src/lib.rs b/substrate/node/consensus/src/lib.rs
index b8bb32e3a53..8e63b009b9a 100644
--- a/substrate/node/consensus/src/lib.rs
+++ b/substrate/node/consensus/src/lib.rs
@@ -314,10 +314,10 @@ impl<C, A> bft::Proposer<<C as AuthoringApi>::Block> for Proposer<C, A> where
 					let mut pending_size = 0;
 					for pending in pending_iterator {
 						// TODO [ToDr] Probably get rid of it, and validate in runtime.
-						let encoded_size = pending.data.raw.encode().len();
+						let encoded_size = pending.data.encode().len();
 						if pending_size + encoded_size >= MAX_TRANSACTIONS_SIZE { break }
 
-						match block_builder.push_extrinsic(pending.data.raw.clone()) {
+						match block_builder.push_extrinsic(pending.data.clone()) {
 							Ok(()) => {
 								pending_size += encoded_size;
 							}
-- 
GitLab