// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate node client.
use crate::{
chain::{Chain, ChainWithBalances, TransactionStatusOf},
rpc::Substrate,
ConnectionParams, Error, HashOf, HeaderIdOf, Result,
};
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use codec::{Decode, Encode};
use frame_system::AccountInfo;
use futures::{SinkExt, StreamExt};
use jsonrpsee_ws_client::{
traits::SubscriptionClient, v2::params::JsonRpcParams, DeserializeOwned, WsClient as RpcClient,
WsClientBuilder as RpcClientBuilder,
};
use num_traits::{Bounded, Zero};
use pallet_balances::AccountData;
use pallet_transaction_payment::InclusionFee;
use relay_utils::{relay_loop::RECONNECT_DELAY, HeaderId};
use sp_core::{storage::StorageKey, Bytes, Hasher};
use sp_runtime::{
traits::Header as HeaderT,
transaction_validity::{TransactionSource, TransactionValidity},
};
use sp_trie::StorageProof;
use sp_version::RuntimeVersion;
use std::{convert::TryFrom, future::Future};
// Presumably the name of the runtime API method that returns the GRANDPA authorities.
// NOTE(review): not referenced in the visible part of this file — confirm against its callers.
const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities";
// Presumably the name of the runtime API method used to validate a transaction.
// NOTE(review): not referenced in the visible part of this file — confirm against its callers.
const SUB_API_TXPOOL_VALIDATE_TRANSACTION: &str = "TaggedTransactionQueue_validate_transaction";
// Value handed to `max_notifs_per_subscription` when the RPC client is built in `build_client`.
const MAX_SUBSCRIPTION_CAPACITY: usize = 4096;
/// Opaque justifications subscription type.
///
/// Newtype whose single field is guarded by a `Mutex`.
// NOTE(review): the generic arguments of this declaration look truncated (unbalanced `>`), so the
// wrapped type cannot be read from this line — TODO restore and confirm.
pub struct Subscription(Mutex>>);
/// Opaque GRANDPA authorities set.
// NOTE(review): the element type of this `Vec` is missing from the declaration; presumably raw
// encoded bytes given the "opaque" naming — TODO confirm.
pub type OpaqueGrandpaAuthoritiesSet = Vec;
/// Substrate client type.
///
/// Cloning `Client` is a cheap operation: the `Clone` implementation only clones the
/// reference-counted handles and connection params and copies the genesis hash, so all clones
/// share the same runtime, RPC client and submission lock.
// NOTE(review): the generic parameter list of this struct and the type arguments of its `Arc`
// fields appear to be missing from the declaration — TODO restore and confirm.
pub struct Client {
/// Tokio runtime handle.
///
/// A runtime is created in `build_client`; connection setup and the genesis hash query are
/// spawned onto it.
tokio: Arc,
/// Client connection params.
///
/// Kept so that `reconnect` can rebuild the connection with the same settings.
params: ConnectionParams,
/// Substrate RPC client.
client: Arc,
/// Genesis block hash.
///
/// Fetched once in `try_connect` as the hash of block number zero.
genesis_hash: HashOf,
/// If several tasks are submitting their transactions simultaneously using
/// `submit_signed_extrinsic` method, they may get the same transaction nonce. So one of
/// transactions will be rejected from the pool. This lock is here to prevent situations like
/// that.
submit_signed_extrinsic_lock: Arc>,
}
#[async_trait]
impl relay_utils::relay_loop::Client for Client {
    type Error = Error;

    /// Rebuilds the connection from the stored connection params.
    ///
    /// On success both the runtime handle and the RPC client are replaced by the freshly built
    /// ones. On failure the error is returned and the existing handles are left untouched.
    async fn reconnect(&mut self) -> Result<()> {
        let rebuilt = Self::build_client(self.params.clone()).await?;
        // Install the new runtime before the new client, as both come from the same build.
        self.tokio = rebuilt.0;
        self.client = rebuilt.1;
        Ok(())
    }
}
impl Clone for Client {
fn clone(&self) -> Self {
Client {
tokio: self.tokio.clone(),
params: self.params.clone(),
client: self.client.clone(),
genesis_hash: self.genesis_hash,
submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(),
}
}
}
impl std::fmt::Debug for Client {
    /// Formats the client as `Client { genesis_hash: .. }`; no other field is printed.
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        let mut builder = fmt.debug_struct("Client");
        builder.field("genesis_hash", &self.genesis_hash);
        builder.finish()
    }
}
// NOTE(review): generic arguments appear to be missing in several places of this block (the
// `impl` header, `Substrate::::…`, bare `Result`, `Arc` in tuple types) — TODO restore and confirm.
impl Client {
/// Returns client that is able to call RPCs on Substrate node over websocket connection.
///
/// This function keeps calling `try_connect` until it succeeds. Every failed attempt is logged
/// to the "bridge" target, after which the task sleeps for `RECONNECT_DELAY` before the next
/// attempt.
pub async fn new(params: ConnectionParams) -> Self {
loop {
// `params` is cloned because it is needed again if this attempt fails.
match Self::try_connect(params.clone()).await {
Ok(client) => return client,
Err(error) => log::error!(
target: "bridge",
"Failed to connect to {} node: {:?}. Going to retry in {}s",
C::NAME,
error,
RECONNECT_DELAY.as_secs(),
),
}
async_std::task::sleep(RECONNECT_DELAY).await;
}
}
/// Try to connect to Substrate node over websocket. Returns Substrate RPC client if connection
/// has been established or error otherwise.
///
/// After the connection is built, the hash of block number zero is requested on the tokio
/// runtime and stored as the genesis hash. A failure of either step is returned to the caller.
pub async fn try_connect(params: ConnectionParams) -> Result {
let (tokio, client) = Self::build_client(params.clone()).await?;
let number: C::BlockNumber = Zero::zero();
// A separate handle is moved into the spawned task; `client` itself is stored in `Self` below.
let genesis_hash_client = client.clone();
let genesis_hash = tokio
.spawn(async move {
Substrate::::chain_get_block_hash(&*genesis_hash_client, number).await
})
// The first `?` handles the result of awaiting the spawned task, the second the RPC result.
.await??;
Ok(Self {
tokio,
params,
client,
genesis_hash,
submit_signed_extrinsic_lock: Arc::new(Mutex::new(())),
})
}
/// Build client to use in connection.
///
/// Creates a new tokio runtime, assembles the node URI from `params` (`wss` scheme when
/// `params.secure` is set, `ws` otherwise) and builds the RPC client on that runtime with
/// `MAX_SUBSCRIPTION_CAPACITY` as the per-subscription notification limit.
///
/// Returns the runtime and the RPC client, each wrapped in an `Arc`.
async fn build_client(
params: ConnectionParams,
) -> Result<(Arc, Arc)> {
let tokio = tokio::runtime::Runtime::new()?;
let uri = format!(
"{}://{}:{}",
if params.secure { "wss" } else { "ws" },
params.host,
params.port,
);
let client = tokio
.spawn(async move {
RpcClientBuilder::default()
.max_notifs_per_subscription(MAX_SUBSCRIPTION_CAPACITY)
.build(&uri)
.await
})
// The first `?` handles the result of awaiting the spawned task, the second the build result.
.await??;
Ok((Arc::new(tokio), Arc::new(client)))
}
}
impl Client {
/// Checks that the node reports itself as synced.
///
/// Returns `Ok(())` when the reported health says the node is not syncing and either is not
/// expected to have peers or has at least one peer. Otherwise returns
/// `Error::ClientNotSynced` carrying the reported health.
// NOTE(review): `jsonrpsee_execute` is defined outside the visible part of this file; presumably
// it runs the closure with the shared RPC client — confirm.
pub async fn ensure_synced(&self) -> Result<()> {
self.jsonrpsee_execute(|client| async move {
let health = Substrate::::system_health(&*client).await?;
let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0);
if is_synced {
Ok(())
} else {
Err(Error::ClientNotSynced(health))
}
})
.await
}
/// Return hash of the genesis block.
pub fn genesis_hash(&self) -> &C::Hash {
&self.genesis_hash
}
/// Return hash of the best finalized block.
///
/// Issues the `chain_get_finalized_head` request and forwards its result; any RPC failure is
/// returned as an error.
pub async fn best_finalized_header_hash(&self) -> Result {
self.jsonrpsee_execute(|client| async move {
Ok(Substrate::::chain_get_finalized_head(&*client).await?)
})
.await
}
/// Return number of the best finalized block.
///
/// Looks up the best finalized hash first, then loads the header for that hash and reads its
/// number. An error from either request is returned unchanged.
pub async fn best_finalized_header_number(&self) -> Result {
    let finalized_hash = self.best_finalized_header_hash().await?;
    let finalized_header = self.header_by_hash(finalized_hash).await?;
    Ok(*finalized_header.number())
}
/// Returns the best Substrate header.
///
/// Issues the `chain_get_header` request with `None` in place of a block hash.
pub async fn best_header(&self) -> Result
where
C::Header: DeserializeOwned,
{
self.jsonrpsee_execute(|client| async move {
Ok(Substrate::::chain_get_header(&*client, None).await?)
})
.await
}
/// Get a Substrate block from its hash.
///
/// `block_hash` is forwarded as-is to the `chain_get_block` request.
// NOTE(review): presumably `None` selects the node's best block — confirm against the RPC definition.
pub async fn get_block(&self, block_hash: Option) -> Result {
self.jsonrpsee_execute(move |client| async move {
Ok(Substrate::::chain_get_block(&*client, block_hash).await?)
})
.await
}
/// Get a Substrate header by its hash.
///
/// Issues the `chain_get_header` request for `block_hash`; any RPC failure is returned as an
/// error.
pub async fn header_by_hash(&self, block_hash: C::Hash) -> Result
where
C::Header: DeserializeOwned,
{
self.jsonrpsee_execute(move |client| async move {
Ok(Substrate::::chain_get_header(&*client, block_hash).await?)
})
.await
}
/// Get a Substrate block hash by its number.
///
/// Issues the `chain_get_block_hash` request for `number`; any RPC failure is returned as an
/// error.
pub async fn block_hash_by_number(&self, number: C::BlockNumber) -> Result {
self.jsonrpsee_execute(move |client| async move {
Ok(Substrate::::chain_get_block_hash(&*client, number).await?)
})
.await
}
/// Get a Substrate header by its number.
///
/// Resolves `block_number` to a block hash and then loads the header for that hash. An error
/// from either request is returned to the caller.
pub async fn header_by_number(&self, block_number: C::BlockNumber) -> Result
where
    C::Header: DeserializeOwned,
{
    let hash = self.block_hash_by_number(block_number).await?;
    self.header_by_hash(hash).await
}
/// Return runtime version.
///
/// Issues the `state_runtime_version` request and forwards its result; any RPC failure is
/// returned as an error.
pub async fn runtime_version(&self) -> Result {
self.jsonrpsee_execute(move |client| async move {
Ok(Substrate::::state_runtime_version(&*client).await?)
})
.await
}
/// Read value from runtime storage.
pub async fn storage_value(
&self,
storage_key: StorageKey,
block_hash: Option,
) -> Result