From e8d88fcb7b172f98bc081d7afe3bea41fab24037 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= <tomusdrw@users.noreply.github.com>
Date: Mon, 1 Oct 2018 17:31:52 +0200
Subject: [PATCH] Global unique subscription ids. (#848)

* Global unique subscription ids.

* Fix tests.
---
 substrate/core/rpc/src/author/mod.rs    | 11 +++++----
 substrate/core/rpc/src/author/tests.rs  | 10 ++++-----
 substrate/core/rpc/src/chain/mod.rs     |  5 ++---
 substrate/core/rpc/src/chain/tests.rs   |  2 +-
 substrate/core/rpc/src/lib.rs           |  2 ++
 substrate/core/rpc/src/state/mod.rs     |  5 ++---
 substrate/core/rpc/src/state/tests.rs   | 20 ++++++-----------
 substrate/core/rpc/src/subscriptions.rs | 30 ++++++++++++++++++++-----
 substrate/core/service/src/lib.rs       |  7 +++---
 substrate/subkey/src/vanity.rs          |  2 +-
 10 files changed, 56 insertions(+), 38 deletions(-)

diff --git a/substrate/core/rpc/src/author/mod.rs b/substrate/core/rpc/src/author/mod.rs
index 6c9faf1493c..ec1c3950ac2 100644
--- a/substrate/core/rpc/src/author/mod.rs
+++ b/substrate/core/rpc/src/author/mod.rs
@@ -36,7 +36,6 @@ use primitives::{Bytes, Blake2Hasher};
 use rpc::futures::{Sink, Stream, Future};
 use runtime_primitives::{generic, traits};
 use subscriptions::Subscriptions;
-use tokio::runtime::TaskExecutor;
 
 pub mod error;
 
@@ -60,7 +59,7 @@ build_rpc_trait! {
 		/// Returns all pending extrinsics, potentially grouped by sender.
 		#[rpc(name = "author_pendingExtrinsics")]
 		fn pending_extrinsics(&self) -> Result<PendingExtrinsics>;
-	
+
 		#[pubsub(name = "author_extrinsicUpdate")] {
 			/// Submit an extrinsic to watch.
 			#[rpc(name = "author_submitAndWatchExtrinsic")]
@@ -90,11 +89,15 @@ impl<B, E, P> Author<B, E, P> where
 	P: PoolChainApi + Sync + Send + 'static,
 {
 	/// Create new instance of Authoring API.
-	pub fn new(client: Arc<Client<B, E, <P as PoolChainApi>::Block>>, pool: Arc<Pool<P>>, executor: TaskExecutor) -> Self {
+	pub fn new(
+		client: Arc<Client<B, E, <P as PoolChainApi>::Block>>,
+		pool: Arc<Pool<P>>,
+		subscriptions: Subscriptions,
+	) -> Self {
 		Author {
 			client,
 			pool,
-			subscriptions: Subscriptions::new(executor),
+			subscriptions,
 		}
 	}
 }
diff --git a/substrate/core/rpc/src/author/tests.rs b/substrate/core/rpc/src/author/tests.rs
index be15b2341e8..4ea3ead4d1c 100644
--- a/substrate/core/rpc/src/author/tests.rs
+++ b/substrate/core/rpc/src/author/tests.rs
@@ -28,7 +28,7 @@ use runtime_primitives::generic::BlockId;
 #[derive(Clone, Debug)]
 pub struct Verified
 {
-	sender: u64, 
+	sender: u64,
 	hash: u64,
 }
 
@@ -57,13 +57,13 @@ impl ChainApi for TestApi {
 		Ok(Verified {
 			sender: uxt.transfer.from[31] as u64,
 			hash: uxt.transfer.nonce,
-		}) 
+		})
 	}
 
 	fn is_ready(&self, _at: &BlockId<Block>, _c: &mut Self::Ready, _xt: &VerifiedFor<Self>) -> Readiness {
 		Readiness::Ready
 	}
-	
+
 	fn ready(&self) -> Self::Ready { }
 
 	fn compare(old: &VerifiedFor<Self>, other: &VerifiedFor<Self>) -> ::std::cmp::Ordering {
@@ -151,12 +151,12 @@ fn should_watch_extrinsic() {
 	p.watch_extrinsic(Default::default(), subscriber, uxt(5, 5).encode().into());
 
 	// then
-	assert_eq!(runtime.block_on(id_rx), Ok(Ok(0.into())));
+	assert_eq!(runtime.block_on(id_rx), Ok(Ok(1.into())));
 	// check notifications
 	AuthorApi::submit_rich_extrinsic(&p, uxt(5, 1)).unwrap();
 	assert_eq!(
 		runtime.block_on(data.into_future()).unwrap().0,
-		Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":1},"subscription":0}}"#.into())
+		Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":1},"subscription":1}}"#.into())
 	);
 }
 
diff --git a/substrate/core/rpc/src/chain/mod.rs b/substrate/core/rpc/src/chain/mod.rs
index 07109fee5f2..75b6ed000fa 100644
--- a/substrate/core/rpc/src/chain/mod.rs
+++ b/substrate/core/rpc/src/chain/mod.rs
@@ -26,7 +26,6 @@ use rpc::futures::{stream, Future, Sink, Stream};
 use runtime_primitives::generic::{BlockId, SignedBlock};
 use runtime_primitives::traits::{Block as BlockT, Header, NumberFor};
 use runtime_version::RuntimeVersion;
-use tokio::runtime::TaskExecutor;
 use primitives::{Blake2Hasher};
 
 use subscriptions::Subscriptions;
@@ -82,10 +81,10 @@ pub struct Chain<B, E, Block: BlockT> {
 
 impl<B, E, Block: BlockT> Chain<B, E, Block> {
 	/// Create new Chain API RPC handler.
-	pub fn new(client: Arc<Client<B, E, Block>>, executor: TaskExecutor) -> Self {
+	pub fn new(client: Arc<Client<B, E, Block>>, subscriptions: Subscriptions) -> Self {
 		Self {
 			client,
-			subscriptions: Subscriptions::new(executor),
+			subscriptions,
 		}
 	}
 }
diff --git a/substrate/core/rpc/src/chain/tests.rs b/substrate/core/rpc/src/chain/tests.rs
index 532d988d3ba..1c0cfbcb1a8 100644
--- a/substrate/core/rpc/src/chain/tests.rs
+++ b/substrate/core/rpc/src/chain/tests.rs
@@ -168,7 +168,7 @@ fn should_notify_about_latest_block() {
 		api.subscribe_new_head(Default::default(), subscriber);
 
 		// assert id assigned
-		assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(0))));
+		assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1))));
 
 		let builder = api.client.new_block().unwrap();
 		api.client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap();
diff --git a/substrate/core/rpc/src/lib.rs b/substrate/core/rpc/src/lib.rs
index cad9f80d8b2..8bda6050ea6 100644
--- a/substrate/core/rpc/src/lib.rs
+++ b/substrate/core/rpc/src/lib.rs
@@ -50,6 +50,8 @@ mod errors;
 mod helpers;
 mod subscriptions;
 
+pub use subscriptions::Subscriptions;
+
 pub mod author;
 pub mod chain;
 pub mod metadata;
diff --git a/substrate/core/rpc/src/state/mod.rs b/substrate/core/rpc/src/state/mod.rs
index 47b6647bd26..af95dcb3a40 100644
--- a/substrate/core/rpc/src/state/mod.rs
+++ b/substrate/core/rpc/src/state/mod.rs
@@ -32,7 +32,6 @@ use rpc::Result as RpcResult;
 use rpc::futures::{stream, Future, Sink, Stream};
 use runtime_primitives::generic::BlockId;
 use runtime_primitives::traits::{Block as BlockT, Header};
-use tokio::runtime::TaskExecutor;
 
 use subscriptions::Subscriptions;
 
@@ -96,10 +95,10 @@ pub struct State<B, E, Block: BlockT> {
 
 impl<B, E, Block: BlockT> State<B, E, Block> {
 	/// Create new State API RPC handler.
-	pub fn new(client: Arc<Client<B, E, Block>>, executor: TaskExecutor) -> Self {
+	pub fn new(client: Arc<Client<B, E, Block>>, subscriptions: Subscriptions) -> Self {
 		Self {
 			client,
-			subscriptions: Subscriptions::new(executor),
+			subscriptions,
 		}
 	}
 }
diff --git a/substrate/core/rpc/src/state/tests.rs b/substrate/core/rpc/src/state/tests.rs
index 8c401ec1bbf..d90933eb735 100644
--- a/substrate/core/rpc/src/state/tests.rs
+++ b/substrate/core/rpc/src/state/tests.rs
@@ -27,7 +27,7 @@ fn should_return_storage() {
 	let core = ::tokio::runtime::Runtime::new().unwrap();
 	let client = Arc::new(test_client::new());
 	let genesis_hash = client.genesis_hash();
-	let client = State::new(client, core.executor());
+	let client = State::new(client, Subscriptions::new(core.executor()));
 
 	assert_matches!(
 		client.storage(StorageKey(vec![10]), Some(genesis_hash).into()),
@@ -40,7 +40,7 @@ fn should_call_contract() {
 	let core = ::tokio::runtime::Runtime::new().unwrap();
 	let client = Arc::new(test_client::new());
 	let genesis_hash = client.genesis_hash();
-	let client = State::new(client, core.executor());
+	let client = State::new(client, Subscriptions::new(core.executor()));
 
 	assert_matches!(
 		client.call("balanceOf".into(), Bytes(vec![1,2,3]), Some(genesis_hash).into()),
@@ -55,15 +55,12 @@ fn should_notify_about_storage_changes() {
 	let (subscriber, id, transport) = pubsub::Subscriber::new_test("test");
 
 	{
-		let api = State {
-			client: Arc::new(test_client::new()),
-			subscriptions: Subscriptions::new(remote),
-		};
+		let api = State::new(Arc::new(test_client::new()), Subscriptions::new(remote));
 
 		api.subscribe_storage(Default::default(), subscriber, None.into());
 
 		// assert id assigned
-		assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(0))));
+		assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1))));
 
 		let mut builder = api.client.new_block().unwrap();
 		builder.push_transfer(runtime::Transfer {
@@ -89,17 +86,14 @@ fn should_send_initial_storage_changes_and_notifications() {
 	let (subscriber, id, transport) = pubsub::Subscriber::new_test("test");
 
 	{
-		let api = State {
-			client: Arc::new(test_client::new()),
-			subscriptions: Subscriptions::new(remote),
-		};
+		let api = State::new(Arc::new(test_client::new()), Subscriptions::new(remote));
 
 		api.subscribe_storage(Default::default(), subscriber, Some(vec![
 			StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()),
 		]).into());
 
 		// assert id assigned
-		assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(0))));
+		assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1))));
 
 		let mut builder = api.client.new_block().unwrap();
 		builder.push_transfer(runtime::Transfer {
@@ -125,7 +119,7 @@ fn should_send_initial_storage_changes_and_notifications() {
 fn should_query_storage() {
 	let core = ::tokio::runtime::Runtime::new().unwrap();
 	let client = Arc::new(test_client::new());
-	let api = State::new(client.clone(), core.executor());
+	let api = State::new(client.clone(), Subscriptions::new(core.executor()));
 
 	let add_block = |nonce| {
 		let mut builder = client.new_block().unwrap();
diff --git a/substrate/core/rpc/src/subscriptions.rs b/substrate/core/rpc/src/subscriptions.rs
index 9013edf742e..17ca51b3504 100644
--- a/substrate/core/rpc/src/subscriptions.rs
+++ b/substrate/core/rpc/src/subscriptions.rs
@@ -15,7 +15,7 @@
 // along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
 
 use std::collections::HashMap;
-use std::sync::atomic::{self, AtomicUsize};
+use std::sync::{Arc, atomic::{self, AtomicUsize}};
 
 use jsonrpc_macros::pubsub;
 use jsonrpc_pubsub::SubscriptionId;
@@ -26,14 +26,34 @@ use tokio::runtime::TaskExecutor;
 
 type Id = u64;
 
+/// Generate unique ids for subscriptions.
+#[derive(Clone, Debug)]
+pub struct IdProvider {
+	next_id: Arc<AtomicUsize>,
+}
+impl Default for IdProvider {
+	fn default() -> Self {
+		IdProvider {
+			next_id: Arc::new(AtomicUsize::new(1)),
+		}
+	}
+}
+
+impl IdProvider {
+	/// Returns next id for the subscription.
+	pub fn next_id(&self) -> Id {
+		self.next_id.fetch_add(1, atomic::Ordering::AcqRel) as u64
+	}
+}
+
 /// Subscriptions manager.
 ///
 /// Takes care of assigning unique subscription ids and
 /// driving the sinks into completion.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Subscriptions {
-	next_id: AtomicUsize,
-	active_subscriptions: Mutex<HashMap<Id, oneshot::Sender<()>>>,
+	next_id: IdProvider,
+	active_subscriptions: Arc<Mutex<HashMap<Id, oneshot::Sender<()>>>>,
 	executor: TaskExecutor,
 }
 
@@ -57,7 +77,7 @@ impl Subscriptions {
 		R: future::IntoFuture<Future=F, Item=(), Error=()>,
 		F: future::Future<Item=(), Error=()> + Send + 'static,
 	{
-		let id = self.next_id.fetch_add(1, atomic::Ordering::AcqRel) as u64;
+		let id = self.next_id.next_id();
 		if let Ok(sink) = subscriber.assign_id(id.into()) {
 			let (tx, rx) = oneshot::channel();
 			let future = into_future(sink)
diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs
index 0fb40d9a2cb..20994830389 100644
--- a/substrate/core/service/src/lib.rs
+++ b/substrate/core/service/src/lib.rs
@@ -232,9 +232,10 @@ impl<Components> Service<Components>
 		let (rpc_http, rpc_ws) = {
 			let handler = || {
 				let client = client.clone();
-				let chain = rpc::apis::chain::Chain::new(client.clone(), task_executor.clone());
-				let state = rpc::apis::state::State::new(client.clone(), task_executor.clone());
-				let author = rpc::apis::author::Author::new(client.clone(), transaction_pool.clone(), task_executor.clone());
+				let subscriptions = rpc::apis::Subscriptions::new(task_executor.clone());
+				let chain = rpc::apis::chain::Chain::new(client.clone(), subscriptions.clone());
+				let state = rpc::apis::state::State::new(client.clone(), subscriptions.clone());
+				let author = rpc::apis::author::Author::new(client.clone(), transaction_pool.clone(), subscriptions.clone());
 				rpc::rpc_handler::<ComponentBlock<Components>, ComponentExHash<Components>, _, _, _, _, _>(
 					state,
 					chain,
diff --git a/substrate/subkey/src/vanity.rs b/substrate/subkey/src/vanity.rs
index 0d6d43065d1..a3c0dab37d3 100644
--- a/substrate/subkey/src/vanity.rs
+++ b/substrate/subkey/src/vanity.rs
@@ -15,7 +15,7 @@
 // along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
 
 use rand::{OsRng, Rng};
-use substrate_primitives::{ed25519::Pair, hexdisplay::HexDisplay};
+use substrate_primitives::ed25519::Pair;
 use std::cmp;
 
 fn good_waypoint(done: u64) -> u64 {
-- 
GitLab