diff --git a/substrate/substrate/extrinsic-pool/src/lib.rs b/substrate/substrate/extrinsic-pool/src/lib.rs index 7aee7130e3d6de4fcba4d9ab9094f067cb908447..f45a5697c6e77ab188a6dea68714a49ffbe6ed88 100644 --- a/substrate/substrate/extrinsic-pool/src/lib.rs +++ b/substrate/substrate/extrinsic-pool/src/lib.rs @@ -37,6 +37,7 @@ pub mod watcher; mod error; mod listener; mod pool; +mod rotator; pub use listener::Listener; pub use pool::{Pool, ChainApi, EventStream, Verified, VerifiedFor, ExtrinsicFor, ExHash, AllExtrinsics}; diff --git a/substrate/substrate/extrinsic-pool/src/pool.rs b/substrate/substrate/extrinsic-pool/src/pool.rs index 5f70123eb937846626cac90f625bfbe7451b5745..fb597564798834eeb7dcc1d5437433204d1de25d 100644 --- a/substrate/substrate/extrinsic-pool/src/pool.rs +++ b/substrate/substrate/extrinsic-pool/src/pool.rs @@ -14,15 +14,21 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see <http://www.gnu.org/licenses/>. -use std::{ collections::HashMap, fmt, sync::Arc, collections::BTreeMap}; +use std::{ + collections::{BTreeMap, HashMap}, + fmt, + sync::Arc, + time, +}; use futures::sync::mpsc; use parking_lot::{Mutex, RwLock}; use serde::{Serialize, de::DeserializeOwned}; use txpool::{self, Scoring, Readiness}; +use error::IntoPoolError; use listener::Listener; +use rotator::PoolRotator; use watcher::Watcher; -use error::IntoPoolError; use runtime_primitives::{generic::BlockId, traits::Block as BlockT}; @@ -40,16 +46,18 @@ pub type AllExtrinsics<A> = BTreeMap<<<A as ChainApi>::VEx as txpool::VerifiedTr /// Verified extrinsic struct. Wraps original extrinsic and verification info. #[derive(Debug)] -pub struct Verified<Ex: ::std::fmt::Debug, VEx: txpool::VerifiedTransaction> { +pub struct Verified<Ex, VEx> { /// Original extrinsic. pub original: Ex, /// Verification data. pub verified: VEx, + /// Pool deadline, after it's reached we remove the extrinsic from the pool. + pub valid_till: time::Instant, } -impl<Ex, VEx> txpool::VerifiedTransaction for Verified<Ex, VEx> -where - Ex: ::std::fmt::Debug, +impl<Ex, VEx> txpool::VerifiedTransaction for Verified<Ex, VEx> +where + Ex: fmt::Debug, VEx: txpool::VerifiedTransaction, { type Hash = <VEx as txpool::VerifiedTransaction>::Hash; @@ -118,10 +126,17 @@ pub struct Ready<'a, 'b, B: 'a + ChainApi> { api: &'a B, at: &'b BlockId<B::Block>, context: B::Ready, + rotator: &'a PoolRotator<B::Hash>, + now: time::Instant, } impl<'a, 'b, B: ChainApi> txpool::Ready<VerifiedFor<B>> for Ready<'a, 'b, B> { fn is_ready(&mut self, xt: &VerifiedFor<B>) -> Readiness { + if self.rotator.ban_if_stale(&self.now, xt) { + debug!(target: "extrinsic-pool", "[{:?}] Banning as stale.", txpool::VerifiedTransaction::hash(xt)); + return Readiness::Stale; + } + self.api.is_ready(self.at, &mut self.context, xt) } } @@ -155,6 +170,11 @@ impl<T: ChainApi> Scoring<VerifiedFor<T>> for ScoringAdapter<T> { } } +/// Maximum time the transaction will be kept in the pool. +/// +/// Transactions that don't get included within the limit are removed from the pool. +const POOL_TIME: time::Duration = time::Duration::from_secs(60 * 5); + /// Extrinsics pool. pub struct Pool<B: ChainApi> { api: B, @@ -164,6 +184,7 @@ pub struct Pool<B: ChainApi> { Listener<B::Hash>, >>, import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>, + rotator: PoolRotator<B::Hash>, } impl<B: ChainApi> Pool<B> { @@ -173,6 +194,7 @@ impl<B: ChainApi> Pool<B> { pool: RwLock::new(txpool::Pool::new(Listener::default(), ScoringAdapter::<B>(Default::default()), options)), import_notification_sinks: Default::default(), api, + rotator: Default::default(), } } @@ -206,9 +228,20 @@ impl<B: ChainApi> Pool<B> { { xts .into_iter() - .map(|xt| (self.api.verify_transaction(at, &xt), xt)) + .map(|xt| { + match self.api.verify_transaction(at, &xt) { + Ok(ref verified) if self.rotator.is_banned(txpool::VerifiedTransaction::hash(verified)) => { + return (Err(txpool::Error::from("Temporarily Banned".to_owned()).into()), xt) + }, + result => (result, xt), + } + }) .map(|(v, xt)| { - let xt = Verified { original: xt, verified: v? }; + let xt = Verified { + original: xt, + verified: v?, + valid_till: time::Instant::now() + POOL_TIME, + }; Ok(self.pool.write().import(xt)?) }) .collect() @@ -216,9 +249,7 @@ impl<B: ChainApi> Pool<B> { /// Imports one unverified extrinsic to the pool pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Arc<VerifiedFor<B>>, B::Error> { - let v = self.api.verify_transaction(at, &xt)?; - let xt = Verified { original: xt, verified: v }; - Ok(self.pool.write().import(xt)?) + Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")) } /// Import a single extrinsic and starts to watch their progress in the pool. @@ -244,7 +275,8 @@ impl<B: ChainApi> Pool<B> { senders: Option<&[<B::VEx as txpool::VerifiedTransaction>::Sender]>, ) -> usize { - let ready = Ready { api: &self.api, context: self.api.ready(), at }; + self.rotator.clear_timeouts(&time::Instant::now()); + let ready = self.ready(at); self.pool.write().cull(senders, ready) } @@ -284,9 +316,9 @@ impl<B: ChainApi> Pool<B> { pub fn pending<F, T>(&self, at: &BlockId<B::Block>, f: F) -> T where F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash>>) -> T, { - let ready = Ready { api: &self.api, context: self.api.ready(), at }; + let ready = self.ready(at); f(self.pool.read().pending(ready)) - } + } /// Retry to import all verified transactions from given sender. pub fn retry_verification(&self, at: &BlockId<B::Block>, sender: <B::VEx as txpool::VerifiedTransaction>::Sender) -> Result<(), B::Error> { @@ -326,6 +358,16 @@ impl<B: ChainApi> Pool<B> { map }) } + + fn ready<'a, 'b>(&'a self, at: &'b BlockId<B::Block>) -> Ready<'a, 'b, B> { + Ready { + api: &self.api, + rotator: &self.rotator, + context: self.api.ready(), + at, + now: time::Instant::now(), + } + } } /// A Readiness implementation that returns `Ready` for all transactions. @@ -337,7 +379,7 @@ impl<VEx> txpool::Ready<VEx> for AlwaysReady { } #[cfg(test)] -mod tests { +pub mod tests { use txpool; use super::{VerifiedFor, ExtrinsicFor}; use std::collections::HashMap; @@ -353,9 +395,9 @@ mod tests { #[derive(Clone, Debug)] pub struct VerifiedTransaction { - hash: Hash, - sender: AccountId, - nonce: u64, + pub hash: Hash, + pub sender: AccountId, + pub nonce: u64, } impl txpool::VerifiedTransaction for VerifiedTransaction { @@ -419,7 +461,7 @@ mod tests { result } - + fn ready(&self) -> Self::Ready { HashMap::default() } diff --git a/substrate/substrate/extrinsic-pool/src/rotator.rs b/substrate/substrate/extrinsic-pool/src/rotator.rs new file mode 100644 index 0000000000000000000000000000000000000000..1f4b1c4e737e107d3277204ddfd45b34cb45883c --- /dev/null +++ b/substrate/substrate/extrinsic-pool/src/rotator.rs @@ -0,0 +1,177 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! Rotate extrinsic inside the pool. +//! +//! Keeps only recent extrinsic and discard the ones kept for a significant amount of time. +//! Discarded extrinsics are banned so that they don't get re-imported again. + +use std::{ + collections::HashMap, + fmt, + hash, + time::{Duration, Instant}, +}; +use parking_lot::RwLock; +use txpool::VerifiedTransaction; +use Verified; + +/// Expected size of the banned extrinsics cache. +const EXPECTED_SIZE: usize = 2048; + +/// Pool rotator is responsible to only keep fresh extrinsics in the pool. +/// +/// Extrinsics that occupy the pool for too long are culled and temporarily banned from entering +/// the pool again. +pub struct PoolRotator<Hash> { + /// How long the extrinsic is banned for. + ban_time: Duration, + /// Currently banned extrinsics. + banned_until: RwLock<HashMap<Hash, Instant>>, +} + +impl<Hash: hash::Hash + Eq> Default for PoolRotator<Hash> { + fn default() -> Self { + PoolRotator { + ban_time: Duration::from_secs(60 * 30), + banned_until: Default::default(), + } + } +} + +impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> { + /// Returns `true` if extrinsic hash is currently banned. + pub fn is_banned(&self, hash: &Hash) -> bool { + self.banned_until.read().contains_key(hash) + } + + /// Bans extrinsic if it's stale. + /// + /// Returns `true` if extrinsic is stale and got banned. + pub fn ban_if_stale<Ex, VEx>(&self, now: &Instant, xt: &Verified<Ex, VEx>) -> bool where + VEx: VerifiedTransaction<Hash=Hash>, + Hash: fmt::Debug + fmt::LowerHex, + { + if &xt.valid_till > now { + return false; + } + + let mut banned = self.banned_until.write(); + banned.insert(xt.verified.hash().clone(), *now + self.ban_time); + + if banned.len() > 2 * EXPECTED_SIZE { + while banned.len() > EXPECTED_SIZE { + if let Some(key) = banned.keys().next().cloned() { + banned.remove(&key); + } + } + } + + true + } + + /// Removes timed bans. + pub fn clear_timeouts(&self, now: &Instant) { + let mut banned = self.banned_until.write(); + + let to_remove = banned + .iter() + .filter_map(|(k, v)| if v < now { + Some(k.clone()) + } else { + None + }).collect::<Vec<_>>(); + + for k in to_remove { + banned.remove(&k); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pool::tests::VerifiedTransaction; + use test_client::runtime::Hash; + + fn rotator() -> PoolRotator<Hash> { + PoolRotator { + ban_time: Duration::from_millis(10), + ..Default::default() + } + } + + fn tx() -> (Hash, Verified<u64, VerifiedTransaction>) { + let hash = 5.into(); + let tx = Verified { + original: 5, + verified: VerifiedTransaction { + hash, + sender: Default::default(), + nonce: Default::default(), + }, + valid_till: Instant::now(), + }; + + (hash, tx) + } + + #[test] + fn should_not_ban_if_not_stale() { + // given + let (hash, tx) = tx(); + let rotator = rotator(); + assert!(!rotator.is_banned(&hash)); + let past = Instant::now() - Duration::from_millis(1000); + + // when + assert!(!rotator.ban_if_stale(&past, &tx)); + + // then + assert!(!rotator.is_banned(&hash)); + } + + #[test] + fn should_ban_stale_extrinsic() { + // given + let (hash, tx) = tx(); + let rotator = rotator(); + assert!(!rotator.is_banned(&hash)); + + // when + assert!(rotator.ban_if_stale(&Instant::now(), &tx)); + + // then + assert!(rotator.is_banned(&hash)); + } + + + #[test] + fn should_clear_banned() { + // given + let (hash, tx) = tx(); + let rotator = rotator(); + assert!(rotator.ban_if_stale(&Instant::now(), &tx)); + assert!(rotator.is_banned(&hash)); + + // when + let future = Instant::now() + rotator.ban_time + rotator.ban_time; + rotator.clear_timeouts(&future); + + // then + assert!(!rotator.is_banned(&hash)); + } +}