diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 1b24d4af9f1a7a45bac88cf2e164d2711f5dd685..bf5c8ddfb40c1f52ac4983867250579b297a8561 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -188,6 +188,8 @@ check-web-wasm: - time cargo web build -p substrate-panic-handler - time cargo web build -p substrate-peerset - time cargo web build -p substrate-primitives + # TODO: we can't use cargo web until https://github.com/paritytech/jsonrpc/pull/436 is deployed + - time cargo build -p substrate-rpc-servers --target wasm32-unknown-unknown - time cargo web build -p substrate-serializer - time cargo web build -p substrate-state-db - time cargo web build -p substrate-state-machine diff --git a/core/rpc-servers/Cargo.toml b/core/rpc-servers/Cargo.toml index bca094b572d5f1d6ed6f5a143f8eaa27d1168bed..64a494f65ffe74b870720c2b0feb02de6e0ff46d 100644 --- a/core/rpc-servers/Cargo.toml +++ b/core/rpc-servers/Cargo.toml @@ -5,10 +5,12 @@ authors = ["Parity Technologies <admin@parity.io>"] edition = "2018" [dependencies] -http = { package = "jsonrpc-http-server", version = "12.0.0" } pubsub = { package = "jsonrpc-pubsub", version = "12.0.0" } -ws = { package = "jsonrpc-ws-server", version = "12.0.0" } log = "0.4" serde = "1.0" substrate-rpc = { path = "../rpc" } sr-primitives = { path = "../sr-primitives" } + +[target.'cfg(not(target_os = "unknown"))'.dependencies] +http = { package = "jsonrpc-http-server", version = "12.0.0" } +ws = { package = "jsonrpc-ws-server", version = "12.0.0" } diff --git a/core/rpc-servers/src/lib.rs b/core/rpc-servers/src/lib.rs index 37ea83353714febfcd11e865aa870c443f3dee25..a33f726747149414ebc8e90dd59e0f04791e332c 100644 --- a/core/rpc-servers/src/lib.rs +++ b/core/rpc-servers/src/lib.rs @@ -30,10 +30,10 @@ const MAX_PAYLOAD: usize = 15 * 1024 * 1024; /// Default maximum number of connections for WS RPC servers. const WS_MAX_CONNECTIONS: usize = 100; -type Metadata = apis::metadata::Metadata; -type RpcHandler = pubsub::PubSubHandler<Metadata>; -pub type HttpServer = http::Server; -pub type WsServer = ws::Server; +pub type Metadata = apis::metadata::Metadata; +pub type RpcHandler = pubsub::PubSubHandler<Metadata>; + +pub use self::inner::*; /// Construct rpc `IoHandler` pub fn rpc_handler<Block: BlockT, ExHash, S, C, A, Y>( @@ -57,62 +57,78 @@ pub fn rpc_handler<Block: BlockT, ExHash, S, C, A, Y>( io } -/// Start HTTP server listening on given address. -pub fn start_http( - addr: &std::net::SocketAddr, - cors: Option<&Vec<String>>, - io: RpcHandler, -) -> io::Result<http::Server> { - http::ServerBuilder::new(io) - .threads(4) - .health_api(("/health", "system_health")) - .allowed_hosts(hosts_filtering(cors.is_some())) - .rest_api(if cors.is_some() { - http::RestApi::Secure - } else { - http::RestApi::Unsecure - }) - .cors(map_cors::<http::AccessControlAllowOrigin>(cors)) - .max_request_body_size(MAX_PAYLOAD) - .start_http(addr) -} +#[cfg(not(target_os = "unknown"))] +mod inner { + use super::*; -/// Start WS server listening on given address. -pub fn start_ws( - addr: &std::net::SocketAddr, - max_connections: Option<usize>, - cors: Option<&Vec<String>>, - io: RpcHandler, -) -> io::Result<ws::Server> { - ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| Metadata::new(context.sender())) - .max_payload(MAX_PAYLOAD) - .max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS)) - .allowed_origins(map_cors(cors)) - .allowed_hosts(hosts_filtering(cors.is_some())) - .start(addr) - .map_err(|err| match err { - ws::Error::Io(io) => io, - ws::Error::ConnectionClosed => io::ErrorKind::BrokenPipe.into(), - e => { - error!("{}", e); - io::ErrorKind::Other.into() - } - }) -} + pub type HttpServer = http::Server; + pub type WsServer = ws::Server; -fn map_cors<T: for<'a> From<&'a str>>( - cors: Option<&Vec<String>> -) -> http::DomainsValidation<T> { - cors.map(|x| x.iter().map(AsRef::as_ref).map(Into::into).collect::<Vec<_>>()).into() -} + /// Start HTTP server listening on given address. + /// + /// **Note**: Only available if `not(target_os = "unknown")`. + pub fn start_http( + addr: &std::net::SocketAddr, + cors: Option<&Vec<String>>, + io: RpcHandler, + ) -> io::Result<http::Server> { + http::ServerBuilder::new(io) + .threads(4) + .health_api(("/health", "system_health")) + .allowed_hosts(hosts_filtering(cors.is_some())) + .rest_api(if cors.is_some() { + http::RestApi::Secure + } else { + http::RestApi::Unsecure + }) + .cors(map_cors::<http::AccessControlAllowOrigin>(cors)) + .max_request_body_size(MAX_PAYLOAD) + .start_http(addr) + } -fn hosts_filtering(enable: bool) -> http::DomainsValidation<http::Host> { - if enable { - // NOTE The listening address is whitelisted by default. - // Setting an empty vector here enables the validation - // and allows only the listening address. - http::DomainsValidation::AllowOnly(vec![]) - } else { - http::DomainsValidation::Disabled + /// Start WS server listening on given address. + /// + /// **Note**: Only available if `not(target_os = "unknown")`. + pub fn start_ws( + addr: &std::net::SocketAddr, + max_connections: Option<usize>, + cors: Option<&Vec<String>>, + io: RpcHandler, + ) -> io::Result<ws::Server> { + ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| Metadata::new(context.sender())) + .max_payload(MAX_PAYLOAD) + .max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS)) + .allowed_origins(map_cors(cors)) + .allowed_hosts(hosts_filtering(cors.is_some())) + .start(addr) + .map_err(|err| match err { + ws::Error::Io(io) => io, + ws::Error::ConnectionClosed => io::ErrorKind::BrokenPipe.into(), + e => { + error!("{}", e); + io::ErrorKind::Other.into() + } + }) } + + fn map_cors<T: for<'a> From<&'a str>>( + cors: Option<&Vec<String>> + ) -> http::DomainsValidation<T> { + cors.map(|x| x.iter().map(AsRef::as_ref).map(Into::into).collect::<Vec<_>>()).into() + } + + fn hosts_filtering(enable: bool) -> http::DomainsValidation<http::Host> { + if enable { + // NOTE The listening address is whitelisted by default. + // Setting an empty vector here enables the validation + // and allows only the listening address. + http::DomainsValidation::AllowOnly(vec![]) + } else { + http::DomainsValidation::Disabled + } + } +} + +#[cfg(target_os = "unknown")] +mod inner { } diff --git a/core/service/src/components.rs b/core/service/src/components.rs index 7075614691b76f5d093a3ccf47fc21001a0e8aa5..fe72e2c2802de5ddd9082cf3e9b338e3d2a2eee4 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -16,12 +16,12 @@ //! Substrate service components. -use std::{sync::Arc, net::SocketAddr, ops::Deref, ops::DerefMut}; +use std::{sync::Arc, ops::Deref, ops::DerefMut}; use serde::{Serialize, de::DeserializeOwned}; use crate::chain_spec::ChainSpec; use client_db; use client::{self, Client, runtime_api}; -use crate::{error, Service, maybe_start_server}; +use crate::{error, Service}; use consensus_common::{import_queue::ImportQueue, SelectChain}; use network::{self, OnDemand, FinalityProofProvider}; use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; @@ -32,7 +32,6 @@ use runtime_primitives::{ use crate::config::Configuration; use primitives::{Blake2Hasher, H256}; use rpc::{self, apis::system::SystemInfo}; -use parking_lot::Mutex; use futures::{prelude::*, future::Executor, sync::mpsc}; // Type aliases. @@ -144,72 +143,37 @@ impl<T: Serialize + DeserializeOwned + BuildStorage> RuntimeGenesis for T {} /// Something that can start the RPC service. pub trait StartRPC<C: Components> { - type ServersHandle: Send + Sync; - fn start_rpc( client: Arc<ComponentClient<C>>, system_send_back: mpsc::UnboundedSender<rpc::apis::system::Request<ComponentBlock<C>>>, system_info: SystemInfo, - rpc_http: Option<SocketAddr>, - rpc_ws: Option<SocketAddr>, - rpc_ws_max_connections: Option<usize>, - rpc_cors: Option<Vec<String>>, task_executor: TaskExecutor, transaction_pool: Arc<TransactionPool<C::TransactionPoolApi>>, - ) -> error::Result<Self::ServersHandle>; + ) -> rpc::RpcHandler; } impl<C: Components> StartRPC<Self> for C where ComponentClient<C>: ProvideRuntimeApi, <ComponentClient<C> as ProvideRuntimeApi>::Api: runtime_api::Metadata<ComponentBlock<C>>, { - type ServersHandle = (Option<rpc::HttpServer>, Option<Mutex<rpc::WsServer>>); - fn start_rpc( client: Arc<ComponentClient<C>>, system_send_back: mpsc::UnboundedSender<rpc::apis::system::Request<ComponentBlock<C>>>, rpc_system_info: SystemInfo, - rpc_http: Option<SocketAddr>, - rpc_ws: Option<SocketAddr>, - rpc_ws_max_connections: Option<usize>, - rpc_cors: Option<Vec<String>>, task_executor: TaskExecutor, transaction_pool: Arc<TransactionPool<C::TransactionPoolApi>>, - ) -> error::Result<Self::ServersHandle> { - let handler = || { - let client = client.clone(); - let subscriptions = rpc::apis::Subscriptions::new(task_executor.clone()); - let chain = rpc::apis::chain::Chain::new(client.clone(), subscriptions.clone()); - let state = rpc::apis::state::State::new(client.clone(), subscriptions.clone()); - let author = rpc::apis::author::Author::new( - client.clone(), transaction_pool.clone(), subscriptions - ); - let system = rpc::apis::system::System::new( - rpc_system_info.clone(), system_send_back.clone() - ); - rpc::rpc_handler::<ComponentBlock<C>, ComponentExHash<C>, _, _, _, _>( - state, - chain, - author, - system, - ) - }; - - Ok(( - maybe_start_server( - rpc_http, - |address| rpc::start_http(address, rpc_cors.as_ref(), handler()), - )?, - maybe_start_server( - rpc_ws, - |address| rpc::start_ws( - address, - rpc_ws_max_connections, - rpc_cors.as_ref(), - handler(), - ), - )?.map(Mutex::new), - )) + ) -> rpc::RpcHandler { + let subscriptions = rpc::apis::Subscriptions::new(task_executor.clone()); + let chain = rpc::apis::chain::Chain::new(client.clone(), subscriptions.clone()); + let state = rpc::apis::state::State::new(client.clone(), subscriptions.clone()); + let author = rpc::apis::author::Author::new(client, transaction_pool, subscriptions); + let system = rpc::apis::system::System::new(rpc_system_info, system_send_back); + rpc::rpc_handler::<ComponentBlock<C>, ComponentExHash<C>, _, _, _, _>( + state, + chain, + author, + system, + ) } } diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 8f4f9184cffafda96dbd25ac3ba94eed7fed6a23..933aafd706ca0585b77cc6d04d552ab43c75c467 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -91,6 +91,7 @@ pub struct Service<Components: components::Components> { to_poll: Vec<Box<dyn Future<Item = (), Error = ()> + Send>>, /// Configuration of this Service pub config: FactoryFullConfiguration<Components::Factory>, + rpc_handlers: rpc::RpcHandler, _rpc: Box<dyn std::any::Any + Send + Sync>, _telemetry: Option<tel::Telemetry>, _telemetry_on_connect_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>>, @@ -442,37 +443,24 @@ impl<Components: components::Components> Service<Components> { let _ = to_spawn_tx.unbounded_send(Box::new(tel_task)); // RPC - let system_info = rpc::apis::system::SystemInfo { - chain_name: config.chain_spec.name().into(), - impl_name: config.impl_name.into(), - impl_version: config.impl_version.into(), - properties: config.chain_spec.properties(), - }; let (system_rpc_tx, system_rpc_rx) = mpsc::unbounded(); - struct ExecutorWithTx(mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>); - impl futures::future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> for ExecutorWithTx { - fn execute( - &self, - future: Box<dyn Future<Item = (), Error = ()> + Send> - ) -> Result<(), futures::future::ExecuteError<Box<dyn Future<Item = (), Error = ()> + Send>>> { - self.0.unbounded_send(future) - .map_err(|err| { - let kind = futures::future::ExecuteErrorKind::Shutdown; - futures::future::ExecuteError::new(kind, err.into_inner()) - }) - } - } - let rpc = Components::RuntimeServices::start_rpc( - client.clone(), - system_rpc_tx, - system_info, - config.rpc_http, - config.rpc_ws, - config.rpc_ws_max_connections, - config.rpc_cors.clone(), - Arc::new(ExecutorWithTx(to_spawn_tx.clone())), - transaction_pool.clone(), - )?; + let gen_handler = || { + let system_info = rpc::apis::system::SystemInfo { + chain_name: config.chain_spec.name().into(), + impl_name: config.impl_name.into(), + impl_version: config.impl_version.into(), + properties: config.chain_spec.properties(), + }; + Components::RuntimeServices::start_rpc( + client.clone(), + system_rpc_tx.clone(), + system_info.clone(), + Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone() }), + transaction_pool.clone(), + ) + }; + let rpc_handlers = gen_handler(); + let rpc = start_rpc_servers::<Components::Factory, _>(&config, gen_handler)?; let _ = to_spawn_tx.unbounded_send(Box::new(build_system_rpc_handler::<Components>( network.clone(), system_rpc_rx, @@ -534,7 +522,8 @@ impl<Components: components::Components> Service<Components> { keystore, config, exit, - _rpc: Box::new(rpc), + rpc_handlers, + _rpc: rpc, _telemetry: telemetry, _offchain_workers: offchain_workers, _telemetry_on_connect_sinks: telemetry_connection_sinks.clone(), @@ -578,6 +567,20 @@ impl<Components: components::Components> Service<Components> { } } + /// Starts an RPC query. + /// + /// The query is passed as a string and must be a JSON text similar to what an HTTP client + /// would for example send. + /// + /// Returns a `Future` that contains the optional response. + /// + /// If the request subscribes you to events, the `Sender` in the `RpcSession` object is used to + /// send back spontaneous events. + pub fn rpc_query(&self, mem: &RpcSession, request: &str) + -> impl Future<Item = Option<String>, Error = ()> { + self.rpc_handlers.handle_request(request, mem.metadata.clone()) + } + /// Get shared client instance. pub fn client(&self) -> Arc<ComponentClient<Components>> { self.client.clone() @@ -715,22 +718,74 @@ impl<Components> Drop for Service<Components> where Components: components::Comp } } -fn maybe_start_server<T, F>(address: Option<SocketAddr>, start: F) -> Result<Option<T>, io::Error> - where F: Fn(&SocketAddr) -> Result<T, io::Error>, -{ - Ok(match address { - Some(mut address) => Some(start(&address) - .or_else(|e| match e.kind() { - io::ErrorKind::AddrInUse | - io::ErrorKind::PermissionDenied => { - warn!("Unable to bind server to {}. Trying random port.", address); - address.set_port(0); - start(&address) - }, - _ => Err(e), - })?), - None => None, - }) +/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them alive. +#[cfg(not(target_os = "unknown"))] +fn start_rpc_servers<F: ServiceFactory, H: FnMut() -> rpc::RpcHandler>( + config: &FactoryFullConfiguration<F>, + mut gen_handler: H +) -> Result<Box<std::any::Any + Send + Sync>, error::Error> { + fn maybe_start_server<T, F>(address: Option<SocketAddr>, mut start: F) -> Result<Option<T>, io::Error> + where F: FnMut(&SocketAddr) -> Result<T, io::Error>, + { + Ok(match address { + Some(mut address) => Some(start(&address) + .or_else(|e| match e.kind() { + io::ErrorKind::AddrInUse | + io::ErrorKind::PermissionDenied => { + warn!("Unable to bind server to {}. Trying random port.", address); + address.set_port(0); + start(&address) + }, + _ => Err(e), + })?), + None => None, + }) + } + + Ok(Box::new(( + maybe_start_server( + config.rpc_http, + |address| rpc::start_http(address, config.rpc_cors.as_ref(), gen_handler()), + )?, + maybe_start_server( + config.rpc_ws, + |address| rpc::start_ws( + address, + config.rpc_ws_max_connections, + config.rpc_cors.as_ref(), + gen_handler(), + ), + )?.map(Mutex::new), + ))) +} + +/// Starts RPC servers that run in their own thread, and returns an opaque object that keeps them alive. +#[cfg(target_os = "unknown")] +fn start_rpc_servers<F: ServiceFactory, H: FnMut() -> rpc::RpcHandler>( + _: &FactoryFullConfiguration<F>, + _: H +) -> Result<Box<std::any::Any + Send + Sync>, error::Error> { + Ok(Box::new(())) +} + +/// An RPC session. Used to perform in-memory RPC queries (ie. RPC queries that don't go through +/// the HTTP or WebSockets server). +pub struct RpcSession { + metadata: rpc::Metadata, +} + +impl RpcSession { + /// Creates an RPC session. + /// + /// The `sender` is stored inside the `RpcSession` and is used to communicate spontaneous JSON + /// messages. + /// + /// The `RpcSession` must be kept alive in order to receive messages on the sender. + pub fn new(sender: mpsc::Sender<String>) -> RpcSession { + RpcSession { + metadata: rpc::Metadata::new(sender), + } + } } /// Transaction pool adapter.