Skip to content
Snippets Groups Projects
Commit e366b5bd authored by Jakub Pánik's avatar Jakub Pánik Committed by GitHub
Browse files

Added subscribe_all_heads RPC function (#4979)


* Added subscribe_all_heads RPC function

* Update client/rpc/src/chain/tests.rs

Fixed style ( spacing )

Co-Authored-By: default avatarTomasz Drwięga <tomusdrw@users.noreply.github.com>

Co-authored-by: default avatarTomasz Drwięga <tomusdrw@users.noreply.github.com>
parent f5a79798
No related merge requests found
...@@ -54,6 +54,18 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> { ...@@ -54,6 +54,18 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> {
#[rpc(name = "chain_getFinalizedHead", alias("chain_getFinalisedHead"))] #[rpc(name = "chain_getFinalizedHead", alias("chain_getFinalisedHead"))]
fn finalized_head(&self) -> Result<Hash>; fn finalized_head(&self) -> Result<Hash>;
/// All head subscription
#[pubsub(subscription = "chain_allHead", subscribe, name = "chain_subscribeAllHeads")]
fn subscribe_all_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Header>);
/// Unsubscribe from all head subscription.
#[pubsub(subscription = "chain_allHead", unsubscribe, name = "chain_unsubscribeAllHeads")]
fn unsubscribe_all_heads(
&self,
metadata: Option<Self::Metadata>,
id: SubscriptionId,
) -> RpcResult<bool>;
/// New head subscription /// New head subscription
#[pubsub( #[pubsub(
subscription = "chain_newHead", subscription = "chain_newHead",
...@@ -76,7 +88,7 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> { ...@@ -76,7 +88,7 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> {
id: SubscriptionId, id: SubscriptionId,
) -> RpcResult<bool>; ) -> RpcResult<bool>;
/// New head subscription /// Finalized head subscription
#[pubsub( #[pubsub(
subscription = "chain_finalizedHead", subscription = "chain_finalizedHead",
subscribe, subscribe,
...@@ -85,7 +97,7 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> { ...@@ -85,7 +97,7 @@ pub trait ChainApi<Number, Hash, Header, SignedBlock> {
)] )]
fn subscribe_finalized_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Header>); fn subscribe_finalized_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Header>);
/// Unsubscribe from new head subscription. /// Unsubscribe from finalized head subscription.
#[pubsub( #[pubsub(
subscription = "chain_finalizedHead", subscription = "chain_finalizedHead",
unsubscribe, unsubscribe,
......
...@@ -94,7 +94,33 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static ...@@ -94,7 +94,33 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static
Ok(self.client().chain_info().finalized_hash) Ok(self.client().chain_info().finalized_hash)
} }
/// New head subscription /// All new head subscription
fn subscribe_all_heads(
&self,
_metadata: crate::metadata::Metadata,
subscriber: Subscriber<Block::Header>,
) {
subscribe_headers(
self.client(),
self.subscriptions(),
subscriber,
|| self.client().chain_info().best_hash,
|| self.client().import_notification_stream()
.map(|notification| Ok::<_, ()>(notification.header))
.compat(),
)
}
/// Unsubscribe from all head subscription.
fn unsubscribe_all_heads(
&self,
_metadata: Option<crate::metadata::Metadata>,
id: SubscriptionId,
) -> RpcResult<bool> {
Ok(self.subscriptions().cancel(id))
}
/// New best head subscription
fn subscribe_new_heads( fn subscribe_new_heads(
&self, &self,
_metadata: crate::metadata::Metadata, _metadata: crate::metadata::Metadata,
...@@ -112,7 +138,7 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static ...@@ -112,7 +138,7 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static
) )
} }
/// Unsubscribe from new head subscription. /// Unsubscribe from new best head subscription.
fn unsubscribe_new_heads( fn unsubscribe_new_heads(
&self, &self,
_metadata: Option<crate::metadata::Metadata>, _metadata: Option<crate::metadata::Metadata>,
...@@ -121,7 +147,7 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static ...@@ -121,7 +147,7 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static
Ok(self.subscriptions().cancel(id)) Ok(self.subscriptions().cancel(id))
} }
/// New head subscription /// Finalized head subscription
fn subscribe_finalized_heads( fn subscribe_finalized_heads(
&self, &self,
_metadata: crate::metadata::Metadata, _metadata: crate::metadata::Metadata,
...@@ -138,7 +164,7 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static ...@@ -138,7 +164,7 @@ trait ChainBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static
) )
} }
/// Unsubscribe from new head subscription. /// Unsubscribe from finalized head subscription.
fn unsubscribe_finalized_heads( fn unsubscribe_finalized_heads(
&self, &self,
_metadata: Option<crate::metadata::Metadata>, _metadata: Option<crate::metadata::Metadata>,
...@@ -229,6 +255,14 @@ impl<B, E, Block, RA> ChainApi<NumberFor<Block>, Block::Hash, Block::Header, Sig ...@@ -229,6 +255,14 @@ impl<B, E, Block, RA> ChainApi<NumberFor<Block>, Block::Hash, Block::Header, Sig
self.backend.finalized_head() self.backend.finalized_head()
} }
fn subscribe_all_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Block::Header>) {
self.backend.subscribe_all_heads(metadata, subscriber)
}
fn unsubscribe_all_heads(&self, metadata: Option<Self::Metadata>, id: SubscriptionId) -> RpcResult<bool> {
self.backend.unsubscribe_all_heads(metadata, id)
}
fn subscribe_new_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Block::Header>) { fn subscribe_new_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Block::Header>) {
self.backend.subscribe_new_heads(metadata, subscriber) self.backend.subscribe_new_heads(metadata, subscriber)
} }
......
...@@ -195,6 +195,35 @@ fn should_notify_about_latest_block() { ...@@ -195,6 +195,35 @@ fn should_notify_about_latest_block() {
let remote = core.executor(); let remote = core.executor();
let (subscriber, id, transport) = Subscriber::new_test("test"); let (subscriber, id, transport) = Subscriber::new_test("test");
{
let mut client = Arc::new(substrate_test_runtime_client::new());
let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote)));
api.subscribe_all_heads(Default::default(), subscriber);
// assert id assigned
assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1))));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block).unwrap();
}
// assert initial head sent.
let (notification, next) = core.block_on(transport.into_future()).unwrap();
assert!(notification.is_some());
// assert notification sent to transport
let (notification, next) = core.block_on(next.into_future()).unwrap();
assert!(notification.is_some());
// no more notifications on this channel
assert_eq!(core.block_on(next.into_future()).unwrap().0, None);
}
#[test]
fn should_notify_about_best_block() {
let mut core = ::tokio::runtime::Runtime::new().unwrap();
let remote = core.executor();
let (subscriber, id, transport) = Subscriber::new_test("test");
{ {
let mut client = Arc::new(substrate_test_runtime_client::new()); let mut client = Arc::new(substrate_test_runtime_client::new());
let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote))); let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote)));
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment