diff --git a/substrate/client/rpc-api/src/chain/mod.rs b/substrate/client/rpc-api/src/chain/mod.rs index 0c270a3f70528e4b772a639cca98f395a85b8e2b..2ab3851d37663a328e37b4f1d8998d103905d9f3 100644 --- a/substrate/client/rpc-api/src/chain/mod.rs +++ b/substrate/client/rpc-api/src/chain/mod.rs @@ -54,6 +54,18 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> { #[rpc(name = "chain_getFinalizedHead", alias("chain_getFinalisedHead"))] fn finalized_head(&self) -> Result<Hash>; + /// All head subscription + #[pubsub(subscription = "chain_allHead", subscribe, name = "chain_subscribeAllHeads")] + fn subscribe_all_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Header>); + + /// Unsubscribe from all head subscription. + #[pubsub(subscription = "chain_allHead", unsubscribe, name = "chain_unsubscribeAllHeads")] + fn unsubscribe_all_heads( + &self, + metadata: Option<Self::Metadata>, + id: SubscriptionId, + ) -> RpcResult<bool>; + /// New head subscription #[pubsub( subscription = "chain_newHead", @@ -76,7 +88,7 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> { id: SubscriptionId, ) -> RpcResult<bool>; - /// New head subscription + /// Finalized head subscription #[pubsub( subscription = "chain_finalizedHead", subscribe, @@ -85,7 +97,7 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> { )] fn subscribe_finalized_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Header>); - /// Unsubscribe from new head subscription. + /// Unsubscribe from finalized head subscription. #[pubsub( subscription = "chain_finalizedHead", unsubscribe, diff --git a/substrate/client/rpc/src/chain/mod.rs b/substrate/client/rpc/src/chain/mod.rs index a2971983c793f18f9858b53211f0525cd977fd87..5285de670d8abb741b66626c17dec3b0c48f1147 100644 --- a/substrate/client/rpc/src/chain/mod.rs +++ b/substrate/client/rpc/src/chain/mod.rs @@ -94,7 +94,33 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static Ok(self.client().chain_info().finalized_hash) } - /// New head subscription + /// All new head subscription + fn subscribe_all_heads( + &self, + _metadata: crate::metadata::Metadata, + subscriber: Subscriber<Block::Header>, + ) { + subscribe_headers( + self.client(), + self.subscriptions(), + subscriber, + || self.client().chain_info().best_hash, + || self.client().import_notification_stream() + .map(|notification| Ok::<_, ()>(notification.header)) + .compat(), + ) + } + + /// Unsubscribe from all head subscription. + fn unsubscribe_all_heads( + &self, + _metadata: Option<crate::metadata::Metadata>, + id: SubscriptionId, + ) -> RpcResult<bool> { + Ok(self.subscriptions().cancel(id)) + } + + /// New best head subscription fn subscribe_new_heads( &self, _metadata: crate::metadata::Metadata, @@ -112,7 +138,7 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static ) } - /// Unsubscribe from new head subscription. + /// Unsubscribe from new best head subscription. fn unsubscribe_new_heads( &self, _metadata: Option<crate::metadata::Metadata>, @@ -121,7 +147,7 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static Ok(self.subscriptions().cancel(id)) } - /// New head subscription + /// Finalized head subscription fn subscribe_finalized_heads( &self, _metadata: crate::metadata::Metadata, @@ -138,7 +164,7 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static ) } - /// Unsubscribe from new head subscription. + /// Unsubscribe from finalized head subscription. fn unsubscribe_finalized_heads( &self, _metadata: Option<crate::metadata::Metadata>, @@ -229,6 +255,14 @@ impl<B, E, Block, RA> ChainApi<NumberFor<Block>, Block::Hash, Block::Header, Sig self.backend.finalized_head() } + fn subscribe_all_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Block::Header>) { + self.backend.subscribe_all_heads(metadata, subscriber) + } + + fn unsubscribe_all_heads(&self, metadata: Option<Self::Metadata>, id: SubscriptionId) -> RpcResult<bool> { + self.backend.unsubscribe_all_heads(metadata, id) + } + fn subscribe_new_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Block::Header>) { self.backend.subscribe_new_heads(metadata, subscriber) } diff --git a/substrate/client/rpc/src/chain/tests.rs b/substrate/client/rpc/src/chain/tests.rs index eb05639018781d3bd7a320d92219f48a30372877..02e4d2f16337bad25bf2ff29629ce2c3a876c612 100644 --- a/substrate/client/rpc/src/chain/tests.rs +++ b/substrate/client/rpc/src/chain/tests.rs @@ -195,6 +195,35 @@ fn should_notify_about_latest_block() { let remote = core.executor(); let (subscriber, id, transport) = Subscriber::new_test("test"); + { + let mut client = Arc::new(substrate_test_runtime_client::new()); + let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote))); + + api.subscribe_all_heads(Default::default(), subscriber); + + // assert id assigned + assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1)))); + + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block).unwrap(); + } + + // assert initial head sent. + let (notification, next) = core.block_on(transport.into_future()).unwrap(); + assert!(notification.is_some()); + // assert notification sent to transport + let (notification, next) = core.block_on(next.into_future()).unwrap(); + assert!(notification.is_some()); + // no more notifications on this channel + assert_eq!(core.block_on(next.into_future()).unwrap().0, None); +} + +#[test] +fn should_notify_about_best_block() { + let mut core = ::tokio::runtime::Runtime::new().unwrap(); + let remote = core.executor(); + let (subscriber, id, transport) = Subscriber::new_test("test"); + { let mut client = Arc::new(substrate_test_runtime_client::new()); let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote)));