diff --git a/bridges/relays/ethereum-client/Cargo.toml b/bridges/relays/ethereum-client/Cargo.toml index b0f6485ffd384aaff01fcd80b3f9351e1e321eb7..ac5930145888fbc7304785e4731ef8f0ccf01587 100644 --- a/bridges/relays/ethereum-client/Cargo.toml +++ b/bridges/relays/ethereum-client/Cargo.toml @@ -10,7 +10,9 @@ bp-eth-poa = { path = "../../primitives/ethereum-poa" } codec = { package = "parity-scale-codec", version = "2.0.0" } headers-relay = { path = "../headers-relay" } hex-literal = "0.3" -jsonrpsee = { git = "https://github.com/svyatonik/jsonrpsee.git", branch = "shared-client-in-rpc-api", default-features = false, features = ["http"] } +jsonrpsee-proc-macros = "0.2.0-alpha" +jsonrpsee-types = "0.2.0-alpha" +jsonrpsee-ws-client = "0.2.0-alpha" libsecp256k1 = { version = "0.3.4", default-features = false, features = ["hmac"] } log = "0.4.11" relay-utils = { path = "../utils" } diff --git a/bridges/relays/ethereum-client/src/client.rs b/bridges/relays/ethereum-client/src/client.rs index 30a62a400e1dda7a1aed1f78bc53505c7247f406..593e40d67a4734d35a33333699bb55b43398b7b8 100644 --- a/bridges/relays/ethereum-client/src/client.rs +++ b/bridges/relays/ethereum-client/src/client.rs @@ -21,9 +21,8 @@ use crate::types::{ }; use crate::{ConnectionParams, Error, Result}; -use jsonrpsee::raw::RawClient; -use jsonrpsee::transport::http::HttpTransportClient; -use jsonrpsee::Client as RpcClient; +use jsonrpsee_ws_client::{WsClient as RpcClient, WsConfig as RpcConfig}; +use std::sync::Arc; /// Number of headers missing from the Ethereum node for us to consider node not synced. const MAJOR_SYNC_BLOCKS: u64 = 5; @@ -32,36 +31,36 @@ const MAJOR_SYNC_BLOCKS: u64 = 5; #[derive(Clone)] pub struct Client { params: ConnectionParams, - client: RpcClient, + client: Arc<RpcClient>, } impl Client { /// Create a new Ethereum RPC Client. - pub fn new(params: ConnectionParams) -> Self { - Self { - client: Self::build_client(¶ms), + pub async fn new(params: ConnectionParams) -> Result<Self> { + Ok(Self { + client: Self::build_client(¶ms).await?, params, - } + }) } /// Build client to use in connection. - fn build_client(params: &ConnectionParams) -> RpcClient { - let uri = format!("http://{}:{}", params.host, params.port); - let transport = HttpTransportClient::new(&uri); - let raw_client = RawClient::new(transport); - raw_client.into() + async fn build_client(params: &ConnectionParams) -> Result<Arc<RpcClient>> { + let uri = format!("ws://{}:{}", params.host, params.port); + let client = RpcClient::new(RpcConfig::with_url(&uri)).await?; + Ok(Arc::new(client)) } /// Reopen client connection. - pub fn reconnect(&mut self) { - self.client = Self::build_client(&self.params); + pub async fn reconnect(&mut self) -> Result<()> { + self.client = Self::build_client(&self.params).await?; + Ok(()) } } impl Client { /// Returns true if client is connected to at least one peer and is in synced state. pub async fn ensure_synced(&self) -> Result<()> { - match Ethereum::syncing(&self.client).await? { + match Ethereum::syncing(&*self.client).await? { SyncState::NotSyncing => Ok(()), SyncState::Syncing(syncing) => { let missing_headers = syncing.highest_block.saturating_sub(syncing.current_block); @@ -76,18 +75,18 @@ impl Client { /// Estimate gas usage for the given call. pub async fn estimate_gas(&self, call_request: CallRequest) -> Result<U256> { - Ok(Ethereum::estimate_gas(&self.client, call_request).await?) + Ok(Ethereum::estimate_gas(&*self.client, call_request).await?) } /// Retrieve number of the best known block from the Ethereum node. pub async fn best_block_number(&self) -> Result<u64> { - Ok(Ethereum::block_number(&self.client).await?.as_u64()) + Ok(Ethereum::block_number(&*self.client).await?.as_u64()) } /// Retrieve number of the best known block from the Ethereum node. pub async fn header_by_number(&self, block_number: u64) -> Result<Header> { let get_full_tx_objects = false; - let header = Ethereum::get_block_by_number(&self.client, block_number, get_full_tx_objects).await?; + let header = Ethereum::get_block_by_number(&*self.client, block_number, get_full_tx_objects).await?; match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { true => Ok(header), false => Err(Error::IncompleteHeader), @@ -97,7 +96,7 @@ impl Client { /// Retrieve block header by its hash from Ethereum node. pub async fn header_by_hash(&self, hash: H256) -> Result<Header> { let get_full_tx_objects = false; - let header = Ethereum::get_block_by_hash(&self.client, hash, get_full_tx_objects).await?; + let header = Ethereum::get_block_by_hash(&*self.client, hash, get_full_tx_objects).await?; match header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some() { true => Ok(header), false => Err(Error::IncompleteHeader), @@ -107,7 +106,8 @@ impl Client { /// Retrieve block header and its transactions by its number from Ethereum node. pub async fn header_by_number_with_transactions(&self, number: u64) -> Result<HeaderWithTransactions> { let get_full_tx_objects = true; - let header = Ethereum::get_block_by_number_with_transactions(&self.client, number, get_full_tx_objects).await?; + let header = + Ethereum::get_block_by_number_with_transactions(&*self.client, number, get_full_tx_objects).await?; let is_complete_header = header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some(); if !is_complete_header { @@ -125,7 +125,7 @@ impl Client { /// Retrieve block header and its transactions by its hash from Ethereum node. pub async fn header_by_hash_with_transactions(&self, hash: H256) -> Result<HeaderWithTransactions> { let get_full_tx_objects = true; - let header = Ethereum::get_block_by_hash_with_transactions(&self.client, hash, get_full_tx_objects).await?; + let header = Ethereum::get_block_by_hash_with_transactions(&*self.client, hash, get_full_tx_objects).await?; let is_complete_header = header.number.is_some() && header.hash.is_some() && header.logs_bloom.is_some(); if !is_complete_header { @@ -142,17 +142,17 @@ impl Client { /// Retrieve transaction by its hash from Ethereum node. pub async fn transaction_by_hash(&self, hash: H256) -> Result<Option<Transaction>> { - Ok(Ethereum::transaction_by_hash(&self.client, hash).await?) + Ok(Ethereum::transaction_by_hash(&*self.client, hash).await?) } /// Retrieve transaction receipt by transaction hash. pub async fn transaction_receipt(&self, transaction_hash: H256) -> Result<Receipt> { - Ok(Ethereum::get_transaction_receipt(&self.client, transaction_hash).await?) + Ok(Ethereum::get_transaction_receipt(&*self.client, transaction_hash).await?) } /// Get the nonce of the given account. pub async fn account_nonce(&self, address: Address) -> Result<U256> { - Ok(Ethereum::get_transaction_count(&self.client, address).await?) + Ok(Ethereum::get_transaction_count(&*self.client, address).await?) } /// Submit an Ethereum transaction. @@ -160,13 +160,13 @@ impl Client { /// The transaction must already be signed before sending it through this method. pub async fn submit_transaction(&self, signed_raw_tx: SignedRawTx) -> Result<TransactionHash> { let transaction = Bytes(signed_raw_tx); - let tx_hash = Ethereum::submit_transaction(&self.client, transaction).await?; + let tx_hash = Ethereum::submit_transaction(&*self.client, transaction).await?; log::trace!(target: "bridge", "Sent transaction to Ethereum node: {:?}", tx_hash); Ok(tx_hash) } /// Call Ethereum smart contract. pub async fn eth_call(&self, call_transaction: CallRequest) -> Result<Bytes> { - Ok(Ethereum::call(&self.client, call_transaction).await?) + Ok(Ethereum::call(&*self.client, call_transaction).await?) } } diff --git a/bridges/relays/ethereum-client/src/error.rs b/bridges/relays/ethereum-client/src/error.rs index 0f47891138aba0e0abcddc1cf87e45c290970990..80d4e89d5ee966f11527ed715645a352f3e2fff9 100644 --- a/bridges/relays/ethereum-client/src/error.rs +++ b/bridges/relays/ethereum-client/src/error.rs @@ -18,7 +18,7 @@ use crate::types::U256; -use jsonrpsee::client::RequestError; +use jsonrpsee_types::error::Error as RpcError; use relay_utils::MaybeConnectionError; /// Result type used by Ethereum client. @@ -30,7 +30,7 @@ pub type Result<T> = std::result::Result<T, Error>; pub enum Error { /// An error that can occur when making an HTTP request to /// an JSON-RPC client. - Request(RequestError), + RpcError(RpcError), /// Failed to parse response. ResponseParseFailed(String), /// We have received a header with missing fields. @@ -47,9 +47,9 @@ pub enum Error { ClientNotSynced(U256), } -impl From<RequestError> for Error { - fn from(error: RequestError) -> Self { - Error::Request(error) +impl From<RpcError> for Error { + fn from(error: RpcError) -> Self { + Error::RpcError(error) } } @@ -57,7 +57,11 @@ impl MaybeConnectionError for Error { fn is_connection_error(&self) -> bool { matches!( *self, - Error::Request(RequestError::TransportError(_)) | Error::ClientNotSynced(_), + Error::RpcError(RpcError::TransportError(_)) + // right now if connection to the ws server is dropped (after it is already established), + // we're getting this error + | Error::RpcError(RpcError::Internal(_)) + | Error::ClientNotSynced(_), ) } } @@ -65,7 +69,7 @@ impl MaybeConnectionError for Error { impl ToString for Error { fn to_string(&self) -> String { match self { - Self::Request(e) => e.to_string(), + Self::RpcError(e) => e.to_string(), Self::ResponseParseFailed(e) => e.to_string(), Self::IncompleteHeader => { "Incomplete Ethereum Header Received (missing some of required fields - hash, number, logs_bloom)" diff --git a/bridges/relays/ethereum-client/src/lib.rs b/bridges/relays/ethereum-client/src/lib.rs index 8c5a00e01b4d7119f2fc70307a6461d666ed0af1..17bfda6e6443b898f1a1d62025c1da30ff88c20b 100644 --- a/bridges/relays/ethereum-client/src/lib.rs +++ b/bridges/relays/ethereum-client/src/lib.rs @@ -29,12 +29,12 @@ pub use crate::sign::{sign_and_submit_transaction, SigningParams}; pub mod types; -/// Ethereum connection params. +/// Ethereum-over-websocket connection params. #[derive(Debug, Clone)] pub struct ConnectionParams { - /// Ethereum RPC host. + /// Websocket server hostname. pub host: String, - /// Ethereum RPC port. + /// Websocket server TCP port. pub port: u16, } @@ -42,7 +42,7 @@ impl Default for ConnectionParams { fn default() -> Self { ConnectionParams { host: "localhost".into(), - port: 8545, + port: 8546, } } } diff --git a/bridges/relays/ethereum-client/src/rpc.rs b/bridges/relays/ethereum-client/src/rpc.rs index 3fa4f6ceb9cd9d311ebf7db728628d823e245097..26cc3a6d969e0de5693b6f089602fcba3b157e64 100644 --- a/bridges/relays/ethereum-client/src/rpc.rs +++ b/bridges/relays/ethereum-client/src/rpc.rs @@ -26,7 +26,7 @@ use crate::types::{ H256, U256, U64, }; -jsonrpsee::rpc_api! { +jsonrpsee_proc_macros::rpc_client_api! { pub(crate) Ethereum { #[rpc(method = "eth_syncing", positional_params)] fn syncing() -> SyncState; diff --git a/bridges/relays/ethereum/src/cli.yml b/bridges/relays/ethereum/src/cli.yml index c6a5b08e1bb045b1e118e25c907031c088fb2c89..78971787c0e2b5b42de5cf5578c177c916699eb5 100644 --- a/bridges/relays/ethereum/src/cli.yml +++ b/bridges/relays/ethereum/src/cli.yml @@ -9,17 +9,17 @@ subcommands: - eth-host: ð-host long: eth-host value_name: ETH_HOST - help: Connect to Ethereum node at given host. + help: Connect to Ethereum node websocket server at given host. takes_value: true - eth-port: ð-port long: eth-port value_name: ETH_PORT - help: Connect to Ethereum node at given port. + help: Connect to Ethereum node websocket server at given port. takes_value: true - sub-host: &sub-host long: sub-host value_name: SUB_HOST - help: Connect to Substrate node at given host. + help: Connect to Substrate node websocket server at given host. takes_value: true - sub-port: &sub-port long: sub-port diff --git a/bridges/relays/ethereum/src/ethereum_deploy_contract.rs b/bridges/relays/ethereum/src/ethereum_deploy_contract.rs index 25f8c873e590360f1ff0e8cecbd4624f20dd9a5b..e703711843a9c5b368f587d9fd0faf76bcd3e11d 100644 --- a/bridges/relays/ethereum/src/ethereum_deploy_contract.rs +++ b/bridges/relays/ethereum/src/ethereum_deploy_contract.rs @@ -62,7 +62,7 @@ pub fn run(params: EthereumDeployContractParams) { } = params; let result = local_pool.run_until(async move { - let eth_client = EthereumClient::new(eth_params); + let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?; let sub_client = SubstrateClient::<Rialto>::new(sub_params).await.map_err(RpcError::Substrate)?; let (initial_header_id, initial_header) = prepare_initial_header(&sub_client, sub_initial_header).await?; diff --git a/bridges/relays/ethereum/src/ethereum_exchange.rs b/bridges/relays/ethereum/src/ethereum_exchange.rs index 92ba211535129447115a35cff61e49d1002b1e13..ecd22ab81a1f9407a1474ac8659634a9fc3a35b7 100644 --- a/bridges/relays/ethereum/src/ethereum_exchange.rs +++ b/bridges/relays/ethereum/src/ethereum_exchange.rs @@ -130,8 +130,7 @@ impl RelayClient for EthereumTransactionsSource { type Error = RpcError; async fn reconnect(&mut self) -> Result<(), RpcError> { - self.client.reconnect(); - Ok(()) + self.client.reconnect().await.map_err(Into::into) } } @@ -305,7 +304,7 @@ fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H25 } = params; let result = local_pool.run_until(async move { - let eth_client = EthereumClient::new(eth_params); + let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?; let sub_client = SubstrateClient::<Rialto>::new(sub_params) .await .map_err(RpcError::Substrate)?; @@ -351,7 +350,8 @@ fn run_auto_transactions_relay_loop(params: EthereumExchangeParams, eth_start_wi } = params; let do_run_loop = move || -> Result<(), String> { - let eth_client = EthereumClient::new(eth_params); + let eth_client = async_std::task::block_on(EthereumClient::new(eth_params)) + .map_err(|err| format!("Error starting Ethereum client: {:?}", err))?; let sub_client = async_std::task::block_on(SubstrateClient::<Rialto>::new(sub_params)) .map_err(|err| format!("Error starting Substrate client: {:?}", err))?; diff --git a/bridges/relays/ethereum/src/ethereum_exchange_submit.rs b/bridges/relays/ethereum/src/ethereum_exchange_submit.rs index d2842b78a4a2e34191a3c526e6d4b3c5e61d9c3b..8f9f942dac53ec2b4f227379a4a3534afa8ea673 100644 --- a/bridges/relays/ethereum/src/ethereum_exchange_submit.rs +++ b/bridges/relays/ethereum/src/ethereum_exchange_submit.rs @@ -54,7 +54,9 @@ pub fn run(params: EthereumExchangeSubmitParams) { } = params; let result: Result<_, String> = local_pool.run_until(async move { - let eth_client = EthereumClient::new(eth_params); + let eth_client = EthereumClient::new(eth_params) + .await + .map_err(|err| format!("error connecting to Ethereum node: {:?}", err))?; let eth_signer_address = secret_to_address(ð_sign.signer); let sub_recipient_encoded = sub_recipient; diff --git a/bridges/relays/ethereum/src/ethereum_sync_loop.rs b/bridges/relays/ethereum/src/ethereum_sync_loop.rs index c8741c2fe18a6b2c3b2a13490d308ef411843860..2c1abb358c63f3344307704254c79a40f39eb58e 100644 --- a/bridges/relays/ethereum/src/ethereum_sync_loop.rs +++ b/bridges/relays/ethereum/src/ethereum_sync_loop.rs @@ -122,8 +122,7 @@ impl RelayClient for EthereumHeadersSource { type Error = RpcError; async fn reconnect(&mut self) -> Result<(), RpcError> { - self.client.reconnect(); - Ok(()) + self.client.reconnect().await.map_err(Into::into) } } @@ -259,8 +258,8 @@ pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> { instance, } = params; - let eth_client = EthereumClient::new(eth_params); - let sub_client = async_std::task::block_on(async { SubstrateClient::<Rialto>::new(sub_params).await })?; + let eth_client = async_std::task::block_on(EthereumClient::new(eth_params))?; + let sub_client = async_std::task::block_on(SubstrateClient::<Rialto>::new(sub_params))?; let sign_sub_transactions = match sync_params.target_tx_mode { TargetTransactionMode::Signed | TargetTransactionMode::Backup => true, diff --git a/bridges/relays/ethereum/src/substrate_sync_loop.rs b/bridges/relays/ethereum/src/substrate_sync_loop.rs index 9e15b9223826a7c934b463796e361aeb589d7933..5f30e247c5a79a6c2214f400c32d2cabcee3144f 100644 --- a/bridges/relays/ethereum/src/substrate_sync_loop.rs +++ b/bridges/relays/ethereum/src/substrate_sync_loop.rs @@ -123,8 +123,7 @@ impl RelayClient for EthereumHeadersTarget { type Error = RpcError; async fn reconnect(&mut self) -> Result<(), RpcError> { - self.client.reconnect(); - Ok(()) + self.client.reconnect().await.map_err(Into::into) } } @@ -174,8 +173,8 @@ pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> { metrics_params, } = params; - let eth_client = EthereumClient::new(eth_params); - let sub_client = async_std::task::block_on(async { SubstrateClient::<Rialto>::new(sub_params).await })?; + let eth_client = async_std::task::block_on(EthereumClient::new(eth_params))?; + let sub_client = async_std::task::block_on(SubstrateClient::<Rialto>::new(sub_params))?; let target = EthereumHeadersTarget::new(eth_client, eth_contract_address, eth_sign); let source = SubstrateHeadersSource::new(sub_client); diff --git a/bridges/relays/substrate-client/Cargo.toml b/bridges/relays/substrate-client/Cargo.toml index 1c60f861e2e5af7c3381e855d7cc7304c798f658..18df0dba874b50fef2fd6f13c2268cc1b260caac 100644 --- a/bridges/relays/substrate-client/Cargo.toml +++ b/bridges/relays/substrate-client/Cargo.toml @@ -9,7 +9,9 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0" async-std = "1.6.5" async-trait = "0.1.40" codec = { package = "parity-scale-codec", version = "2.0.0" } -jsonrpsee = { git = "https://github.com/svyatonik/jsonrpsee.git", branch = "shared-client-in-rpc-api", default-features = false, features = ["ws"] } +jsonrpsee-proc-macros = "0.2.0-alpha" +jsonrpsee-types = "0.2.0-alpha" +jsonrpsee-ws-client = "0.2.0-alpha" log = "0.4.11" num-traits = "0.2" rand = "0.7" diff --git a/bridges/relays/substrate-client/src/chain.rs b/bridges/relays/substrate-client/src/chain.rs index f309c3f775e76eea6f56302e57fd322f4502b1ff..352a63dc4888490192db0ae16f06eae39044c3c5 100644 --- a/bridges/relays/substrate-client/src/chain.rs +++ b/bridges/relays/substrate-client/src/chain.rs @@ -18,7 +18,7 @@ use crate::client::Client; use bp_runtime::Chain as ChainBase; use frame_support::Parameter; -use jsonrpsee::common::{DeserializeOwned, Serialize}; +use jsonrpsee_types::jsonrpc::{DeserializeOwned, Serialize}; use num_traits::{CheckedSub, Zero}; use sp_core::{storage::StorageKey, Pair}; use sp_runtime::{ diff --git a/bridges/relays/substrate-client/src/client.rs b/bridges/relays/substrate-client/src/client.rs index 767002d68654368b60920e16c2c7b1a0bedacb46..813dda6940335599e3b20f53919305b83f1d668a 100644 --- a/bridges/relays/substrate-client/src/client.rs +++ b/bridges/relays/substrate-client/src/client.rs @@ -24,10 +24,8 @@ use bp_message_lane::{LaneId, MessageNonce}; use bp_runtime::InstanceId; use codec::Decode; use frame_system::AccountInfo; -use jsonrpsee::common::DeserializeOwned; -use jsonrpsee::raw::RawClient; -use jsonrpsee::transport::ws::WsTransportClient; -use jsonrpsee::{client::Subscription, Client as RpcClient}; +use jsonrpsee_types::{jsonrpc::DeserializeOwned, traits::SubscriptionClient}; +use jsonrpsee_ws_client::{WsClient as RpcClient, WsConfig as RpcConfig, WsSubscription as Subscription}; use num_traits::Zero; use pallet_balances::AccountData; use sp_core::Bytes; @@ -36,6 +34,7 @@ use sp_version::RuntimeVersion; use std::ops::RangeInclusive; const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities"; +const MAX_SUBSCRIPTION_CAPACITY: usize = 4096; /// Opaque justifications subscription type. pub type JustificationsSubscription = Subscription<Bytes>; @@ -79,7 +78,7 @@ impl<C: Chain> Client<C> { let client = Self::build_client(params.clone()).await?; let number: C::BlockNumber = Zero::zero(); - let genesis_hash = Substrate::<C, _, _>::chain_get_block_hash(&client, number).await?; + let genesis_hash = Substrate::<C>::chain_get_block_hash(&client, number).await?; Ok(Self { params, @@ -97,16 +96,17 @@ impl<C: Chain> Client<C> { /// Build client to use in connection. async fn build_client(params: ConnectionParams) -> Result<RpcClient> { let uri = format!("ws://{}:{}", params.host, params.port); - let transport = WsTransportClient::new(&uri).await?; - let raw_client = RawClient::new(transport); - Ok(raw_client.into()) + let mut config = RpcConfig::with_url(&uri); + config.max_subscription_capacity = MAX_SUBSCRIPTION_CAPACITY; + let client = RpcClient::new(config).await?; + Ok(client) } } impl<C: Chain> Client<C> { /// Returns true if client is connected to at least one peer and is in synced state. pub async fn ensure_synced(&self) -> Result<()> { - let health = Substrate::<C, _, _>::system_health(&self.client).await?; + let health = Substrate::<C>::system_health(&self.client).await?; let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0); if is_synced { Ok(()) @@ -122,7 +122,7 @@ impl<C: Chain> Client<C> { /// Return hash of the best finalized block. pub async fn best_finalized_header_hash(&self) -> Result<C::Hash> { - Ok(Substrate::<C, _, _>::chain_get_finalized_head(&self.client).await?) + Ok(Substrate::<C>::chain_get_finalized_head(&self.client).await?) } /// Returns the best Substrate header. @@ -130,12 +130,12 @@ impl<C: Chain> Client<C> { where C::Header: DeserializeOwned, { - Ok(Substrate::<C, _, _>::chain_get_header(&self.client, None).await?) + Ok(Substrate::<C>::chain_get_header(&self.client, None).await?) } /// Get a Substrate block from its hash. pub async fn get_block(&self, block_hash: Option<C::Hash>) -> Result<C::SignedBlock> { - Ok(Substrate::<C, _, _>::chain_get_block(&self.client, block_hash).await?) + Ok(Substrate::<C>::chain_get_block(&self.client, block_hash).await?) } /// Get a Substrate header by its hash. @@ -143,12 +143,12 @@ impl<C: Chain> Client<C> { where C::Header: DeserializeOwned, { - Ok(Substrate::<C, _, _>::chain_get_header(&self.client, block_hash).await?) + Ok(Substrate::<C>::chain_get_header(&self.client, block_hash).await?) } /// Get a Substrate block hash by its number. pub async fn block_hash_by_number(&self, number: C::BlockNumber) -> Result<C::Hash> { - Ok(Substrate::<C, _, _>::chain_get_block_hash(&self.client, number).await?) + Ok(Substrate::<C>::chain_get_block_hash(&self.client, number).await?) } /// Get a Substrate header by its number. @@ -162,7 +162,7 @@ impl<C: Chain> Client<C> { /// Return runtime version. pub async fn runtime_version(&self) -> Result<RuntimeVersion> { - Ok(Substrate::<C, _, _>::runtime_version(&self.client).await?) + Ok(Substrate::<C>::runtime_version(&self.client).await?) } /// Return native tokens balance of the account. @@ -171,7 +171,7 @@ impl<C: Chain> Client<C> { C: ChainWithBalances, { let storage_key = C::account_info_storage_key(&account); - let encoded_account_data = Substrate::<C, _, _>::get_storage(&self.client, storage_key) + let encoded_account_data = Substrate::<C>::get_storage(&self.client, storage_key) .await? .ok_or(Error::AccountDoesNotExist)?; let decoded_account_data = @@ -184,14 +184,14 @@ impl<C: Chain> Client<C> { /// /// Note: It's the caller's responsibility to make sure `account` is a valid ss58 address. pub async fn next_account_index(&self, account: C::AccountId) -> Result<C::Index> { - Ok(Substrate::<C, _, _>::system_account_next_index(&self.client, account).await?) + Ok(Substrate::<C>::system_account_next_index(&self.client, account).await?) } /// Submit an extrinsic for inclusion in a block. /// /// Note: The given transaction does not need be SCALE encoded beforehand. pub async fn submit_extrinsic(&self, transaction: Bytes) -> Result<C::Hash> { - let tx_hash = Substrate::<C, _, _>::author_submit_extrinsic(&self.client, transaction).await?; + let tx_hash = Substrate::<C>::author_submit_extrinsic(&self.client, transaction).await?; log::trace!(target: "bridge", "Sent transaction to Substrate node: {:?}", tx_hash); Ok(tx_hash) } @@ -201,7 +201,7 @@ impl<C: Chain> Client<C> { let call = SUB_API_GRANDPA_AUTHORITIES.to_string(); let data = Bytes(Vec::new()); - let encoded_response = Substrate::<C, _, _>::state_call(&self.client, call, data, Some(block)).await?; + let encoded_response = Substrate::<C>::state_call(&self.client, call, data, Some(block)).await?; let authority_list = encoded_response.0; Ok(authority_list) @@ -209,7 +209,7 @@ impl<C: Chain> Client<C> { /// Execute runtime call at given block. pub async fn state_call(&self, method: String, data: Bytes, at_block: Option<C::Hash>) -> Result<Bytes> { - Substrate::<C, _, _>::state_call(&self.client, method, data, at_block) + Substrate::<C>::state_call(&self.client, method, data, at_block) .await .map_err(Into::into) } @@ -223,7 +223,7 @@ impl<C: Chain> Client<C> { include_outbound_lane_state: bool, at_block: C::Hash, ) -> Result<StorageProof> { - let encoded_trie_nodes = SubstrateMessageLane::<C, _, _>::prove_messages( + let encoded_trie_nodes = SubstrateMessageLane::<C>::prove_messages( &self.client, instance, lane, @@ -233,7 +233,7 @@ impl<C: Chain> Client<C> { Some(at_block), ) .await - .map_err(Error::Request)?; + .map_err(Error::RpcError)?; let decoded_trie_nodes: Vec<Vec<u8>> = Decode::decode(&mut &encoded_trie_nodes[..]).map_err(Error::ResponseParseFailed)?; Ok(StorageProof::new(decoded_trie_nodes)) @@ -247,21 +247,21 @@ impl<C: Chain> Client<C> { at_block: C::Hash, ) -> Result<Vec<Vec<u8>>> { let encoded_trie_nodes = - SubstrateMessageLane::<C, _, _>::prove_messages_delivery(&self.client, instance, lane, Some(at_block)) + SubstrateMessageLane::<C>::prove_messages_delivery(&self.client, instance, lane, Some(at_block)) .await - .map_err(Error::Request)?; + .map_err(Error::RpcError)?; let decoded_trie_nodes: Vec<Vec<u8>> = Decode::decode(&mut &encoded_trie_nodes[..]).map_err(Error::ResponseParseFailed)?; Ok(decoded_trie_nodes) } /// Return new justifications stream. - pub async fn subscribe_justifications(self) -> Result<JustificationsSubscription> { + pub async fn subscribe_justifications(&self) -> Result<JustificationsSubscription> { Ok(self .client .subscribe( "grandpa_subscribeJustifications", - jsonrpsee::common::Params::None, + jsonrpsee_types::jsonrpc::Params::None, "grandpa_unsubscribeJustifications", ) .await?) diff --git a/bridges/relays/substrate-client/src/error.rs b/bridges/relays/substrate-client/src/error.rs index 67aefe9885534794bf83f0f93e07367679030fb5..c513cf1b70f0d1e70342fb04f244e4d1ec789fec 100644 --- a/bridges/relays/substrate-client/src/error.rs +++ b/bridges/relays/substrate-client/src/error.rs @@ -16,8 +16,7 @@ //! Substrate node RPC errors. -use jsonrpsee::client::RequestError; -use jsonrpsee::transport::ws::WsNewDnsError; +use jsonrpsee_types::error::Error as RpcError; use relay_utils::MaybeConnectionError; use sc_rpc_api::system::Health; @@ -28,11 +27,9 @@ pub type Result<T> = std::result::Result<T, Error>; /// a Substrate node through RPC. #[derive(Debug)] pub enum Error { - /// Web socket connection error. - WsConnectionError(WsNewDnsError), /// An error that can occur when making a request to /// an JSON-RPC server. - Request(RequestError), + RpcError(RpcError), /// The response from the server could not be SCALE decoded. ResponseParseFailed(codec::Error), /// The Substrate bridge pallet has not yet been initialized. @@ -45,15 +42,9 @@ pub enum Error { Custom(String), } -impl From<WsNewDnsError> for Error { - fn from(error: WsNewDnsError) -> Self { - Error::WsConnectionError(error) - } -} - -impl From<RequestError> for Error { - fn from(error: RequestError) -> Self { - Error::Request(error) +impl From<RpcError> for Error { + fn from(error: RpcError) -> Self { + Error::RpcError(error) } } @@ -61,7 +52,11 @@ impl MaybeConnectionError for Error { fn is_connection_error(&self) -> bool { matches!( *self, - Error::Request(RequestError::TransportError(_)) | Error::ClientNotSynced(_) + Error::RpcError(RpcError::TransportError(_)) + // right now if connection to the ws server is dropped (after it is already established), + // we're getting this error + | Error::RpcError(RpcError::Internal(_)) + | Error::ClientNotSynced(_), ) } } @@ -75,8 +70,7 @@ impl From<Error> for String { impl ToString for Error { fn to_string(&self) -> String { match self { - Self::WsConnectionError(e) => e.to_string(), - Self::Request(e) => e.to_string(), + Self::RpcError(e) => e.to_string(), Self::ResponseParseFailed(e) => e.to_string(), Self::UninitializedBridgePallet => "The Substrate bridge pallet has not been initialized yet.".into(), Self::AccountDoesNotExist => "Account does not exist on the chain".into(), diff --git a/bridges/relays/substrate-client/src/rpc.rs b/bridges/relays/substrate-client/src/rpc.rs index 2e832b4018174f0c42880c0862fa0c976c74e8e1..d768912a2f1a50b3dfe26802f8a2b0483ddb78ae 100644 --- a/bridges/relays/substrate-client/src/rpc.rs +++ b/bridges/relays/substrate-client/src/rpc.rs @@ -32,7 +32,7 @@ use sp_core::{ }; use sp_version::RuntimeVersion; -jsonrpsee::rpc_api! { +jsonrpsee_proc_macros::rpc_client_api! { pub(crate) Substrate<C: Chain> { #[rpc(method = "system_health", positional_params)] fn system_health() -> Health; diff --git a/bridges/relays/substrate/src/headers_maintain.rs b/bridges/relays/substrate/src/headers_maintain.rs index 14432487ea30814a74b3fe969afda09c015ecd5b..fb609a2ef0705429b355a98142ca789fc81d3a9b 100644 --- a/bridges/relays/substrate/src/headers_maintain.rs +++ b/bridges/relays/substrate/src/headers_maintain.rs @@ -47,8 +47,10 @@ use sp_runtime::{traits::Header as HeaderT, Justification}; use std::{collections::VecDeque, marker::PhantomData, task::Poll}; /// Substrate-to-Substrate headers synchronization maintain procedure. -pub struct SubstrateHeadersToSubstrateMaintain<P: SubstrateHeadersSyncPipeline, SourceChain, TargetChain: Chain> { +pub struct SubstrateHeadersToSubstrateMaintain<P: SubstrateHeadersSyncPipeline, SourceChain: Chain, TargetChain: Chain> +{ pipeline: P, + source_client: Client<SourceChain>, target_client: Client<TargetChain>, justifications: Arc<Mutex<Justifications<P>>>, _marker: PhantomData<SourceChain>, @@ -56,20 +58,23 @@ pub struct SubstrateHeadersToSubstrateMaintain<P: SubstrateHeadersSyncPipeline, /// Future and already received justifications from the source chain. struct Justifications<P: SubstrateHeadersSyncPipeline> { - /// Justifications stream. - stream: JustificationsSubscription, + /// Justifications stream. None if it hasn't been initialized yet, or it has been dropped + /// by the rpc library. + stream: Option<JustificationsSubscription>, /// Justifications that we have read from the stream but have not sent to the /// target node, because their targets were still not synced. queue: VecDeque<(HeaderIdOf<P>, Justification)>, } -impl<P: SubstrateHeadersSyncPipeline, SourceChain, TargetChain: Chain> +impl<P: SubstrateHeadersSyncPipeline, SourceChain: Chain, TargetChain: Chain> SubstrateHeadersToSubstrateMaintain<P, SourceChain, TargetChain> { /// Create new maintain procedure. - pub fn new(pipeline: P, target_client: Client<TargetChain>, justifications: JustificationsSubscription) -> Self { + pub async fn new(pipeline: P, source_client: Client<SourceChain>, target_client: Client<TargetChain>) -> Self { + let justifications = subscribe_justifications(&source_client).await; SubstrateHeadersToSubstrateMaintain { pipeline, + source_client, target_client, justifications: Arc::new(Mutex::new(Justifications { stream: justifications, @@ -81,12 +86,13 @@ impl<P: SubstrateHeadersSyncPipeline, SourceChain, TargetChain: Chain> } #[async_trait] -impl<P: SubstrateHeadersSyncPipeline, SourceChain, TargetChain: Chain> Clone +impl<P: SubstrateHeadersSyncPipeline, SourceChain: Chain, TargetChain: Chain> Clone for SubstrateHeadersToSubstrateMaintain<P, SourceChain, TargetChain> { fn clone(&self) -> Self { SubstrateHeadersToSubstrateMaintain { pipeline: self.pipeline.clone(), + source_client: self.source_client.clone(), target_client: self.target_client.clone(), justifications: self.justifications.clone(), _marker: Default::default(), @@ -141,18 +147,23 @@ where // Select justification to submit to the target node. We're submitting at most one justification // on every maintain call. So maintain rate directly affects finalization rate. - let justification_to_submit = poll_fn(|context| { + let (resubscribe, justification_to_submit) = poll_fn(|context| { // read justifications from the stream and push to the queue - justifications.read_from_stream::<SourceChain::Header>(context); + let resubscribe = !justifications.read_from_stream::<SourceChain::Header>(context); // remove all obsolete justifications from the queue remove_obsolete::<P>(&mut justifications.queue, best_finalized); // select justification to submit - Poll::Ready(select_justification(&mut justifications.queue, sync)) + Poll::Ready((resubscribe, select_justification(&mut justifications.queue, sync))) }) .await; + // if justifications subscription has been dropped, resubscribe + if resubscribe { + justifications.stream = subscribe_justifications(&self.source_client).await; + } + // finally - submit selected justification if let Some((target, justification)) = justification_to_submit { let submit_result = self @@ -187,20 +198,42 @@ where P: SubstrateHeadersSyncPipeline<Completion = Justification, Extra = ()>, { /// Read justifications from the subscription stream without blocking. - fn read_from_stream<'a, SourceHeader>(&mut self, context: &mut std::task::Context<'a>) + /// + /// Returns `true` if justifications stream is still readable and `false` if it has been + /// dropped by the RPC crate && we need to resubscribe. + #[must_use] + fn read_from_stream<'a, SourceHeader>(&mut self, context: &mut std::task::Context<'a>) -> bool where SourceHeader: HeaderT, SourceHeader::Number: Into<P::Number>, SourceHeader::Hash: Into<P::Hash>, { + let stream = match self.stream.as_mut() { + Some(stream) => stream, + None => return false, + }; + loop { - let maybe_next_justification = self.stream.next(); + let maybe_next_justification = stream.next(); futures::pin_mut!(maybe_next_justification); let maybe_next_justification = maybe_next_justification.poll_unpin(context); let justification = match maybe_next_justification { Poll::Ready(justification) => justification, - Poll::Pending => return, + Poll::Pending => return true, + }; + + let justification = match justification { + Some(justification) => justification, + None => { + log::warn!( + target: "bridge", + "{} justifications stream has been dropped. Will be trying to resubscribe", + P::SOURCE_NAME, + ); + + return false; + } }; // decode justification target @@ -302,6 +335,31 @@ where Ok(best_header_id) } +/// Subscribe to justifications stream at source node. +async fn subscribe_justifications<C: Chain>(client: &Client<C>) -> Option<JustificationsSubscription> { + match client.subscribe_justifications().await { + Ok(source_justifications) => { + log::debug!( + target: "bridge", + "Successfully (re)subscribed to {} justifications", + C::NAME, + ); + + Some(source_justifications) + } + Err(error) => { + log::warn!( + target: "bridge", + "Failed to subscribe to {} justifications: {:?}", + C::NAME, + error, + ); + + None + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/bridges/relays/substrate/src/headers_pipeline.rs b/bridges/relays/substrate/src/headers_pipeline.rs index 8ad6fc50b95271b2025e447366096e29e9d42202..541a613735c52c634f8f6c677870d8bb4c919907 100644 --- a/bridges/relays/substrate/src/headers_pipeline.rs +++ b/bridges/relays/substrate/src/headers_pipeline.rs @@ -139,25 +139,12 @@ pub async fn run<SourceChain, TargetChain, P>( BlockNumberOf<SourceChain>: BlockNumberBase, TargetChain: Clone + Chain, { - let source_justifications = match source_client.clone().subscribe_justifications().await { - Ok(source_justifications) => source_justifications, - Err(error) => { - log::warn!( - target: "bridge", - "Failed to subscribe to {} justifications: {:?}", - SourceChain::NAME, - error, - ); - - return; - } - }; - let sync_maintain = SubstrateHeadersToSubstrateMaintain::<_, SourceChain, _>::new( pipeline.clone(), + source_client.clone(), target_client.clone(), - source_justifications, - ); + ) + .await; log::info!( target: "bridge",