From e366b5bd2cc58adbacd5da0dc31cada8549d6704 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20P=C3=A1nik?= <jakub.panik@gmail.com>
Date: Wed, 4 Mar 2020 14:19:38 +0100
Subject: [PATCH] Added subscribe_all_heads RPC function (#4979)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Added subscribe_all_heads RPC function

* Update client/rpc/src/chain/tests.rs

Fixed style ( spacing )

Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
---
 substrate/client/rpc-api/src/chain/mod.rs | 16 +++++++--
 substrate/client/rpc/src/chain/mod.rs     | 42 ++++++++++++++++++++---
 substrate/client/rpc/src/chain/tests.rs   | 29 ++++++++++++++++
 3 files changed, 81 insertions(+), 6 deletions(-)

diff --git a/substrate/client/rpc-api/src/chain/mod.rs b/substrate/client/rpc-api/src/chain/mod.rs
index 0c270a3f705..2ab3851d376 100644
--- a/substrate/client/rpc-api/src/chain/mod.rs
+++ b/substrate/client/rpc-api/src/chain/mod.rs
@@ -54,6 +54,18 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> {
 	#[rpc(name = "chain_getFinalizedHead", alias("chain_getFinalisedHead"))]
 	fn finalized_head(&self) -> Result<Hash>;
 
+	/// All head subscription
+	#[pubsub(subscription = "chain_allHead", subscribe, name = "chain_subscribeAllHeads")]
+	fn subscribe_all_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Header>);
+
+	/// Unsubscribe from all head subscription.
+	#[pubsub(subscription = "chain_allHead", unsubscribe, name = "chain_unsubscribeAllHeads")]
+	fn unsubscribe_all_heads(
+		&self,
+		metadata: Option<Self::Metadata>,
+		id: SubscriptionId,
+	) -> RpcResult<bool>;
+
 	/// New head subscription
 	#[pubsub(
 		subscription = "chain_newHead",
@@ -76,7 +88,7 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> {
 		id: SubscriptionId,
 	) -> RpcResult<bool>;
 
-	/// New head subscription
+	/// Finalized head subscription
 	#[pubsub(
 		subscription = "chain_finalizedHead",
 		subscribe,
@@ -85,7 +97,7 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> {
 	)]
 	fn subscribe_finalized_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Header>);
 
-	/// Unsubscribe from new head subscription.
+	/// Unsubscribe from finalized head subscription.
 	#[pubsub(
 		subscription = "chain_finalizedHead",
 		unsubscribe,
diff --git a/substrate/client/rpc/src/chain/mod.rs b/substrate/client/rpc/src/chain/mod.rs
index a2971983c79..5285de670d8 100644
--- a/substrate/client/rpc/src/chain/mod.rs
+++ b/substrate/client/rpc/src/chain/mod.rs
@@ -94,7 +94,33 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static
 		Ok(self.client().chain_info().finalized_hash)
 	}
 
-	/// New head subscription
+	/// All new head subscription
+	fn subscribe_all_heads(
+		&self,
+		_metadata: crate::metadata::Metadata,
+		subscriber: Subscriber<Block::Header>,
+	) {
+		subscribe_headers(
+			self.client(),
+			self.subscriptions(),
+			subscriber,
+			|| self.client().chain_info().best_hash,
+			|| self.client().import_notification_stream()
+				.map(|notification| Ok::<_, ()>(notification.header))
+				.compat(),
+		)
+	}
+
+	/// Unsubscribe from all head subscription.
+	fn unsubscribe_all_heads(
+		&self,
+		_metadata: Option<crate::metadata::Metadata>,
+		id: SubscriptionId,
+	) -> RpcResult<bool> {
+		Ok(self.subscriptions().cancel(id))
+	}
+
+	/// New best head subscription
 	fn subscribe_new_heads(
 		&self,
 		_metadata: crate::metadata::Metadata,
@@ -112,7 +138,7 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static
 		)
 	}
 
-	/// Unsubscribe from new head subscription.
+	/// Unsubscribe from new best head subscription.
 	fn unsubscribe_new_heads(
 		&self,
 		_metadata: Option<crate::metadata::Metadata>,
@@ -121,7 +147,7 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static
 		Ok(self.subscriptions().cancel(id))
 	}
 
-	/// New head subscription
+	/// Finalized head subscription
 	fn subscribe_finalized_heads(
 		&self,
 		_metadata: crate::metadata::Metadata,
@@ -138,7 +164,7 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static
 		)
 	}
 
-	/// Unsubscribe from new head subscription.
+	/// Unsubscribe from finalized head subscription.
 	fn unsubscribe_finalized_heads(
 		&self,
 		_metadata: Option<crate::metadata::Metadata>,
@@ -229,6 +255,14 @@ impl<B, E, Block, RA> ChainApi<NumberFor<Block>, Block::Hash, Block::Header, Sig
 		self.backend.finalized_head()
 	}
 
+	fn subscribe_all_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Block::Header>) {
+		self.backend.subscribe_all_heads(metadata, subscriber)
+	}
+
+	fn unsubscribe_all_heads(&self, metadata: Option<Self::Metadata>, id: SubscriptionId) -> RpcResult<bool> {
+		self.backend.unsubscribe_all_heads(metadata, id)
+	}
+
 	fn subscribe_new_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Block::Header>) {
 		self.backend.subscribe_new_heads(metadata, subscriber)
 	}
diff --git a/substrate/client/rpc/src/chain/tests.rs b/substrate/client/rpc/src/chain/tests.rs
index eb056390187..02e4d2f1633 100644
--- a/substrate/client/rpc/src/chain/tests.rs
+++ b/substrate/client/rpc/src/chain/tests.rs
@@ -195,6 +195,35 @@ fn should_notify_about_latest_block() {
 	let remote = core.executor();
 	let (subscriber, id, transport) = Subscriber::new_test("test");
 
+	{
+		let mut client = Arc::new(substrate_test_runtime_client::new());
+		let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote)));
+
+		api.subscribe_all_heads(Default::default(), subscriber);
+
+		// assert id assigned
+		assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1))));
+
+		let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
+		client.import(BlockOrigin::Own, block).unwrap();
+	}
+
+	// assert initial head sent.
+	let (notification, next) = core.block_on(transport.into_future()).unwrap();
+	assert!(notification.is_some());
+	// assert notification sent to transport
+	let (notification, next) = core.block_on(next.into_future()).unwrap();
+	assert!(notification.is_some());
+	// no more notifications on this channel
+	assert_eq!(core.block_on(next.into_future()).unwrap().0, None);
+}
+
+#[test]
+fn should_notify_about_best_block() {
+	let mut core = ::tokio::runtime::Runtime::new().unwrap();
+	let remote = core.executor();
+	let (subscriber, id, transport) = Subscriber::new_test("test");
+
 	{
 		let mut client = Arc::new(substrate_test_runtime_client::new());
 		let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote)));
-- 
GitLab