Skip to content
Snippets Groups Projects
Commit e8d88fcb authored by Tomasz Drwięga's avatar Tomasz Drwięga Committed by Gav Wood
Browse files

Global unique subscription ids. (#848)

* Global unique subscription ids.

* Fix tests.
parent 04cf0072
Branches
No related merge requests found
...@@ -36,7 +36,6 @@ use primitives::{Bytes, Blake2Hasher}; ...@@ -36,7 +36,6 @@ use primitives::{Bytes, Blake2Hasher};
use rpc::futures::{Sink, Stream, Future}; use rpc::futures::{Sink, Stream, Future};
use runtime_primitives::{generic, traits}; use runtime_primitives::{generic, traits};
use subscriptions::Subscriptions; use subscriptions::Subscriptions;
use tokio::runtime::TaskExecutor;
pub mod error; pub mod error;
...@@ -60,7 +59,7 @@ build_rpc_trait! { ...@@ -60,7 +59,7 @@ build_rpc_trait! {
/// Returns all pending extrinsics, potentially grouped by sender. /// Returns all pending extrinsics, potentially grouped by sender.
#[rpc(name = "author_pendingExtrinsics")] #[rpc(name = "author_pendingExtrinsics")]
fn pending_extrinsics(&self) -> Result<PendingExtrinsics>; fn pending_extrinsics(&self) -> Result<PendingExtrinsics>;
#[pubsub(name = "author_extrinsicUpdate")] { #[pubsub(name = "author_extrinsicUpdate")] {
/// Submit an extrinsic to watch. /// Submit an extrinsic to watch.
#[rpc(name = "author_submitAndWatchExtrinsic")] #[rpc(name = "author_submitAndWatchExtrinsic")]
...@@ -90,11 +89,15 @@ impl<B, E, P> Author<B, E, P> where ...@@ -90,11 +89,15 @@ impl<B, E, P> Author<B, E, P> where
P: PoolChainApi + Sync + Send + 'static, P: PoolChainApi + Sync + Send + 'static,
{ {
/// Create new instance of Authoring API. /// Create new instance of Authoring API.
pub fn new(client: Arc<Client<B, E, <P as PoolChainApi>::Block>>, pool: Arc<Pool<P>>, executor: TaskExecutor) -> Self { pub fn new(
client: Arc<Client<B, E, <P as PoolChainApi>::Block>>,
pool: Arc<Pool<P>>,
subscriptions: Subscriptions,
) -> Self {
Author { Author {
client, client,
pool, pool,
subscriptions: Subscriptions::new(executor), subscriptions,
} }
} }
} }
......
...@@ -28,7 +28,7 @@ use runtime_primitives::generic::BlockId; ...@@ -28,7 +28,7 @@ use runtime_primitives::generic::BlockId;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Verified pub struct Verified
{ {
sender: u64, sender: u64,
hash: u64, hash: u64,
} }
...@@ -57,13 +57,13 @@ impl ChainApi for TestApi { ...@@ -57,13 +57,13 @@ impl ChainApi for TestApi {
Ok(Verified { Ok(Verified {
sender: uxt.transfer.from[31] as u64, sender: uxt.transfer.from[31] as u64,
hash: uxt.transfer.nonce, hash: uxt.transfer.nonce,
}) })
} }
fn is_ready(&self, _at: &BlockId<Block>, _c: &mut Self::Ready, _xt: &VerifiedFor<Self>) -> Readiness { fn is_ready(&self, _at: &BlockId<Block>, _c: &mut Self::Ready, _xt: &VerifiedFor<Self>) -> Readiness {
Readiness::Ready Readiness::Ready
} }
fn ready(&self) -> Self::Ready { } fn ready(&self) -> Self::Ready { }
fn compare(old: &VerifiedFor<Self>, other: &VerifiedFor<Self>) -> ::std::cmp::Ordering { fn compare(old: &VerifiedFor<Self>, other: &VerifiedFor<Self>) -> ::std::cmp::Ordering {
...@@ -151,12 +151,12 @@ fn should_watch_extrinsic() { ...@@ -151,12 +151,12 @@ fn should_watch_extrinsic() {
p.watch_extrinsic(Default::default(), subscriber, uxt(5, 5).encode().into()); p.watch_extrinsic(Default::default(), subscriber, uxt(5, 5).encode().into());
// then // then
assert_eq!(runtime.block_on(id_rx), Ok(Ok(0.into()))); assert_eq!(runtime.block_on(id_rx), Ok(Ok(1.into())));
// check notifications // check notifications
AuthorApi::submit_rich_extrinsic(&p, uxt(5, 1)).unwrap(); AuthorApi::submit_rich_extrinsic(&p, uxt(5, 1)).unwrap();
assert_eq!( assert_eq!(
runtime.block_on(data.into_future()).unwrap().0, runtime.block_on(data.into_future()).unwrap().0,
Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":1},"subscription":0}}"#.into()) Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":1},"subscription":1}}"#.into())
); );
} }
......
...@@ -26,7 +26,6 @@ use rpc::futures::{stream, Future, Sink, Stream}; ...@@ -26,7 +26,6 @@ use rpc::futures::{stream, Future, Sink, Stream};
use runtime_primitives::generic::{BlockId, SignedBlock}; use runtime_primitives::generic::{BlockId, SignedBlock};
use runtime_primitives::traits::{Block as BlockT, Header, NumberFor}; use runtime_primitives::traits::{Block as BlockT, Header, NumberFor};
use runtime_version::RuntimeVersion; use runtime_version::RuntimeVersion;
use tokio::runtime::TaskExecutor;
use primitives::{Blake2Hasher}; use primitives::{Blake2Hasher};
use subscriptions::Subscriptions; use subscriptions::Subscriptions;
...@@ -82,10 +81,10 @@ pub struct Chain<B, E, Block: BlockT> { ...@@ -82,10 +81,10 @@ pub struct Chain<B, E, Block: BlockT> {
impl<B, E, Block: BlockT> Chain<B, E, Block> { impl<B, E, Block: BlockT> Chain<B, E, Block> {
/// Create new Chain API RPC handler. /// Create new Chain API RPC handler.
pub fn new(client: Arc<Client<B, E, Block>>, executor: TaskExecutor) -> Self { pub fn new(client: Arc<Client<B, E, Block>>, subscriptions: Subscriptions) -> Self {
Self { Self {
client, client,
subscriptions: Subscriptions::new(executor), subscriptions,
} }
} }
} }
......
...@@ -168,7 +168,7 @@ fn should_notify_about_latest_block() { ...@@ -168,7 +168,7 @@ fn should_notify_about_latest_block() {
api.subscribe_new_head(Default::default(), subscriber); api.subscribe_new_head(Default::default(), subscriber);
// assert id assigned // assert id assigned
assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(0)))); assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1))));
let builder = api.client.new_block().unwrap(); let builder = api.client.new_block().unwrap();
api.client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); api.client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap();
......
...@@ -50,6 +50,8 @@ mod errors; ...@@ -50,6 +50,8 @@ mod errors;
mod helpers; mod helpers;
mod subscriptions; mod subscriptions;
pub use subscriptions::Subscriptions;
pub mod author; pub mod author;
pub mod chain; pub mod chain;
pub mod metadata; pub mod metadata;
......
...@@ -32,7 +32,6 @@ use rpc::Result as RpcResult; ...@@ -32,7 +32,6 @@ use rpc::Result as RpcResult;
use rpc::futures::{stream, Future, Sink, Stream}; use rpc::futures::{stream, Future, Sink, Stream};
use runtime_primitives::generic::BlockId; use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Block as BlockT, Header}; use runtime_primitives::traits::{Block as BlockT, Header};
use tokio::runtime::TaskExecutor;
use subscriptions::Subscriptions; use subscriptions::Subscriptions;
...@@ -96,10 +95,10 @@ pub struct State<B, E, Block: BlockT> { ...@@ -96,10 +95,10 @@ pub struct State<B, E, Block: BlockT> {
impl<B, E, Block: BlockT> State<B, E, Block> { impl<B, E, Block: BlockT> State<B, E, Block> {
/// Create new State API RPC handler. /// Create new State API RPC handler.
pub fn new(client: Arc<Client<B, E, Block>>, executor: TaskExecutor) -> Self { pub fn new(client: Arc<Client<B, E, Block>>, subscriptions: Subscriptions) -> Self {
Self { Self {
client, client,
subscriptions: Subscriptions::new(executor), subscriptions,
} }
} }
} }
......
...@@ -27,7 +27,7 @@ fn should_return_storage() { ...@@ -27,7 +27,7 @@ fn should_return_storage() {
let core = ::tokio::runtime::Runtime::new().unwrap(); let core = ::tokio::runtime::Runtime::new().unwrap();
let client = Arc::new(test_client::new()); let client = Arc::new(test_client::new());
let genesis_hash = client.genesis_hash(); let genesis_hash = client.genesis_hash();
let client = State::new(client, core.executor()); let client = State::new(client, Subscriptions::new(core.executor()));
assert_matches!( assert_matches!(
client.storage(StorageKey(vec![10]), Some(genesis_hash).into()), client.storage(StorageKey(vec![10]), Some(genesis_hash).into()),
...@@ -40,7 +40,7 @@ fn should_call_contract() { ...@@ -40,7 +40,7 @@ fn should_call_contract() {
let core = ::tokio::runtime::Runtime::new().unwrap(); let core = ::tokio::runtime::Runtime::new().unwrap();
let client = Arc::new(test_client::new()); let client = Arc::new(test_client::new());
let genesis_hash = client.genesis_hash(); let genesis_hash = client.genesis_hash();
let client = State::new(client, core.executor()); let client = State::new(client, Subscriptions::new(core.executor()));
assert_matches!( assert_matches!(
client.call("balanceOf".into(), Bytes(vec![1,2,3]), Some(genesis_hash).into()), client.call("balanceOf".into(), Bytes(vec![1,2,3]), Some(genesis_hash).into()),
...@@ -55,15 +55,12 @@ fn should_notify_about_storage_changes() { ...@@ -55,15 +55,12 @@ fn should_notify_about_storage_changes() {
let (subscriber, id, transport) = pubsub::Subscriber::new_test("test"); let (subscriber, id, transport) = pubsub::Subscriber::new_test("test");
{ {
let api = State { let api = State::new(Arc::new(test_client::new()), Subscriptions::new(remote));
client: Arc::new(test_client::new()),
subscriptions: Subscriptions::new(remote),
};
api.subscribe_storage(Default::default(), subscriber, None.into()); api.subscribe_storage(Default::default(), subscriber, None.into());
// assert id assigned // assert id assigned
assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(0)))); assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1))));
let mut builder = api.client.new_block().unwrap(); let mut builder = api.client.new_block().unwrap();
builder.push_transfer(runtime::Transfer { builder.push_transfer(runtime::Transfer {
...@@ -89,17 +86,14 @@ fn should_send_initial_storage_changes_and_notifications() { ...@@ -89,17 +86,14 @@ fn should_send_initial_storage_changes_and_notifications() {
let (subscriber, id, transport) = pubsub::Subscriber::new_test("test"); let (subscriber, id, transport) = pubsub::Subscriber::new_test("test");
{ {
let api = State { let api = State::new(Arc::new(test_client::new()), Subscriptions::new(remote));
client: Arc::new(test_client::new()),
subscriptions: Subscriptions::new(remote),
};
api.subscribe_storage(Default::default(), subscriber, Some(vec![ api.subscribe_storage(Default::default(), subscriber, Some(vec![
StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()), StorageKey("a52da2b7c269da1366b3ed1cdb7299ce".from_hex().unwrap()),
]).into()); ]).into());
// assert id assigned // assert id assigned
assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(0)))); assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1))));
let mut builder = api.client.new_block().unwrap(); let mut builder = api.client.new_block().unwrap();
builder.push_transfer(runtime::Transfer { builder.push_transfer(runtime::Transfer {
...@@ -125,7 +119,7 @@ fn should_send_initial_storage_changes_and_notifications() { ...@@ -125,7 +119,7 @@ fn should_send_initial_storage_changes_and_notifications() {
fn should_query_storage() { fn should_query_storage() {
let core = ::tokio::runtime::Runtime::new().unwrap(); let core = ::tokio::runtime::Runtime::new().unwrap();
let client = Arc::new(test_client::new()); let client = Arc::new(test_client::new());
let api = State::new(client.clone(), core.executor()); let api = State::new(client.clone(), Subscriptions::new(core.executor()));
let add_block = |nonce| { let add_block = |nonce| {
let mut builder = client.new_block().unwrap(); let mut builder = client.new_block().unwrap();
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>. // along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::{self, AtomicUsize}; use std::sync::{Arc, atomic::{self, AtomicUsize}};
use jsonrpc_macros::pubsub; use jsonrpc_macros::pubsub;
use jsonrpc_pubsub::SubscriptionId; use jsonrpc_pubsub::SubscriptionId;
...@@ -26,14 +26,34 @@ use tokio::runtime::TaskExecutor; ...@@ -26,14 +26,34 @@ use tokio::runtime::TaskExecutor;
type Id = u64; type Id = u64;
/// Generate unique ids for subscriptions.
#[derive(Clone, Debug)]
pub struct IdProvider {
next_id: Arc<AtomicUsize>,
}
impl Default for IdProvider {
fn default() -> Self {
IdProvider {
next_id: Arc::new(AtomicUsize::new(1)),
}
}
}
impl IdProvider {
/// Returns next id for the subscription.
pub fn next_id(&self) -> Id {
self.next_id.fetch_add(1, atomic::Ordering::AcqRel) as u64
}
}
/// Subscriptions manager. /// Subscriptions manager.
/// ///
/// Takes care of assigning unique subscription ids and /// Takes care of assigning unique subscription ids and
/// driving the sinks into completion. /// driving the sinks into completion.
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct Subscriptions { pub struct Subscriptions {
next_id: AtomicUsize, next_id: IdProvider,
active_subscriptions: Mutex<HashMap<Id, oneshot::Sender<()>>>, active_subscriptions: Arc<Mutex<HashMap<Id, oneshot::Sender<()>>>>,
executor: TaskExecutor, executor: TaskExecutor,
} }
...@@ -57,7 +77,7 @@ impl Subscriptions { ...@@ -57,7 +77,7 @@ impl Subscriptions {
R: future::IntoFuture<Future=F, Item=(), Error=()>, R: future::IntoFuture<Future=F, Item=(), Error=()>,
F: future::Future<Item=(), Error=()> + Send + 'static, F: future::Future<Item=(), Error=()> + Send + 'static,
{ {
let id = self.next_id.fetch_add(1, atomic::Ordering::AcqRel) as u64; let id = self.next_id.next_id();
if let Ok(sink) = subscriber.assign_id(id.into()) { if let Ok(sink) = subscriber.assign_id(id.into()) {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let future = into_future(sink) let future = into_future(sink)
......
...@@ -232,9 +232,10 @@ impl<Components> Service<Components> ...@@ -232,9 +232,10 @@ impl<Components> Service<Components>
let (rpc_http, rpc_ws) = { let (rpc_http, rpc_ws) = {
let handler = || { let handler = || {
let client = client.clone(); let client = client.clone();
let chain = rpc::apis::chain::Chain::new(client.clone(), task_executor.clone()); let subscriptions = rpc::apis::Subscriptions::new(task_executor.clone());
let state = rpc::apis::state::State::new(client.clone(), task_executor.clone()); let chain = rpc::apis::chain::Chain::new(client.clone(), subscriptions.clone());
let author = rpc::apis::author::Author::new(client.clone(), transaction_pool.clone(), task_executor.clone()); let state = rpc::apis::state::State::new(client.clone(), subscriptions.clone());
let author = rpc::apis::author::Author::new(client.clone(), transaction_pool.clone(), subscriptions.clone());
rpc::rpc_handler::<ComponentBlock<Components>, ComponentExHash<Components>, _, _, _, _, _>( rpc::rpc_handler::<ComponentBlock<Components>, ComponentExHash<Components>, _, _, _, _, _>(
state, state,
chain, chain,
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>. // along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use rand::{OsRng, Rng}; use rand::{OsRng, Rng};
use substrate_primitives::{ed25519::Pair, hexdisplay::HexDisplay}; use substrate_primitives::ed25519::Pair;
use std::cmp; use std::cmp;
fn good_waypoint(done: u64) -> u64 { fn good_waypoint(done: u64) -> u64 {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment