diff --git a/substrate/core/rpc-servers/src/lib.rs b/substrate/core/rpc-servers/src/lib.rs index fb989740d1a2f75ad370c492db19afdc91d153cb..d4a0ddea477193163c42da266a879e45c897f4cd 100644 --- a/substrate/core/rpc-servers/src/lib.rs +++ b/substrate/core/rpc-servers/src/lib.rs @@ -51,7 +51,7 @@ pub fn rpc_handler<Block: BlockT, ExHash, PendingExtrinsics, S, C, A, Y>( PendingExtrinsics: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static, S: apis::state::StateApi<Block::Hash, Metadata=Metadata>, C: apis::chain::ChainApi<Block::Hash, Block::Header, NumberFor<Block>, Block::Extrinsic, Metadata=Metadata>, - A: apis::author::AuthorApi<ExHash, Block::Extrinsic, PendingExtrinsics, Metadata=Metadata>, + A: apis::author::AuthorApi<ExHash, Block::Hash, Block::Extrinsic, PendingExtrinsics, Metadata=Metadata>, Y: apis::system::SystemApi, { let mut io = pubsub::PubSubHandler::default(); diff --git a/substrate/core/rpc/src/author/mod.rs b/substrate/core/rpc/src/author/mod.rs index ec1c3950ac2df7245b1f15fa878759532e9a121f..1625816e8fd0f796fd8421666002e0bf2bb9ce2b 100644 --- a/substrate/core/rpc/src/author/mod.rs +++ b/substrate/core/rpc/src/author/mod.rs @@ -29,6 +29,7 @@ use transaction_pool::{ AllExtrinsics, ExHash, ExtrinsicFor, + HashOf, }; use jsonrpc_macros::pubsub; use jsonrpc_pubsub::SubscriptionId; @@ -46,7 +47,7 @@ use self::error::Result; build_rpc_trait! { /// Substrate authoring RPC API - pub trait AuthorApi<Hash, Extrinsic, PendingExtrinsics> { + pub trait AuthorApi<Hash, BlockHash, Extrinsic, PendingExtrinsics> { type Metadata; /// Submit extrinsic for inclusion in block. @@ -63,7 +64,7 @@ build_rpc_trait! { #[pubsub(name = "author_extrinsicUpdate")] { /// Submit an extrinsic to watch. #[rpc(name = "author_submitAndWatchExtrinsic")] - fn watch_extrinsic(&self, Self::Metadata, pubsub::Subscriber<Status<Hash>>, Bytes); + fn watch_extrinsic(&self, Self::Metadata, pubsub::Subscriber<Status<Hash, BlockHash>>, Bytes); /// Unsubscribe from extrinsic watching. #[rpc(name = "author_unwatchExtrinsic")] @@ -102,7 +103,7 @@ impl<B, E, P> Author<B, E, P> where } } -impl<B, E, P> AuthorApi<ExHash<P>, ExtrinsicFor<P>, AllExtrinsics<P>> for Author<B, E, P> where +impl<B, E, P> AuthorApi<ExHash<P>, HashOf<P::Block>, ExtrinsicFor<P>, AllExtrinsics<P>> for Author<B, E, P> where B: client::backend::Backend<<P as PoolChainApi>::Block, Blake2Hasher> + Send + Sync + 'static, E: client::CallExecutor<<P as PoolChainApi>::Block, Blake2Hasher> + Send + Sync + 'static, P: PoolChainApi + Sync + Send + 'static, @@ -130,7 +131,7 @@ impl<B, E, P> AuthorApi<ExHash<P>, ExtrinsicFor<P>, AllExtrinsics<P>> for Author Ok(self.pool.all()) } - fn watch_extrinsic(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber<Status<ExHash<P>>>, xt: Bytes) { + fn watch_extrinsic(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber<Status<ExHash<P>, HashOf<P::Block>>>, xt: Bytes) { let submit = || -> Result<_> { let best_block_hash = self.client.info()?.chain.best_hash; let dxt = <<P as PoolChainApi>::Block as traits::Block>::Extrinsic::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?; diff --git a/substrate/core/rpc/src/author/tests.rs b/substrate/core/rpc/src/author/tests.rs index 4ea3ead4d1cd616eec246bcb0568dff386c2b54e..9c2483ae5235c123a20cc54f511b7d7f4574fcae 100644 --- a/substrate/core/rpc/src/author/tests.rs +++ b/substrate/core/rpc/src/author/tests.rs @@ -23,7 +23,7 @@ use transaction_pool::{VerifiedTransaction, scoring, Transaction, ChainApi, Erro use test_client::runtime::{Block, Extrinsic, Transfer}; use test_client; use tokio::runtime; -use runtime_primitives::generic::BlockId; +use runtime_primitives::{traits, generic::BlockId}; #[derive(Clone, Debug)] pub struct Verified @@ -53,6 +53,10 @@ impl ChainApi for TestApi { type Event = (); type Ready = (); + fn latest_hash(&self) -> <Block as traits::Block>::Hash { + 1.into() + } + fn verify_transaction(&self, _at: &BlockId<Block>, uxt: &ExtrinsicFor<Self>) -> Result<Self::VEx, Self::Error> { Ok(Verified { sender: uxt.transfer.from[31] as u64, diff --git a/substrate/core/transaction-pool/src/lib.rs b/substrate/core/transaction-pool/src/lib.rs index f7c6c4bfd2f26c936274f6fb110fcf3b581f8593..1e79f5efd21dd69af005e5d8d77a35ff90ce1223 100644 --- a/substrate/core/transaction-pool/src/lib.rs +++ b/substrate/core/transaction-pool/src/lib.rs @@ -42,7 +42,7 @@ mod pool; mod rotator; pub use listener::Listener; -pub use pool::{Pool, ChainApi, EventStream, Verified, VerifiedFor, ExtrinsicFor, ExHash, AllExtrinsics}; +pub use pool::{Pool, ChainApi, EventStream, Verified, VerifiedFor, ExtrinsicFor, ExHash, AllExtrinsics, HashOf}; pub use txpool::scoring; pub use txpool::{Error, ErrorKind}; pub use error::IntoPoolError; diff --git a/substrate/core/transaction-pool/src/listener.rs b/substrate/core/transaction-pool/src/listener.rs index 42ad6ef1c36a7731a8c67b2283ca1ffb531c819c..c86337db6a02d329446b47a1d009229777f69425 100644 --- a/substrate/core/transaction-pool/src/listener.rs +++ b/substrate/core/transaction-pool/src/listener.rs @@ -20,20 +20,38 @@ use std::{ collections::HashMap, }; use txpool; - use watcher; +/// Returns the hash of the latest block. +pub trait LatestHash { + type Hash: Clone; + + /// Hash of the latest block. + fn latest_hash(&self) -> Self::Hash; +} + /// Extrinsic pool default listener. -#[derive(Default)] -pub struct Listener<H: ::std::hash::Hash + Eq> { - watchers: HashMap<H, watcher::Sender<H>> +pub struct Listener<H: ::std::hash::Hash + Eq, C: LatestHash> { + watchers: HashMap<H, watcher::Sender<H, C::Hash>>, + chain: C, } -impl<H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default> Listener<H> { +impl<H, C> Listener<H, C> where + H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default, + C: LatestHash, +{ + /// Creates a new listener with given latest hash provider. + pub fn new(chain: C) -> Self { + Listener { + watchers: Default::default(), + chain, + } + } + /// Creates a new watcher for given verified extrinsic. /// /// The watcher can be used to subscribe to lifecycle events of that extrinsic. - pub fn create_watcher<T: txpool::VerifiedTransaction<Hash=H>>(&mut self, xt: Arc<T>) -> watcher::Watcher<H> { + pub fn create_watcher<T: txpool::VerifiedTransaction<Hash=H>>(&mut self, xt: Arc<T>) -> watcher::Watcher<H, C::Hash> { let sender = self.watchers.entry(*xt.hash()).or_insert_with(watcher::Sender::default); sender.new_watcher() } @@ -43,7 +61,7 @@ impl<H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default> Li self.fire(hash, |watcher| watcher.broadcast(peers)); } - fn fire<F>(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender<H>) { + fn fire<F>(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender<H, C::Hash>) { let clean = if let Some(h) = self.watchers.get_mut(hash) { fun(h); h.is_done() @@ -57,9 +75,10 @@ impl<H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default> Li } } -impl<H, T> txpool::Listener<T> for Listener<H> where +impl<H, T, C> txpool::Listener<T> for Listener<H, C> where H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default, T: txpool::VerifiedTransaction<Hash=H>, + C: LatestHash, { fn added(&mut self, tx: &Arc<T>, old: Option<&Arc<T>>) { if let Some(old) = old { @@ -88,8 +107,7 @@ impl<H, T> txpool::Listener<T> for Listener<H> where } fn culled(&mut self, tx: &Arc<T>) { - // TODO [ToDr] latest block number? - let header_hash = Default::default(); + let header_hash = self.chain.latest_hash(); self.fire(tx.hash(), |watcher| watcher.finalised(header_hash)) } } diff --git a/substrate/core/transaction-pool/src/pool.rs b/substrate/core/transaction-pool/src/pool.rs index f812a5e54451483eeeb5fb6497c48708a39825a7..d6c8504085cef7cfafc7dc99f7e2f85936bd2ba4 100644 --- a/substrate/core/transaction-pool/src/pool.rs +++ b/substrate/core/transaction-pool/src/pool.rs @@ -26,11 +26,11 @@ use serde::{Serialize, de::DeserializeOwned}; use txpool::{self, Scoring, Readiness}; use error::IntoPoolError; -use listener::Listener; +use listener::{self, Listener}; use rotator::PoolRotator; use watcher::Watcher; -use runtime_primitives::{generic::BlockId, traits::Block as BlockT}; +use runtime_primitives::{generic::BlockId, traits}; /// Modification notification event stream type; pub type EventStream = mpsc::UnboundedReceiver<()>; @@ -38,7 +38,7 @@ pub type EventStream = mpsc::UnboundedReceiver<()>; /// Extrinsic hash type for a pool. pub type ExHash<A> = <A as ChainApi>::Hash; /// Extrinsic type for a pool. -pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as BlockT>::Extrinsic; +pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic; /// Verified extrinsic data for `ChainApi`. pub type VerifiedFor<A> = Verified<ExtrinsicFor<A>, <A as ChainApi>::VEx>; /// A collection of all extrinsics. @@ -80,7 +80,7 @@ where /// Concrete extrinsic validation and query logic. pub trait ChainApi: Send + Sync { /// Block type. - type Block: BlockT; + type Block: traits::Block; /// Extrinsic hash type. type Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Serialize + DeserializeOwned + ::std::str::FromStr + Send + Sync + Default + 'static; /// Extrinsic sender type. @@ -96,6 +96,7 @@ pub trait ChainApi: Send + Sync { type Score: ::std::cmp::Ord + Clone + Default + fmt::Debug + Send + Send + Sync + fmt::LowerHex; /// Custom scoring update event type. type Event: ::std::fmt::Debug; + /// Verify extrinsic at given block. fn verify_transaction(&self, at: &BlockId<Self::Block>, uxt: &ExtrinsicFor<Self>) -> Result<Self::VEx, Self::Error>; @@ -120,6 +121,20 @@ pub trait ChainApi: Send + Sync { /// /// NOTE returning `InsertNew` here can lead to some transactions being accepted above pool limits. fn should_replace(old: &VerifiedFor<Self>, new: &VerifiedFor<Self>) -> txpool::scoring::Choice; + + /// Returns hash of the latest block in chain. + fn latest_hash(&self) -> HashOf<Self::Block>; +} + +/// Returns block's hash type. +pub type HashOf<B> = <B as traits::Block>::Hash; + +impl<T: ChainApi> listener::LatestHash for Arc<T> { + type Hash = HashOf<T::Block>; + + fn latest_hash(&self) -> HashOf<T::Block> { + ChainApi::latest_hash(&**self) + } } pub struct Ready<'a, 'b, B: 'a + ChainApi> { @@ -177,11 +192,11 @@ const POOL_TIME: time::Duration = time::Duration::from_secs(60 * 5); /// Extrinsics pool. pub struct Pool<B: ChainApi> { - api: B, + api: Arc<B>, pool: RwLock<txpool::Pool< VerifiedFor<B>, ScoringAdapter<B>, - Listener<B::Hash>, + Listener<B::Hash, Arc<B>>, >>, import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>, rotator: PoolRotator<B::Hash>, @@ -190,8 +205,9 @@ pub struct Pool<B: ChainApi> { impl<B: ChainApi> Pool<B> { /// Create a new transaction pool. pub fn new(options: txpool::Options, api: B) -> Self { + let api = Arc::new(api); Pool { - pool: RwLock::new(txpool::Pool::new(Listener::default(), ScoringAdapter::<B>(Default::default()), options)), + pool: RwLock::new(txpool::Pool::new(Listener::new(api.clone()), ScoringAdapter::<B>(Default::default()), options)), import_notification_sinks: Default::default(), api, rotator: Default::default(), @@ -253,7 +269,7 @@ impl<B: ChainApi> Pool<B> { } /// Import a single extrinsic and starts to watch their progress in the pool. - pub fn submit_and_watch(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Watcher<B::Hash>, B::Error> { + pub fn submit_and_watch(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Watcher<B::Hash, HashOf<B::Block>>, B::Error> { let xt = self.submit_at(at, Some(xt))?.pop().expect("One extrinsic passed; one result returned; qed"); Ok(self.pool.write().listener_mut().create_watcher(xt)) } @@ -295,7 +311,7 @@ impl<B: ChainApi> Pool<B> { /// Cull transactions from the queue and then compute the pending set. pub fn cull_and_get_pending<F, T>(&self, at: &BlockId<B::Block>, f: F) -> Result<T, B::Error> where - F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash>>) -> T, + F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash, Arc<B>>>) -> T, { self.cull_from(at, None); Ok(self.pending(at, f)) @@ -322,7 +338,7 @@ impl<B: ChainApi> Pool<B> { /// Retrieve the pending set. Be careful to not leak the pool `ReadGuard` to prevent deadlocks. pub fn pending<F, T>(&self, at: &BlockId<B::Block>, f: F) -> T where - F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash>>) -> T, + F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash, Arc<B>>>) -> T, { let ready = self.ready(at); f(self.pool.read().pending(ready)) @@ -389,7 +405,7 @@ impl<VEx> txpool::Ready<VEx> for AlwaysReady { #[cfg(test)] pub mod tests { use txpool; - use super::{VerifiedFor, ExtrinsicFor}; + use super::{VerifiedFor, ExtrinsicFor, HashOf}; use std::collections::HashMap; use std::cmp::Ordering; use {Pool, ChainApi, scoring, Readiness}; @@ -443,6 +459,10 @@ pub mod tests { type Score = u64; type Event = (); + fn latest_hash(&self) -> HashOf<Self::Block> { + 1.into() + } + fn verify_transaction(&self, _at: &BlockId, uxt: &ExtrinsicFor<Self>) -> Result<Self::VEx, Self::Error> { let hash = BlakeTwo256::hash(&uxt.encode()); let xt = uxt.clone().check()?; diff --git a/substrate/core/transaction-pool/src/watcher.rs b/substrate/core/transaction-pool/src/watcher.rs index 78b27326f861fd22e4707fde775b697e4b8bdb35..fd6758fdba80cfe00da8f8cea47dd5d37b03b6f7 100644 --- a/substrate/core/transaction-pool/src/watcher.rs +++ b/substrate/core/transaction-pool/src/watcher.rs @@ -24,9 +24,9 @@ use futures::{ /// Possible extrinsic status events #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub enum Status<H> { +pub enum Status<H, H2> { /// Extrinsic has been finalised in block with given hash. - Finalised(H), + Finalised(H2), /// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid. Usurped(H), /// The extrinsic has been broadcast to the given peers. @@ -39,30 +39,39 @@ pub enum Status<H> { /// /// Represents a stream of status updates for particular extrinsic. #[derive(Debug)] -pub struct Watcher<H> { - receiver: mpsc::UnboundedReceiver<Status<H>>, +pub struct Watcher<H, H2> { + receiver: mpsc::UnboundedReceiver<Status<H, H2>>, } -impl<H> Watcher<H> { +impl<H, H2> Watcher<H, H2> { /// Pipe the notifications to given sink. /// /// Make sure to drive the future to completion. - pub fn into_stream(self) -> impl Stream<Item=Status<H>, Error=()> { + pub fn into_stream(self) -> impl Stream<Item=Status<H, H2>, Error=()> { // we can safely ignore the error here, `UnboundedReceiver` never fails. self.receiver.map_err(|_| ()) } } /// Sender part of the watcher. Exposed only for testing purposes. -#[derive(Debug, Default)] -pub struct Sender<H> { - receivers: Vec<mpsc::UnboundedSender<Status<H>>>, +#[derive(Debug)] +pub struct Sender<H, H2> { + receivers: Vec<mpsc::UnboundedSender<Status<H, H2>>>, finalised: bool, } -impl<H: Clone> Sender<H> { +impl<H, H2> Default for Sender<H, H2> { + fn default() -> Self { + Sender { + receivers: Default::default(), + finalised: Default::default(), + } + } +} + +impl<H: Clone, H2: Clone> Sender<H, H2> { /// Add a new watcher to this sender object. - pub fn new_watcher(&mut self) -> Watcher<H> { + pub fn new_watcher(&mut self) -> Watcher<H, H2> { let (tx, receiver) = mpsc::unbounded(); self.receivers.push(tx); Watcher { @@ -76,7 +85,7 @@ impl<H: Clone> Sender<H> { } /// Extrinsic has been finalised in block with given hash. - pub fn finalised(&mut self, hash: H) { + pub fn finalised(&mut self, hash: H2) { self.send(Status::Finalised(hash)); self.finalised = true; } @@ -97,7 +106,7 @@ impl<H: Clone> Sender<H> { self.finalised || self.receivers.is_empty() } - fn send(&mut self, status: Status<H>) { + fn send(&mut self, status: Status<H, H2>) { self.receivers.retain(|sender| sender.unbounded_send(status.clone()).is_ok()) } } diff --git a/substrate/node/transaction-pool/src/lib.rs b/substrate/node/transaction-pool/src/lib.rs index b1732d60a29df0249e163e5c98399ced369308b4..bd6f79dc2f02bf5062076232533e5da3fee6a1a5 100644 --- a/substrate/node/transaction-pool/src/lib.rs +++ b/substrate/node/transaction-pool/src/lib.rs @@ -185,6 +185,10 @@ impl<C: Client> transaction_pool::ChainApi for ChainApi<C> { type Score = u64; type Event = (); + fn latest_hash(&self) -> <C::Block as BlockT>::Hash { + self.api.block_number_to_hash(self.api.current_height()).expect("Latest block number always has a hash; qed") + } + fn verify_transaction(&self, _at: &BlockId<Self::Block>, xt: &ExtrinsicFor<Self>) -> Result<Self::VEx> { let encoded = xt.encode(); let uxt = UncheckedExtrinsic::decode(&mut encoded.as_slice()).ok_or_else(|| ErrorKind::InvalidExtrinsicFormat)?;