diff --git a/substrate/core/rpc/src/author/mod.rs b/substrate/core/rpc/src/author/mod.rs index 6c9faf1493cfdd237517c5ed300d438cafd1dc82..ec1c3950ac2df7245b1f15fa878759532e9a121f 100644 --- a/substrate/core/rpc/src/author/mod.rs +++ b/substrate/core/rpc/src/author/mod.rs @@ -36,7 +36,6 @@ use primitives::{Bytes, Blake2Hasher}; use rpc::futures::{Sink, Stream, Future}; use runtime_primitives::{generic, traits}; use subscriptions::Subscriptions; -use tokio::runtime::TaskExecutor; pub mod error; @@ -60,7 +59,7 @@ build_rpc_trait! { /// Returns all pending extrinsics, potentially grouped by sender. #[rpc(name = "author_pendingExtrinsics")] fn pending_extrinsics(&self) -> Result<PendingExtrinsics>; - + #[pubsub(name = "author_extrinsicUpdate")] { /// Submit an extrinsic to watch. #[rpc(name = "author_submitAndWatchExtrinsic")] @@ -90,11 +89,15 @@ impl<B, E, P> Author<B, E, P> where P: PoolChainApi + Sync + Send + 'static, { /// Create new instance of Authoring API. - pub fn new(client: Arc<Client<B, E, <P as PoolChainApi>::Block>>, pool: Arc<Pool<P>>, executor: TaskExecutor) -> Self { + pub fn new( + client: Arc<Client<B, E, <P as PoolChainApi>::Block>>, + pool: Arc<Pool<P>>, + subscriptions: Subscriptions, + ) -> Self { Author { client, pool, - subscriptions: Subscriptions::new(executor), + subscriptions, } } } diff --git a/substrate/core/rpc/src/author/tests.rs b/substrate/core/rpc/src/author/tests.rs index be15b2341e8c82823762c654a239a049abc7231f..4ea3ead4d1cd616eec246bcb0568dff386c2b54e 100644 --- a/substrate/core/rpc/src/author/tests.rs +++ b/substrate/core/rpc/src/author/tests.rs @@ -28,7 +28,7 @@ use runtime_primitives::generic::BlockId; #[derive(Clone, Debug)] pub struct Verified { - sender: u64, + sender: u64, hash: u64, } @@ -57,13 +57,13 @@ impl ChainApi for TestApi { Ok(Verified { sender: uxt.transfer.from[31] as u64, hash: uxt.transfer.nonce, - }) + }) } fn is_ready(&self, _at: &BlockId<Block>, _c: &mut Self::Ready, _xt: &VerifiedFor<Self>) -> Readiness { Readiness::Ready } - + fn ready(&self) -> Self::Ready { } fn compare(old: &VerifiedFor<Self>, other: &VerifiedFor<Self>) -> ::std::cmp::Ordering { @@ -151,12 +151,12 @@ fn should_watch_extrinsic() { p.watch_extrinsic(Default::default(), subscriber, uxt(5, 5).encode().into()); // then - assert_eq!(runtime.block_on(id_rx), Ok(Ok(0.into()))); + assert_eq!(runtime.block_on(id_rx), Ok(Ok(1.into()))); // check notifications AuthorApi::submit_rich_extrinsic(&p, uxt(5, 1)).unwrap(); assert_eq!( runtime.block_on(data.into_future()).unwrap().0, - Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":1},"subscription":0}}"#.into()) + Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":1},"subscription":1}}"#.into()) ); } diff --git a/substrate/core/rpc/src/chain/mod.rs b/substrate/core/rpc/src/chain/mod.rs index 07109fee5f24e10d339dad6a2ecdf072638658d2..75b6ed000fac91bca88962e375bce7eef3a77813 100644 --- a/substrate/core/rpc/src/chain/mod.rs +++ b/substrate/core/rpc/src/chain/mod.rs @@ -26,7 +26,6 @@ use rpc::futures::{stream, Future, Sink, Stream}; use runtime_primitives::generic::{BlockId, SignedBlock}; use runtime_primitives::traits::{Block as BlockT, Header, NumberFor}; use runtime_version::RuntimeVersion; -use tokio::runtime::TaskExecutor; use primitives::{Blake2Hasher}; use subscriptions::Subscriptions; @@ -82,10 +81,10 @@ pub struct Chain<B, E, Block: BlockT> { impl<B, E, Block: BlockT> Chain<B, E, Block> { /// Create new Chain API RPC handler. - pub fn new(client: Arc<Client<B, E, Block>>, executor: TaskExecutor) -> Self { + pub fn new(client: Arc<Client<B, E, Block>>, subscriptions: Subscriptions) -> Self { Self { client, - subscriptions: Subscriptions::new(executor), + subscriptions, } } } diff --git a/substrate/core/rpc/src/chain/tests.rs b/substrate/core/rpc/src/chain/tests.rs index 532d988d3ba4a75eec900a4122a40a5c6f0b1ea7..1c0cfbcb1a8d2e5e486f81c12f66b5d13c4cf112 100644 --- a/substrate/core/rpc/src/chain/tests.rs +++ b/substrate/core/rpc/src/chain/tests.rs @@ -168,7 +168,7 @@ fn should_notify_about_latest_block() { api.subscribe_new_head(Default::default(), subscriber); // assert id assigned - assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(0)))); + assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1)))); let builder = api.client.new_block().unwrap(); api.client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); diff --git a/substrate/core/rpc/src/lib.rs b/substrate/core/rpc/src/lib.rs index cad9f80d8b2e8aae340eb200c39715dae146a405..8bda6050ea6e8edadaf4b4d7a035cab05e19b6ee 100644 --- a/substrate/core/rpc/src/lib.rs +++ b/substrate/core/rpc/src/lib.rs @@ -50,6 +50,8 @@ mod errors; mod helpers; mod subscriptions; +pub use subscriptions::Subscriptions; + pub mod author; pub mod chain; pub mod metadata; diff --git a/substrate/core/rpc/src/state/mod.rs b/substrate/core/rpc/src/state/mod.rs index 47b6647bd26130dec7216b25a8fe10bbada5f871..af95dcb3a4035b587fb3c59ba584704e116ad03d 100644 --- a/substrate/core/rpc/src/state/mod.rs +++ b/substrate/core/rpc/src/state/mod.rs @@ -32,7 +32,6 @@ use rpc::Result as RpcResult; use rpc::futures::{stream, Future, Sink, Stream}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{Block as BlockT, Header}; -use tokio::runtime::TaskExecutor; use subscriptions::Subscriptions; @@ -96,10 +95,10 @@ pub struct State<B, E, Block: BlockT> { impl<B, E, Block: BlockT> State<B, E, Block> { /// Create new State API RPC handler. - pub fn new(client: Arc<Client<B, E, Block>>, executor: TaskExecutor) -> Self { + pub fn new(client: Arc<Client<B, E, Block>>, subscriptions: Subscriptions) -> Self { Self { client, - subscriptions: Subscriptions::new(executor), + subscriptions, } } } diff --git a/substrate/core/rpc/src/state/tests.rs b/substrate/core/rpc/src/state/tests.rs index 8c401ec1bbf84f70f5c9f2ddfffa207ad1215e33..d90933eb7353f8320f632ea52bd57de22bcac245 100644 --- a/substrate/core/rpc/src/state/tests.rs +++ b/substrate/core/rpc/src/state/tests.rs @@ -27,7 +27,7 @@ fn should_return_storage() { let core = ::tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); let genesis_hash = client.genesis_hash(); - let client = State::new(client, core.executor()); + let client = State::new(client, Subscriptions::new(core.executor())); assert_matches!( client.storage(StorageKey(vec![10]), Some(genesis_hash).into()), @@ -40,7 +40,7 @@ fn should_call_contract() { let core = ::tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); let genesis_hash = client.genesis_hash(); - let client = State::new(client, core.executor()); + let client = State::new(client, Subscriptions::new(core.executor())); assert_matches!( client.call("balanceOf".into(), Bytes(vec![1,2,3]), Some(genesis_hash).into()), @@ -55,15 +55,12 @@ fn should_notify_about_storage_changes() { let (subscriber, id, transport) = pubsub::Subscriber::new_test("test"); { - let api = State { - client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(remote), - }; + let api = State::new(Arc::new(test_client::new()), Subscriptions::new(remote)); api.subscribe_storage(Default::default(), subscriber, None.into()); // assert id assigned - assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(0)))); + assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1)))); let mut builder = api.client.new_block().unwrap(); builder.push_transfer(runtime::Transfer { @@ -89,17 +86,14 @@ fn should_send_initial_storage_changes_and_notifications() { let (subscriber, id, transport) = pubsub::Subscriber::new_test("test"); { - let api = State { - client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(remote), - }; + let api = State::new(Arc::new(test_client::new()), Subscriptions::new(remote)); api.subscribe_storage(Default::default(), subscriber, Some(vec![ StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), ]).into()); // assert id assigned - assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(0)))); + assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1)))); let mut builder = api.client.new_block().unwrap(); builder.push_transfer(runtime::Transfer { @@ -125,7 +119,7 @@ fn should_send_initial_storage_changes_and_notifications() { fn should_query_storage() { let core = ::tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); - let api = State::new(client.clone(), core.executor()); + let api = State::new(client.clone(), Subscriptions::new(core.executor())); let add_block = |nonce| { let mut builder = client.new_block().unwrap(); diff --git a/substrate/core/rpc/src/subscriptions.rs b/substrate/core/rpc/src/subscriptions.rs index 9013edf742e1c333007f5a85b997063a15e0efed..17ca51b3504b598575963a41e36b613927e48f3c 100644 --- a/substrate/core/rpc/src/subscriptions.rs +++ b/substrate/core/rpc/src/subscriptions.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see <http://www.gnu.org/licenses/>. use std::collections::HashMap; -use std::sync::atomic::{self, AtomicUsize}; +use std::sync::{Arc, atomic::{self, AtomicUsize}}; use jsonrpc_macros::pubsub; use jsonrpc_pubsub::SubscriptionId; @@ -26,14 +26,34 @@ use tokio::runtime::TaskExecutor; type Id = u64; +/// Generate unique ids for subscriptions. +#[derive(Clone, Debug)] +pub struct IdProvider { + next_id: Arc<AtomicUsize>, +} +impl Default for IdProvider { + fn default() -> Self { + IdProvider { + next_id: Arc::new(AtomicUsize::new(1)), + } + } +} + +impl IdProvider { + /// Returns next id for the subscription. + pub fn next_id(&self) -> Id { + self.next_id.fetch_add(1, atomic::Ordering::AcqRel) as u64 + } +} + /// Subscriptions manager. /// /// Takes care of assigning unique subscription ids and /// driving the sinks into completion. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Subscriptions { - next_id: AtomicUsize, - active_subscriptions: Mutex<HashMap<Id, oneshot::Sender<()>>>, + next_id: IdProvider, + active_subscriptions: Arc<Mutex<HashMap<Id, oneshot::Sender<()>>>>, executor: TaskExecutor, } @@ -57,7 +77,7 @@ impl Subscriptions { R: future::IntoFuture<Future=F, Item=(), Error=()>, F: future::Future<Item=(), Error=()> + Send + 'static, { - let id = self.next_id.fetch_add(1, atomic::Ordering::AcqRel) as u64; + let id = self.next_id.next_id(); if let Ok(sink) = subscriber.assign_id(id.into()) { let (tx, rx) = oneshot::channel(); let future = into_future(sink) diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 0fb40d9a2cb212e664c787c06235865e13e5d117..20994830389374d21947caae3a883e0771087f01 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -232,9 +232,10 @@ impl<Components> Service<Components> let (rpc_http, rpc_ws) = { let handler = || { let client = client.clone(); - let chain = rpc::apis::chain::Chain::new(client.clone(), task_executor.clone()); - let state = rpc::apis::state::State::new(client.clone(), task_executor.clone()); - let author = rpc::apis::author::Author::new(client.clone(), transaction_pool.clone(), task_executor.clone()); + let subscriptions = rpc::apis::Subscriptions::new(task_executor.clone()); + let chain = rpc::apis::chain::Chain::new(client.clone(), subscriptions.clone()); + let state = rpc::apis::state::State::new(client.clone(), subscriptions.clone()); + let author = rpc::apis::author::Author::new(client.clone(), transaction_pool.clone(), subscriptions.clone()); rpc::rpc_handler::<ComponentBlock<Components>, ComponentExHash<Components>, _, _, _, _, _>( state, chain, diff --git a/substrate/subkey/src/vanity.rs b/substrate/subkey/src/vanity.rs index 0d6d43065d1f08ef96b46844a5db02c1b21e41ec..a3c0dab37d3c26ce79a8e187089fa8651514d87e 100644 --- a/substrate/subkey/src/vanity.rs +++ b/substrate/subkey/src/vanity.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see <http://www.gnu.org/licenses/>. use rand::{OsRng, Rng}; -use substrate_primitives::{ed25519::Pair, hexdisplay::HexDisplay}; +use substrate_primitives::ed25519::Pair; use std::cmp; fn good_waypoint(done: u64) -> u64 {