Newer
Older
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate node client.
use crate::chain::{Chain, ChainWithBalances};
Svyatoslav Nikolsky
committed
use crate::rpc::Substrate;
use crate::{ConnectionParams, Error, HeaderIdOf, Result};
use codec::Decode;
use frame_system::AccountInfo;
use futures::{SinkExt, StreamExt};
use jsonrpsee_ws_client::{traits::SubscriptionClient, v2::params::JsonRpcParams, DeserializeOwned};
use jsonrpsee_ws_client::{WsClient as RpcClient, WsClientBuilder as RpcClientBuilder};
use pallet_transaction_payment::InclusionFee;
use relay_utils::{relay_loop::RECONNECT_DELAY, HeaderId};
Svyatoslav Nikolsky
committed
use sp_core::{storage::StorageKey, Bytes};
use sp_runtime::traits::Header as HeaderT;
use std::{convert::TryFrom, future::Future};
/// Name of the runtime API method that `grandpa_authorities_set` invokes through `state_call`.
const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities";
/// Capacity used both as the RPC client's per-subscription notification limit and as the
/// buffer size of the channel that backs `JustificationsSubscription`.
const MAX_SUBSCRIPTION_CAPACITY: usize = 4096;
/// Opaque justifications subscription type.
///
/// Wraps the receiving half of the channel that is fed by the background task spawned in
/// `Client::subscribe_justifications`. A `None` item is sent by that task when the underlying
/// RPC subscription has ended or failed.
pub struct JustificationsSubscription(Mutex<futures::channel::mpsc::Receiver<Option<Bytes>>>);
/// Opaque GRANDPA authorities set.
///
/// Holds the raw bytes returned by the `GrandpaApi_grandpa_authorities` runtime call; this
/// module does not decode them.
pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>;
/// Substrate client type.
/// Cloning `Client` is a cheap operation.
pub struct Client<C: Chain> {
/// Tokio runtime handle.
tokio: Arc<tokio::runtime::Runtime>,
/// Client connection params.
params: ConnectionParams,
/// Substrate RPC client.
/// Genesis block hash.
genesis_hash: C::Hash,
/// If several tasks are submitting their transactions simultaneously using `submit_signed_extrinsic`
/// method, they may get the same transaction nonce. So one of transactions will be rejected
/// from the pool. This lock is here to prevent situations like that.
submit_signed_extrinsic_lock: Arc<Mutex<()>>,
impl<C: Chain> Clone for Client<C> {
fn clone(&self) -> Self {
Client {
tokio: self.tokio.clone(),
params: self.params.clone(),
client: self.client.clone(),
genesis_hash: self.genesis_hash,
submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(),
/// Debug output for `Client` shows only the genesis hash; the remaining fields are omitted.
impl<C: Chain> std::fmt::Debug for Client<C> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let genesis_hash = &self.genesis_hash;
        f.debug_struct("Client")
            .field("genesis_hash", genesis_hash)
            .finish()
    }
}
impl<C: Chain> Client<C> {
/// Returns client that is able to call RPCs on Substrate node over websocket connection.
Svyatoslav Nikolsky
committed
///
/// This function will keep connecting to given Substrate node until connection is established
Svyatoslav Nikolsky
committed
/// and is functional. If attempt fail, it will wait for `RECONNECT_DELAY` and retry again.
pub async fn new(params: ConnectionParams) -> Self {
loop {
match Self::try_connect(params.clone()).await {
Ok(client) => return client,
Err(error) => log::error!(
target: "bridge",
"Failed to connect to {} node: {:?}. Going to retry in {}s",
C::NAME,
error,
RECONNECT_DELAY.as_secs(),
),
}
async_std::task::sleep(RECONNECT_DELAY).await;
}
}
/// Try to connect to Substrate node over websocket. Returns Substrate RPC client if connection
/// has been established or error otherwise.
pub async fn try_connect(params: ConnectionParams) -> Result<Self> {
let (tokio, client) = Self::build_client(params.clone()).await?;
let number: C::BlockNumber = Zero::zero();
let genesis_hash_client = client.clone();
let genesis_hash = tokio
.spawn(async move { Substrate::<C>::chain_get_block_hash(&*genesis_hash_client, number).await })
.await??;
params,
client,
genesis_hash,
submit_signed_extrinsic_lock: Arc::new(Mutex::new(())),
})
}
/// Reopen client connection.
///
/// Builds a fresh runtime and RPC client from the stored connection params and swaps them in.
/// If building fails, the error is returned and the current runtime and client are kept.
pub async fn reconnect(&mut self) -> Result<()> {
    let (tokio, client) = Self::build_client(self.params.clone()).await?;
    self.tokio = tokio;
    self.client = client;
    Ok(())
}
/// Create a dedicated tokio runtime and, running on it, a websocket RPC client for the node
/// described by `params`.
///
/// The URI scheme is `wss` when `params.secure` is set and `ws` otherwise.
async fn build_client(params: ConnectionParams) -> Result<(Arc<tokio::runtime::Runtime>, Arc<RpcClient>)> {
    let runtime = tokio::runtime::Runtime::new()?;
    let scheme = if params.secure { "wss" } else { "ws" };
    let uri = format!("{}://{}:{}", scheme, params.host, params.port);
    let connect = async move {
        RpcClientBuilder::default()
            .max_notifs_per_subscription(MAX_SUBSCRIPTION_CAPACITY)
            .build(&uri)
            .await
    };
    // The first `?` handles a failed join of the spawned task, the second a failed connection.
    let rpc_client = runtime.spawn(connect).await??;
    Ok((Arc::new(runtime), Arc::new(rpc_client)))
}
}
/// Returns true if client is connected to at least one peer and is in synced state.
pub async fn ensure_synced(&self) -> Result<()> {
self.jsonrpsee_execute(|client| async move {
let health = Substrate::<C>::system_health(&*client).await?;
let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0);
if is_synced {
Ok(())
} else {
Err(Error::ClientNotSynced(health))
}
})
.await
/// Return hash of the genesis block.
///
/// This is the value stored in the client; calling this method performs no RPC request.
pub fn genesis_hash(&self) -> &C::Hash {
&self.genesis_hash
}
/// Return hash of the best finalized block.
pub async fn best_finalized_header_hash(&self) -> Result<C::Hash> {
self.jsonrpsee_execute(|client| async move { Ok(Substrate::<C>::chain_get_finalized_head(&*client).await?) })
.await
/// Returns the best Substrate header.
///
/// Implemented as a header request without an explicit block hash.
pub async fn best_header(&self) -> Result<C::Header>
where
    C::Header: DeserializeOwned,
{
    self.jsonrpsee_execute(|client| async move {
        let header = Substrate::<C>::chain_get_header(&*client, None).await?;
        Ok(header)
    })
    .await
}
/// Get a Substrate block from its hash.
///
/// `block_hash` is forwarded to the node as is, including when it is `None`.
pub async fn get_block(&self, block_hash: Option<C::Hash>) -> Result<C::SignedBlock> {
    self.jsonrpsee_execute(move |client| async move {
        let block = Substrate::<C>::chain_get_block(&*client, block_hash).await?;
        Ok(block)
    })
    .await
}
/// Get a Substrate header by its hash.
pub async fn header_by_hash(&self, block_hash: C::Hash) -> Result<C::Header>
where
    C::Header: DeserializeOwned,
{
    self.jsonrpsee_execute(move |client| async move {
        let header = Substrate::<C>::chain_get_header(&*client, block_hash).await?;
        Ok(header)
    })
    .await
}
/// Get a Substrate block hash by its number.
pub async fn block_hash_by_number(&self, number: C::BlockNumber) -> Result<C::Hash> {
    self.jsonrpsee_execute(move |client| async move {
        let block_hash = Substrate::<C>::chain_get_block_hash(&*client, number).await?;
        Ok(block_hash)
    })
    .await
}
/// Get a Substrate header by its number.
pub async fn header_by_number(&self, block_number: C::BlockNumber) -> Result<C::Header>
where
C::Header: DeserializeOwned,
{
let block_hash = Self::block_hash_by_number(self, block_number).await?;
let header_by_hash = Self::header_by_hash(self, block_hash).await?;
Ok(header_by_hash)
/// Return runtime version.
pub async fn runtime_version(&self) -> Result<RuntimeVersion> {
self.jsonrpsee_execute(move |client| async move { Ok(Substrate::<C>::state_runtime_version(&*client).await?) })
.await
Svyatoslav Nikolsky
committed
/// Read value from runtime storage.
pub async fn storage_value<T: Send + Decode + 'static>(&self, storage_key: StorageKey) -> Result<Option<T>> {
self.jsonrpsee_execute(move |client| async move {
Substrate::<C>::state_get_storage(&*client, storage_key)
.await?
.map(|encoded_value| T::decode(&mut &encoded_value.0[..]).map_err(Error::ResponseParseFailed))
.transpose()
})
.await
Svyatoslav Nikolsky
committed
}
/// Return native tokens balance of the account.
pub async fn free_native_balance(&self, account: C::AccountId) -> Result<C::Balance>
where
C: ChainWithBalances,
{
self.jsonrpsee_execute(move |client| async move {
let storage_key = C::account_info_storage_key(&account);
let encoded_account_data = Substrate::<C>::state_get_storage(&*client, storage_key)
.await?
.ok_or(Error::AccountDoesNotExist)?;
let decoded_account_data =
AccountInfo::<C::Index, AccountData<C::Balance>>::decode(&mut &encoded_account_data.0[..])
.map_err(Error::ResponseParseFailed)?;
Ok(decoded_account_data.data.free)
})
.await
/// Get the nonce of the given Substrate account.
///
/// Note: It's the caller's responsibility to make sure `account` is a valid SS58 address.
pub async fn next_account_index(&self, account: C::AccountId) -> Result<C::Index> {
self.jsonrpsee_execute(move |client| async move {
Ok(Substrate::<C>::system_account_next_index(&*client, account).await?)
})
.await
/// Submit unsigned extrinsic for inclusion in a block.
/// Note: The given transaction needs to be SCALE encoded beforehand.
pub async fn submit_unsigned_extrinsic(&self, transaction: Bytes) -> Result<C::Hash> {
self.jsonrpsee_execute(move |client| async move {
let tx_hash = Substrate::<C>::author_submit_extrinsic(&*client, transaction).await?;
log::trace!(target: "bridge", "Sent transaction to Substrate node: {:?}", tx_hash);
Ok(tx_hash)
})
.await
/// Submit an extrinsic signed by given account.
///
/// All calls of this method are synchronized, so there can't be more than one active
/// `submit_signed_extrinsic()` call. This guarantees that no nonces collision may happen
/// if all client instances are clones of the same initial `Client`.
///
/// Note: The given transaction needs to be SCALE encoded beforehand.
pub async fn submit_signed_extrinsic(
&self,
extrinsic_signer: C::AccountId,
prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, C::Index) -> Bytes + Send + 'static,
) -> Result<C::Hash> {
let _guard = self.submit_signed_extrinsic_lock.lock().await;
let transaction_nonce = self.next_account_index(extrinsic_signer).await?;
let best_header = self.best_header().await?;
let best_header_id = HeaderId(*best_header.number(), best_header.hash());
self.jsonrpsee_execute(move |client| async move {
let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce);
let tx_hash = Substrate::<C>::author_submit_extrinsic(&*client, extrinsic).await?;
log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
Ok(tx_hash)
})
.await
/// Estimate fee that will be spent on given extrinsic.
pub async fn estimate_extrinsic_fee(&self, transaction: Bytes) -> Result<C::Balance> {
self.jsonrpsee_execute(move |client| async move {
let fee_details = Substrate::<C>::payment_query_fee_details(&*client, transaction, None).await?;
let inclusion_fee = fee_details
.inclusion_fee
.map(|inclusion_fee| {
InclusionFee {
base_fee: C::Balance::try_from(inclusion_fee.base_fee.into_u256())
.unwrap_or_else(|_| C::Balance::max_value()),
len_fee: C::Balance::try_from(inclusion_fee.len_fee.into_u256())
.unwrap_or_else(|_| C::Balance::max_value()),
adjusted_weight_fee: C::Balance::try_from(inclusion_fee.adjusted_weight_fee.into_u256())
.unwrap_or_else(|_| C::Balance::max_value()),
}
.inclusion_fee()
})
.unwrap_or_else(Zero::zero);
Ok(inclusion_fee)
})
.await
/// Get the GRANDPA authority set at given block.
///
/// Calls the `GrandpaApi_grandpa_authorities` runtime method with empty call data at `block`
/// and returns the response bytes without decoding them.
pub async fn grandpa_authorities_set(&self, block: C::Hash) -> Result<OpaqueGrandpaAuthoritiesSet> {
    self.jsonrpsee_execute(move |client| async move {
        let method = SUB_API_GRANDPA_AUTHORITIES.to_string();
        let no_call_data = Bytes(Vec::new());
        Substrate::<C>::state_call(&*client, method, no_call_data, Some(block))
            .await
            .map(|encoded_response| encoded_response.0)
            .map_err(Into::into)
    })
    .await
}
/// Execute runtime call at given block.
pub async fn state_call(&self, method: String, data: Bytes, at_block: Option<C::Hash>) -> Result<Bytes> {
self.jsonrpsee_execute(move |client| async move {
Substrate::<C>::state_call(&*client, method, data, at_block)
.await
.map_err(Into::into)
})
.await
Svyatoslav Nikolsky
committed
/// Returns storage proof of given storage keys.
pub async fn prove_storage(&self, keys: Vec<StorageKey>, at_block: C::Hash) -> Result<StorageProof> {
self.jsonrpsee_execute(move |client| async move {
Substrate::<C>::state_prove_storage(&*client, keys, Some(at_block))
.await
.map(|proof| StorageProof::new(proof.proof.into_iter().map(|b| b.0).collect()))
.map_err(Into::into)
})
.await
/// Return new justifications stream.
pub async fn subscribe_justifications(&self) -> Result<JustificationsSubscription> {
.jsonrpsee_execute(move |client| async move {
Ok(client
.subscribe(
"grandpa_subscribeJustifications",
JsonRpcParams::NoParams,
"grandpa_unsubscribeJustifications",
)
.await?)
})
.await?;
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
let (mut sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
self.tokio.spawn(async move {
loop {
match subscription.next().await {
Ok(Some(justification)) => {
if sender.send(Some(justification)).await.is_err() {
break;
}
}
Ok(None) => {
log::trace!(
target: "bridge",
"{} justifications subscription stream has returned None. Stream needs to be restarted.",
C::NAME,
);
let _ = sender.send(None).await;
break;
}
Err(e) => {
log::trace!(
target: "bridge",
"{} justifications subscription stream has returned '{:?}'. Stream needs to be restarted.",
C::NAME,
e,
);
let _ = sender.send(None).await;
break;
}
}
}
});
Ok(JustificationsSubscription(Mutex::new(receiver)))
}
/// Execute jsonrpsee future in tokio context.
async fn jsonrpsee_execute<MF, F, T>(&self, make_jsonrpsee_future: MF) -> Result<T>
where
MF: FnOnce(Arc<RpcClient>) -> F + Send + 'static,
F: Future<Output = Result<T>> + Send,
T: Send + 'static,
{
let client = self.client.clone();
self.tokio
.spawn(async move { make_jsonrpsee_future(client).await })
.await?
}
}
impl JustificationsSubscription {
/// Return next justification from the subscription.
///
/// Resolves to `Ok(None)` both when the channel delivers a `None` item and when the channel
/// has been closed. This implementation never returns `Err`.
pub async fn next(&self) -> Result<Option<Bytes>> {
    let mut receiver = self.0.lock().await;
    Ok(receiver.next().await.flatten())
}