Skip to content
Snippets Groups Projects
Commit 503bcb56 authored by Tomasz Drwięga's avatar Tomasz Drwięga Committed by Svyatoslav Nikolsky
Browse files

Storage changes subscription (#464)

* Initial implementation of storage events.

* Attaching storage events.

* Expose storage modification stream over RPC.

* Use FNV for hashing small keys.

* Fix and add tests.

* Swap alias and RPC name.

* Fix demo.

* Addressing review grumbles.

* Fix comment.
parent 098cfcd3
Branches
No related merge requests found
Showing
with 484 additions and 61 deletions
......@@ -975,7 +975,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "jsonrpc-core"
version = "8.0.2"
source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d"
source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90"
dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -987,7 +987,7 @@ dependencies = [
[[package]]
name = "jsonrpc-http-server"
version = "8.0.1"
source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d"
source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90"
dependencies = [
"hyper 0.11.27 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)",
......@@ -1000,7 +1000,7 @@ dependencies = [
[[package]]
name = "jsonrpc-macros"
version = "8.0.1"
source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d"
source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90"
dependencies = [
"jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)",
"jsonrpc-pubsub 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)",
......@@ -1010,7 +1010,7 @@ dependencies = [
[[package]]
name = "jsonrpc-pubsub"
version = "8.0.1"
source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d"
source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90"
dependencies = [
"jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -1020,7 +1020,7 @@ dependencies = [
[[package]]
name = "jsonrpc-server-utils"
version = "8.0.1"
source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d"
source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90"
dependencies = [
"bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"globset 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -1033,7 +1033,7 @@ dependencies = [
[[package]]
name = "jsonrpc-ws-server"
version = "8.0.0"
source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d"
source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90"
dependencies = [
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)",
......@@ -2647,6 +2647,7 @@ version = "0.1.0"
dependencies = [
"ed25519 0.1.0",
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"hex-literal 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -2707,7 +2708,7 @@ dependencies = [
"hex-literal 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)",
......
......@@ -173,9 +173,10 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
let mut runtime = Runtime::new()?;
let _rpc_servers = {
let handler = || {
let state = rpc::apis::state::State::new(client.clone(), runtime.executor());
let chain = rpc::apis::chain::Chain::new(client.clone(), runtime.executor());
let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool), runtime.executor());
rpc::rpc_handler::<Block, _, _, _, _>(client.clone(), chain, author, DummySystem)
rpc::rpc_handler::<Block, _, _, _, _>(state, chain, author, DummySystem)
};
let http_address = "127.0.0.1:9933".parse().unwrap();
let ws_address = "127.0.0.1:9944".parse().unwrap();
......
......@@ -5,6 +5,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
error-chain = "0.12"
fnv = "1.0"
log = "0.3"
parking_lot = "0.4"
triehash = "0.1"
......
......@@ -453,7 +453,7 @@ impl<Block: BlockT> client::backend::Backend<Block> for Backend<Block> {
}
}
impl<Block: BlockT> client::backend::LocalBackend<Block> for Backend<Block>
impl<Block: BlockT> client::backend::LocalBackend<Block> for Backend<Block>
{}
#[cfg(test)]
......
......@@ -16,12 +16,12 @@
//! Polkadot Client data backend
use state_machine::backend::Backend as StateBackend;
use error;
use primitives::AuthorityId;
use runtime_primitives::bft::Justification;
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use state_machine::backend::Backend as StateBackend;
/// Block insertion operation. Keeps hold if the inserted block state and data.
pub trait BlockImportOperation<Block: BlockT> {
......
......@@ -31,6 +31,7 @@ use backend::{self, BlockImportOperation};
use blockchain::{self, Info as ChainInfo, Backend as ChainBackend, HeaderBackend as ChainHeaderBackend};
use call_executor::{CallExecutor, LocalCallExecutor};
use executor::{RuntimeVersion, RuntimeInfo};
use notifications::{StorageNotifications, StorageEventStream};
use {error, in_mem, block_builder, runtime_io, bft, genesis};
/// Type that implements `futures::Stream` of block import events.
......@@ -40,6 +41,7 @@ pub type BlockchainEventStream<Block> = mpsc::UnboundedReceiver<BlockImportNotif
pub struct Client<B, E, Block> where Block: BlockT {
backend: Arc<B>,
executor: E,
storage_notifications: Mutex<StorageNotifications<Block>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification<Block>>>>,
import_lock: Mutex<()>,
importing_block: RwLock<Option<Block::Hash>>, // holds the block hash currently being imported. TODO: replace this with block queue
......@@ -49,7 +51,12 @@ pub struct Client<B, E, Block> where Block: BlockT {
/// A source of blockchain evenets.
pub trait BlockchainEvents<Block: BlockT> {
/// Get block import event stream.
fn import_notification_stream(&self) -> mpsc::UnboundedReceiver<BlockImportNotification<Block>>;
fn import_notification_stream(&self) -> BlockchainEventStream<Block>;
/// Get storage changes event stream.
///
/// Passing `None` as `filter_keys` subscribes to all storage changes.
fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result<StorageEventStream<Block::Hash>>;
}
/// Chain head information.
......@@ -182,9 +189,10 @@ impl<B, E, Block> Client<B, E, Block> where
Ok(Client {
backend,
executor,
import_notification_sinks: Mutex::new(Vec::new()),
import_lock: Mutex::new(()),
importing_block: RwLock::new(None),
storage_notifications: Default::default(),
import_notification_sinks: Default::default(),
import_lock: Default::default(),
importing_block: Default::default(),
execution_strategy,
})
}
......@@ -332,7 +340,7 @@ impl<B, E, Block> Client<B, E, Block> where
}
let mut transaction = self.backend.begin_operation(BlockId::Hash(parent_hash))?;
let storage_update = match transaction.state()? {
let (storage_update, storage_changes) = match transaction.state()? {
Some(transaction_state) => {
let mut overlay = Default::default();
let mut r = self.executor.call_at_state(
......@@ -359,9 +367,10 @@ impl<B, E, Block> Client<B, E, Block> where
},
);
let (_, storage_update) = r?;
Some(storage_update)
overlay.commit_prospective();
(Some(storage_update), Some(overlay.into_committed()))
},
None => None,
None => (None, None)
};
let is_new_best = header.number() == &(self.backend.blockchain().info()?.best_number + One::one());
......@@ -373,7 +382,15 @@ impl<B, E, Block> Client<B, E, Block> where
transaction.update_storage(storage_update)?;
}
self.backend.commit_operation(transaction)?;
if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast {
if let Some(storage_changes) = storage_changes {
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
self.storage_notifications.lock()
.trigger(&hash, storage_changes);
}
let notification = BlockImportNotification::<Block> {
hash: hash,
origin: origin,
......@@ -516,16 +533,20 @@ impl<B, E, Block> bft::Authorities<Block> for Client<B, E, Block>
impl<B, E, Block> BlockchainEvents<Block> for Client<B, E, Block>
where
B: backend::Backend<Block>,
E: CallExecutor<Block>,
Block: BlockT,
{
/// Get block import event stream.
fn import_notification_stream(&self) -> mpsc::UnboundedReceiver<BlockImportNotification<Block>> {
fn import_notification_stream(&self) -> BlockchainEventStream<Block> {
let (sink, stream) = mpsc::unbounded();
self.import_notification_sinks.lock().push(sink);
stream
}
/// Get storage changes event stream.
fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result<StorageEventStream<Block::Hash>> {
Ok(self.storage_notifications.lock().listen(filter_keys))
}
}
impl<B, E, Block> ChainHead<Block> for Client<B, E, Block>
......
......@@ -32,6 +32,7 @@ extern crate substrate_state_machine as state_machine;
#[macro_use] extern crate slog; // needed until we can reexport `slog_info` from `substrate_telemetry`
extern crate ed25519;
extern crate fnv;
extern crate futures;
extern crate parking_lot;
extern crate triehash;
......@@ -50,13 +51,15 @@ pub mod block_builder;
pub mod light;
mod call_executor;
mod client;
mod notifications;
pub use blockchain::Info as ChainInfo;
pub use call_executor::{CallResult, CallExecutor, LocalCallExecutor};
pub use client::{
new_in_mem,
BlockStatus, BlockOrigin, BlockchainEventStream, BlockchainEvents,
Client, ClientInfo, ChainHead,
ImportResult, JustifiedHeader,
};
pub use blockchain::Info as ChainInfo;
pub use call_executor::{CallResult, CallExecutor, LocalCallExecutor};
pub use notifications::{StorageEventStream, StorageChangeSet};
pub use state_machine::ExecutionStrategy;
......@@ -62,7 +62,11 @@ impl<S, F> Backend<S, F> {
}
}
impl<S, F, Block> ClientBackend<Block> for Backend<S, F> where Block: BlockT, S: BlockchainStorage<Block>, F: Fetcher<Block> {
impl<S, F, Block> ClientBackend<Block> for Backend<S, F> where
Block: BlockT,
S: BlockchainStorage<Block>,
F: Fetcher<Block>,
{
type BlockImportOperation = ImportOperation<Block, F>;
type Blockchain = Blockchain<S, F>;
type State = OnDemandState<Block, F>;
......
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Storage notifications
use std::{
collections::{HashSet, HashMap},
sync::Arc,
};
use fnv::{FnvHashSet, FnvHashMap};
use futures::sync::mpsc;
use primitives::storage::{StorageKey, StorageData};
use runtime_primitives::traits::Block as BlockT;
/// Storage change set
#[derive(Debug)]
pub struct StorageChangeSet {
changes: Arc<Vec<(StorageKey, Option<StorageData>)>>,
filter: Option<HashSet<StorageKey>>,
}
impl StorageChangeSet {
/// Convert the change set into iterator over storage items.
pub fn iter<'a>(&'a self) -> impl Iterator<Item=&'a (StorageKey, Option<StorageData>)> + 'a {
self.changes
.iter()
.filter(move |&(key, _)| match self.filter {
Some(ref filter) => filter.contains(key),
None => true,
})
}
}
/// Type that implements `futures::Stream` of storage change events.
pub type StorageEventStream<H> = mpsc::UnboundedReceiver<(H, StorageChangeSet)>;
type SubscriberId = u64;
/// Manages storage listeners.
#[derive(Debug)]
pub struct StorageNotifications<Block: BlockT> {
next_id: SubscriberId,
wildcard_listeners: FnvHashSet<SubscriberId>,
listeners: HashMap<StorageKey, FnvHashSet<SubscriberId>>,
sinks: FnvHashMap<SubscriberId, (
mpsc::UnboundedSender<(Block::Hash, StorageChangeSet)>,
Option<HashSet<StorageKey>>,
)>,
}
impl<Block: BlockT> Default for StorageNotifications<Block> {
fn default() -> Self {
StorageNotifications {
next_id: Default::default(),
wildcard_listeners: Default::default(),
listeners: Default::default(),
sinks: Default::default(),
}
}
}
impl<Block: BlockT> StorageNotifications<Block> {
/// Trigger notification to all listeners.
///
/// Note the changes are going to be filtered by listener's filter key.
/// In fact no event might be sent if clients are not interested in the changes.
pub fn trigger(&mut self, hash: &Block::Hash, changeset: impl Iterator<Item=(Vec<u8>, Option<Vec<u8>>)>) {
let has_wildcard = !self.wildcard_listeners.is_empty();
// early exit if no listeners
if !has_wildcard && self.listeners.is_empty() {
return;
}
let mut subscribers = self.wildcard_listeners.clone();
let mut changes = Vec::new();
// Collect subscribers and changes
for (k, v) in changeset {
let k = StorageKey(k);
let listeners = self.listeners.get(&k);
if let Some(ref listeners) = listeners {
subscribers.extend(listeners.iter());
}
if has_wildcard || listeners.is_some() {
changes.push((k, v.map(StorageData)));
}
}
let changes = Arc::new(changes);
// Trigger the events
for subscriber in subscribers {
let should_remove = {
let &(ref sink, ref filter) = self.sinks.get(&subscriber)
.expect("subscribers returned from self.listeners are always in self.sinks; qed");
sink.unbounded_send((hash.clone(), StorageChangeSet {
changes: changes.clone(),
filter: filter.clone(),
})).is_err()
};
if should_remove {
self.remove_subscriber(subscriber);
}
}
}
fn remove_subscriber(&mut self, subscriber: SubscriberId) {
if let Some((_, filters)) = self.sinks.remove(&subscriber) {
match filters {
None => {
self.wildcard_listeners.remove(&subscriber);
},
Some(filters) => {
for key in filters {
let remove_key = match self.listeners.get_mut(&key) {
Some(ref mut set) => {
set.remove(&subscriber);
set.is_empty()
},
None => false,
};
if remove_key {
self.listeners.remove(&key);
}
}
},
}
}
}
/// Start listening for particular storage keys.
pub fn listen(&mut self, filter_keys: Option<&[StorageKey]>) -> StorageEventStream<Block::Hash> {
self.next_id += 1;
// add subscriber for every key
let keys = match filter_keys {
None => {
self.wildcard_listeners.insert(self.next_id);
None
},
Some(keys) => Some(keys.iter().map(|key| {
self.listeners
.entry(key.clone())
.or_insert_with(Default::default)
.insert(self.next_id);
key.clone()
}).collect())
};
// insert sink
let (tx, rx) = mpsc::unbounded();
self.sinks.insert(self.next_id, (tx, keys));
rx
}
}
#[cfg(test)]
mod tests {
use runtime_primitives::testing::{H256 as Hash, Block as RawBlock};
use super::*;
use futures::Stream;
#[cfg(test)]
impl From<Vec<(StorageKey, Option<StorageData>)>> for StorageChangeSet {
fn from(changes: Vec<(StorageKey, Option<StorageData>)>) -> Self {
StorageChangeSet {
changes: Arc::new(changes),
filter: None,
}
}
}
#[cfg(test)]
impl PartialEq for StorageChangeSet {
fn eq(&self, other: &Self) -> bool {
self.iter().eq(other.iter())
}
}
type Block = RawBlock<Hash>;
#[test]
fn triggering_change_should_notify_wildcard_listeners() {
// given
let mut notifications = StorageNotifications::<Block>::default();
let mut recv = notifications.listen(None).wait();
// when
let changeset = vec![
(vec![2], Some(vec![3])),
(vec![3], None),
];
notifications.trigger(&1.into(), changeset.into_iter());
// then
assert_eq!(recv.next().unwrap(), Ok((1.into(), vec![
(StorageKey(vec![2]), Some(StorageData(vec![3]))),
(StorageKey(vec![3]), None),
].into())));
}
#[test]
fn should_only_notify_interested_listeners() {
// given
let mut notifications = StorageNotifications::<Block>::default();
let mut recv1 = notifications.listen(Some(&[StorageKey(vec![1])])).wait();
let mut recv2 = notifications.listen(Some(&[StorageKey(vec![2])])).wait();
// when
let changeset = vec![
(vec![2], Some(vec![3])),
(vec![1], None),
];
notifications.trigger(&1.into(), changeset.into_iter());
// then
assert_eq!(recv1.next().unwrap(), Ok((1.into(), vec![
(StorageKey(vec![1]), None),
].into())));
assert_eq!(recv2.next().unwrap(), Ok((1.into(), vec![
(StorageKey(vec![2]), Some(StorageData(vec![3]))),
].into())));
}
#[test]
fn should_cleanup_subscribers_if_dropped() {
// given
let mut notifications = StorageNotifications::<Block>::default();
{
let _recv1 = notifications.listen(Some(&[StorageKey(vec![1])])).wait();
let _recv2 = notifications.listen(Some(&[StorageKey(vec![2])])).wait();
let _recv3 = notifications.listen(None).wait();
assert_eq!(notifications.listeners.len(), 2);
assert_eq!(notifications.wildcard_listeners.len(), 1);
}
// when
let changeset = vec![
(vec![2], Some(vec![3])),
(vec![1], None),
];
notifications.trigger(&1.into(), changeset.into_iter());
// then
assert_eq!(notifications.listeners.len(), 0);
assert_eq!(notifications.wildcard_listeners.len(), 0);
}
}
......@@ -22,10 +22,23 @@ use rstd::vec::Vec;
/// Contract storage key.
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash, PartialOrd, Ord))]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash, PartialOrd, Ord, Clone))]
pub struct StorageKey(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u8>);
/// Contract storage entry data.
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash, PartialOrd, Ord))]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash, PartialOrd, Ord, Clone))]
pub struct StorageData(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u8>);
/// Storage change set
#[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
pub struct StorageChangeSet<Hash> {
/// Block hash
pub block: Hash,
/// A list of changes
pub changes: Vec<(
StorageKey,
Option<StorageData>,
)>,
}
......@@ -45,7 +45,7 @@ pub fn rpc_handler<Block: BlockT, S, C, A, Y>(
system: Y,
) -> RpcHandler where
Block: 'static,
S: apis::state::StateApi<Block::Hash>,
S: apis::state::StateApi<Block::Hash, Metadata=Metadata>,
C: apis::chain::ChainApi<Block::Hash, Block::Header, Metadata=Metadata>,
A: apis::author::AuthorApi<Block::Hash, Block::Extrinsic, Metadata=Metadata>,
Y: apis::system::SystemApi,
......
......@@ -18,14 +18,13 @@
use std::sync::Arc;
use runtime_primitives::traits::Block as BlockT;
use runtime_primitives::generic::BlockId;
use client::{self, Client, BlockchainEvents};
use jsonrpc_macros::pubsub;
use jsonrpc_pubsub::SubscriptionId;
use rpc::Result as RpcResult;
use rpc::futures::{Future, Sink, Stream};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::Block as BlockT;
use tokio::runtime::TaskExecutor;
use subscriptions::Subscriptions;
......@@ -51,11 +50,11 @@ build_rpc_trait! {
#[pubsub(name = "chain_newHead")] {
/// New head subscription
#[rpc(name = "subscribe_newHead")]
#[rpc(name = "chain_subscribeNewHead", alias = ["subscribe_newHead", ])]
fn subscribe_new_head(&self, Self::Metadata, pubsub::Subscriber<Header>);
/// Unsubscribe from new head subscription.
#[rpc(name = "unsubscribe_newHead")]
#[rpc(name = "chain_unsubscribeNewHead", alias = ["unsubscribe_newHead", ])]
fn unsubscribe_new_head(&self, SubscriptionId) -> RpcResult<bool>;
}
}
......@@ -72,7 +71,7 @@ pub struct Chain<B, E, Block: BlockT> {
impl<B, E, Block: BlockT> Chain<B, E, Block> {
/// Create new Chain API RPC handler.
pub fn new(client: Arc<Client<B, E, Block>>, executor: TaskExecutor) -> Self {
Chain {
Self {
client,
subscriptions: Subscriptions::new(executor),
}
......
......@@ -16,24 +16,33 @@
//! Polkadot state API.
mod error;
#[cfg(test)]
mod tests;
use std::sync::Arc;
use client::{self, Client, CallExecutor};
use client::{self, Client, CallExecutor, BlockchainEvents};
use jsonrpc_macros::Trailing;
use jsonrpc_macros::pubsub;
use jsonrpc_pubsub::SubscriptionId;
use primitives::hexdisplay::HexDisplay;
use primitives::storage::{StorageKey, StorageData, StorageChangeSet};
use rpc::Result as RpcResult;
use rpc::futures::{Future, Sink, Stream};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::Block as BlockT;
use primitives::storage::{StorageKey, StorageData};
use primitives::hexdisplay::HexDisplay;
use tokio::runtime::TaskExecutor;
use subscriptions::Subscriptions;
mod error;
#[cfg(test)]
mod tests;
use self::error::Result;
build_rpc_trait! {
/// Polkadot state API
pub trait StateApi<Hash> {
type Metadata;
/// Returns a storage entry at a specific block's state.
#[rpc(name = "state_getStorageAt")]
fn storage_at(&self, StorageKey, Hash) -> Result<StorageData>;
......@@ -65,22 +74,52 @@ build_rpc_trait! {
/// Call a contract at the best block.
#[rpc(name = "state_call")]
fn call(&self, String, Vec<u8>) -> Result<Vec<u8>>;
#[pubsub(name = "state_storage")] {
/// New storage subscription
#[rpc(name = "state_subscribeStorage")]
fn subscribe_storage(&self, Self::Metadata, pubsub::Subscriber<StorageChangeSet<Hash>>, Trailing<Vec<StorageKey>>);
/// Unsubscribe from storage subscription
#[rpc(name = "state_unsubscribeStorage")]
fn unsubscribe_storage(&self, SubscriptionId) -> RpcResult<bool>;
}
}
}
/// State API with subscriptions support.
pub struct State<B, E, Block: BlockT> {
/// Substrate client.
client: Arc<Client<B, E, Block>>,
/// Current subscriptions.
subscriptions: Subscriptions,
}
impl<B, E, Block: BlockT> State<B, E, Block> {
/// Create new State API RPC handler.
pub fn new(client: Arc<Client<B, E, Block>>, executor: TaskExecutor) -> Self {
Self {
client,
subscriptions: Subscriptions::new(executor),
}
}
}
impl<B, E, Block> StateApi<Block::Hash> for Arc<Client<B, E, Block>> where
impl<B, E, Block> StateApi<Block::Hash> for State<B, E, Block> where
Block: BlockT + 'static,
B: client::backend::Backend<Block> + Send + Sync + 'static,
E: CallExecutor<Block> + Send + Sync + 'static,
{
type Metadata = ::metadata::Metadata;
fn storage_at(&self, key: StorageKey, block: Block::Hash) -> Result<StorageData> {
trace!(target: "rpc", "Querying storage at {:?} for key {}", block, HexDisplay::from(&key.0));
Ok(self.as_ref().storage(&BlockId::Hash(block), &key)?)
Ok(self.client.storage(&BlockId::Hash(block), &key)?)
}
fn call_at(&self, method: String, data: Vec<u8>, block: Block::Hash) -> Result<Vec<u8>> {
trace!(target: "rpc", "Calling runtime at {:?} for method {} ({})", block, method, HexDisplay::from(&data));
Ok(self.as_ref().executor().call(&BlockId::Hash(block), &method, &data)?.return_data)
Ok(self.client.executor().call(&BlockId::Hash(block), &method, &data)?.return_data)
}
fn storage_hash_at(&self, key: StorageKey, block: Block::Hash) -> Result<Block::Hash> {
......@@ -93,18 +132,52 @@ impl<B, E, Block> StateApi<Block::Hash> for Arc<Client<B, E, Block>> where
}
fn storage_hash(&self, key: StorageKey) -> Result<Block::Hash> {
self.storage_hash_at(key, self.as_ref().info()?.chain.best_hash)
self.storage_hash_at(key, self.client.info()?.chain.best_hash)
}
fn storage_size(&self, key: StorageKey) -> Result<u64> {
self.storage_size_at(key, self.as_ref().info()?.chain.best_hash)
self.storage_size_at(key, self.client.info()?.chain.best_hash)
}
fn storage(&self, key: StorageKey) -> Result<StorageData> {
self.storage_at(key, self.as_ref().info()?.chain.best_hash)
self.storage_at(key, self.client.info()?.chain.best_hash)
}
fn call(&self, method: String, data: Vec<u8>) -> Result<Vec<u8>> {
self.call_at(method, data, self.as_ref().info()?.chain.best_hash)
self.call_at(method, data, self.client.info()?.chain.best_hash)
}
fn subscribe_storage(
&self,
_meta: Self::Metadata,
subscriber: pubsub::Subscriber<StorageChangeSet<Block::Hash>>,
keys: Trailing<Vec<StorageKey>>
) {
let keys = Into::<Option<Vec<_>>>::into(keys);
let stream = match self.client.storage_changes_notification_stream(keys.as_ref().map(|x| &**x)) {
Ok(stream) => stream,
Err(err) => {
let _ = subscriber.reject(error::Error::from(err).into());
return;
},
};
self.subscriptions.add(subscriber, |sink| {
let stream = stream
.map_err(|e| warn!("Error creating storage notification stream: {:?}", e))
.map(|(block, changes)| Ok(StorageChangeSet {
block,
changes: changes.iter().cloned().collect(),
}));
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(stream)
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
})
}
fn unsubscribe_storage(&self, id: SubscriptionId) -> RpcResult<bool> {
Ok(self.subscriptions.cancel(id))
}
}
......@@ -16,26 +16,60 @@
use super::*;
use self::error::{Error, ErrorKind};
use jsonrpc_macros::pubsub;
use client::BlockOrigin;
use test_client::{self, TestClient};
#[test]
fn should_return_storage() {
let core = ::tokio::runtime::Runtime::new().unwrap();
let client = Arc::new(test_client::new());
let genesis_hash = client.genesis_hash();
let client = State::new(client, core.executor());
assert_matches!(
StateApi::storage_at(&client, StorageKey(vec![10]), genesis_hash),
client.storage_at(StorageKey(vec![10]), genesis_hash),
Err(Error(ErrorKind::Client(client::error::ErrorKind::NoValueForKey(ref k)), _)) if *k == vec![10]
)
}
#[test]
fn should_call_contract() {
let core = ::tokio::runtime::Runtime::new().unwrap();
let client = Arc::new(test_client::new());
let genesis_hash = client.genesis_hash();
let client = State::new(client, core.executor());
assert_matches!(
StateApi::call_at(&client, "balanceOf".into(), vec![1,2,3], genesis_hash),
client.call_at("balanceOf".into(), vec![1,2,3], genesis_hash),
Err(Error(ErrorKind::Client(client::error::ErrorKind::Execution(_)), _))
)
}
#[test]
fn should_notify_about_storage_changes() {
let mut core = ::tokio::runtime::Runtime::new().unwrap();
let remote = core.executor();
let (subscriber, id, transport) = pubsub::Subscriber::new_test("test");
{
let api = State {
client: Arc::new(test_client::new()),
subscriptions: Subscriptions::new(remote),
};
api.subscribe_storage(Default::default(), subscriber, None.into());
// assert id assigned
assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(0))));
let builder = api.client.new_block().unwrap();
api.client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap();
}
// assert notification send to transport
let (notification, next) = core.block_on(transport.into_future()).unwrap();
assert!(notification.is_some());
// no more notifications on this channel
assert_eq!(core.block_on(next.into_future()).unwrap().0, None);
}
......@@ -202,9 +202,10 @@ impl<Components> Service<Components>
let handler = || {
let client = client.clone();
let chain = rpc::apis::chain::Chain::new(client.clone(), task_executor.clone());
let state = rpc::apis::state::State::new(client.clone(), task_executor.clone());
let author = rpc::apis::author::Author::new(client.clone(), extrinsic_pool.api(), task_executor.clone());
rpc::rpc_handler::<ComponentBlock<Components>, _, _, _, _>(
client,
state,
chain,
author,
rpc_config.clone(),
......
......@@ -34,7 +34,6 @@ extern crate byteorder;
extern crate parking_lot;
use std::collections::HashMap;
use std::collections::hash_map::Drain;
use std::fmt;
pub mod backend;
......@@ -113,10 +112,23 @@ impl OverlayedChanges {
}
}
/// Drain prospective changes to an iterator.
pub fn drain(&mut self) -> Drain<Vec<u8>, Option<Vec<u8>>> {
/// Drain committed changes to an iterator.
///
/// Panics:
/// Will panic if there are any uncommitted prospective changes.
pub fn drain<'a>(&'a mut self) -> impl Iterator<Item=(Vec<u8>, Option<Vec<u8>>)> + 'a {
assert!(self.prospective.is_empty());
self.committed.drain()
}
/// Consume `OverlayedChanges` and take committed set.
///
/// Panics:
/// Will panic if there are any uncommitted prospective changes.
pub fn into_committed(self) -> impl Iterator<Item=(Vec<u8>, Option<Vec<u8>>)> {
assert!(self.prospective.is_empty());
self.committed.into_iter()
}
}
/// State Machine Error bound.
......@@ -354,14 +366,7 @@ pub fn execute_using_consensus_failure_handler<
result.map(move |out| (out, delta))
};
match result {
Ok(x) => {
Ok(x)
}
Err(e) => {
Err(Box::new(e))
}
}
result.map_err(|e| Box::new(e) as _)
}
/// Prove execution using the given state backend, overlayed changes, and call executor.
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment