// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate node client.
use crate::{
chain::{Chain, ChainWithTransactions},
rpc::{
SubstrateAuthorClient, SubstrateChainClient, SubstrateFinalityClient,
SubstrateFrameSystemClient, SubstrateStateClient, SubstrateSystemClient,
},
transaction_stall_timeout, AccountKeyPairOf, ConnectionParams, Error, HashOf, HeaderIdOf,
Result, SignParam, TransactionTracker, UnsignedTransaction,
};
use async_std::sync::{Arc, Mutex, RwLock};
use async_trait::async_trait;
use bp_runtime::{HeaderIdProvider, StorageDoubleMapKeyProvider, StorageMapKeyProvider};
use codec::{Decode, Encode};
use frame_support::weights::Weight;
use futures::{SinkExt, StreamExt};
use jsonrpsee::{
core::DeserializeOwned,
ws_client::{WsClient as RpcClient, WsClientBuilder as RpcClientBuilder},
};
use num_traits::{Saturating, Zero};
use pallet_transaction_payment::RuntimeDispatchInfo;
use relay_utils::{relay_loop::RECONNECT_DELAY, STALL_TIMEOUT};
use sp_core::{
storage::{StorageData, StorageKey},
Bytes, Hasher, Pair,
};
use sp_runtime::{
traits::Header as HeaderT,
transaction_validity::{TransactionSource, TransactionValidity},
};
use sp_trie::StorageProof;
use sp_version::RuntimeVersion;
use std::future::Future;
// Names of runtime API methods, in `<Api>_<method>` form.
// NOTE(review): presumably passed to the node's `state_call` RPC; the call sites are
// outside this part of the file — confirm.
const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities";
const SUB_API_TXPOOL_VALIDATE_TRANSACTION: &str = "TaggedTransactionQueue_validate_transaction";
const SUB_API_TX_PAYMENT_QUERY_INFO: &str = "TransactionPaymentApi_query_info";
/// Buffer capacity per subscription, handed to the websocket client builder in
/// `build_client`.
const MAX_SUBSCRIPTION_CAPACITY: usize = 4096;
/// The difference between the best block number and the number of one of its ancestors
/// that is enough for us to consider that ancestor an "ancient" block with dropped state.
///
/// The relay does not assume that it is connected to an archive node, so it always tries
/// to use the best available chain state. But sometimes it may still use the state of some
/// old block. If the state of that block has already been dropped, the relay will see
/// errors when e.g. it tries to prove something.
///
/// By default Substrate-based nodes store state for the last 256 blocks. We use half of
/// this value. See `is_ancient_block` for the comparison that uses it.
pub const ANCIENT_BLOCK_THRESHOLD: u32 = 128;
/// Returns `true` if we think that the state is already discarded for given block.
pub fn is_ancient_block + PartialOrd + Saturating>(block: N, best: N) -> bool {
best.saturating_sub(block) >= N::from(ANCIENT_BLOCK_THRESHOLD)
}
/// Opaque justifications subscription type.
pub struct Subscription(pub(crate) Mutex>>);
/// Opaque GRANDPA authorities set.
///
/// Kept as raw bytes; this module does not interpret the contents.
// NOTE(review): element type restored as `u8` (the declaration had lost its type
// argument) — confirm against the code that decodes the set.
pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>;
/// A reduced runtime version that carries only the `spec_version` and the
/// `transaction_version`.
#[derive(Clone, Copy, Debug)]
pub struct SimpleRuntimeVersion {
    /// Version of the runtime specification.
    pub spec_version: u32,
    /// All existing dispatches are fully compatible when this number doesn't change.
    pub transaction_version: u32,
}
impl SimpleRuntimeVersion {
    /// Build a `SimpleRuntimeVersion` by copying `spec_version` and `transaction_version`
    /// out of a full `RuntimeVersion`.
    pub const fn from_runtime_version(runtime_version: &RuntimeVersion) -> Self {
        let spec_version = runtime_version.spec_version;
        let transaction_version = runtime_version.transaction_version;
        Self { spec_version, transaction_version }
    }
}
/// How the client obtains the chain runtime version.
#[derive(Clone, Debug)]
pub enum ChainRuntimeVersion {
    /// Query the version from the chain.
    Auto,
    /// Use a fixed version supplied by the user.
    Custom(SimpleRuntimeVersion),
}
/// Substrate client type.
///
/// Cloning `Client` is a cheap operation that only clones internal references. Different
/// clones of the same client are guaranteed to use the same references.
pub struct Client {
// Lock order: `submit_signed_extrinsic_lock`, `data`
/// Client connection params.
params: Arc,
/// Saved chain runtime version.
chain_runtime_version: ChainRuntimeVersion,
/// If several tasks are submitting their transactions simultaneously using
/// `submit_signed_extrinsic` method, they may get the same transaction nonce. So one of
/// transactions will be rejected from the pool. This lock is here to prevent situations like
/// that.
submit_signed_extrinsic_lock: Arc>,
/// Genesis block hash.
genesis_hash: HashOf,
/// Shared dynamic data.
data: Arc>,
}
/// Client data, shared by all `Client` clones.
struct ClientData {
/// Tokio runtime handle.
tokio: Arc,
/// Substrate RPC client.
client: Arc,
}
#[async_trait]
impl relay_utils::relay_loop::Client for Client {
type Error = Error;
async fn reconnect(&mut self) -> Result<()> {
let mut data = self.data.write().await;
let (tokio, client) = Self::build_client(&self.params).await?;
data.tokio = tokio;
data.client = client;
Ok(())
}
}
impl Clone for Client {
fn clone(&self) -> Self {
Client {
params: self.params.clone(),
chain_runtime_version: self.chain_runtime_version.clone(),
submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(),
genesis_hash: self.genesis_hash,
data: self.data.clone(),
}
}
}
impl std::fmt::Debug for Client {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
fmt.debug_struct("Client").field("genesis_hash", &self.genesis_hash).finish()
}
}
impl Client {
/// Returns client that is able to call RPCs on Substrate node over websocket connection.
///
/// This function will keep connecting to given Substrate node until connection is established
/// and is functional. If attempt fail, it will wait for `RECONNECT_DELAY` and retry again.
pub async fn new(params: ConnectionParams) -> Self {
let params = Arc::new(params);
loop {
match Self::try_connect(params.clone()).await {
Ok(client) => return client,
Err(error) => log::error!(
target: "bridge",
"Failed to connect to {} node: {:?}. Going to retry in {}s",
C::NAME,
error,
RECONNECT_DELAY.as_secs(),
),
}
async_std::task::sleep(RECONNECT_DELAY).await;
}
}
/// Try to connect to Substrate node over websocket. Returns Substrate RPC client if connection
/// has been established or error otherwise.
pub async fn try_connect(params: Arc) -> Result {
let (tokio, client) = Self::build_client(¶ms).await?;
let number: C::BlockNumber = Zero::zero();
let genesis_hash_client = client.clone();
let genesis_hash = tokio
.spawn(async move {
SubstrateChainClient::::block_hash(&*genesis_hash_client, Some(number)).await
})
.await??;
let chain_runtime_version = params.chain_runtime_version.clone();
Ok(Self {
params,
chain_runtime_version,
submit_signed_extrinsic_lock: Arc::new(Mutex::new(())),
genesis_hash,
data: Arc::new(RwLock::new(ClientData { tokio, client })),
})
}
/// Build client to use in connection.
async fn build_client(
params: &ConnectionParams,
) -> Result<(Arc, Arc)> {
let tokio = tokio::runtime::Runtime::new()?;
let uri = format!(
"{}://{}:{}",
if params.secure { "wss" } else { "ws" },
params.host,
params.port,
);
log::info!(target: "bridge", "Connecting to {} node at {}", C::NAME, uri);
let client = tokio
.spawn(async move {
RpcClientBuilder::default()
.max_buffer_capacity_per_subscription(MAX_SUBSCRIPTION_CAPACITY)
.build(&uri)
.await
})
.await??;
Ok((Arc::new(tokio), Arc::new(client)))
}
}
impl Client {
/// Return simple runtime version, only include `spec_version` and `transaction_version`.
pub async fn simple_runtime_version(&self) -> Result {
Ok(match &self.chain_runtime_version {
ChainRuntimeVersion::Auto => {
let runtime_version = self.runtime_version().await?;
SimpleRuntimeVersion::from_runtime_version(&runtime_version)
},
ChainRuntimeVersion::Custom(version) => *version,
})
}
/// Returns true if client is connected to at least one peer and is in synced state.
pub async fn ensure_synced(&self) -> Result<()> {
self.jsonrpsee_execute(|client| async move {
let health = SubstrateSystemClient::::health(&*client).await?;
let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0);
if is_synced {
Ok(())
} else {
Err(Error::ClientNotSynced(health))
}
})
.await
}
/// Return hash of the genesis block.
pub fn genesis_hash(&self) -> &C::Hash {
&self.genesis_hash
}
/// Return hash of the best finalized block.
pub async fn best_finalized_header_hash(&self) -> Result {
self.jsonrpsee_execute(|client| async move {
Ok(SubstrateChainClient::::finalized_head(&*client).await?)
})
.await
.map_err(|e| Error::FailedToReadBestFinalizedHeaderHash {
chain: C::NAME.into(),
error: e.boxed(),
})
}
/// Return number of the best finalized block.
pub async fn best_finalized_header_number(&self) -> Result {
Ok(*self.best_finalized_header().await?.number())
}
/// Return header of the best finalized block.
pub async fn best_finalized_header(&self) -> Result {
self.header_by_hash(self.best_finalized_header_hash().await?).await
}
/// Returns the best Substrate header.
pub async fn best_header(&self) -> Result
where
C::Header: DeserializeOwned,
{
self.jsonrpsee_execute(|client| async move {
Ok(SubstrateChainClient::::header(&*client, None).await?)
})
.await
.map_err(|e| Error::FailedToReadBestHeader { chain: C::NAME.into(), error: e.boxed() })
}
/// Get a Substrate block from its hash.
pub async fn get_block(&self, block_hash: Option) -> Result {
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateChainClient::::block(&*client, block_hash).await?)
})
.await
}
/// Get a Substrate header by its hash.
pub async fn header_by_hash(&self, block_hash: C::Hash) -> Result
where
C::Header: DeserializeOwned,
{
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateChainClient::::header(&*client, Some(block_hash)).await?)
})
.await
.map_err(|e| Error::FailedToReadHeaderByHash {
chain: C::NAME.into(),
hash: format!("{block_hash}"),
error: e.boxed(),
})
}
/// Get a Substrate block hash by its number.
pub async fn block_hash_by_number(&self, number: C::BlockNumber) -> Result {
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateChainClient::::block_hash(&*client, Some(number)).await?)
})
.await
}
/// Get a Substrate header by its number.
///
/// Resolves the number to a block hash first, then reads the header by that hash; an
/// error from either step is returned as is.
pub async fn header_by_number(&self, block_number: C::BlockNumber) -> Result<C::Header>
where
    C::Header: DeserializeOwned,
{
    let block_hash = self.block_hash_by_number(block_number).await?;
    self.header_by_hash(block_hash).await
}
/// Return runtime version.
pub async fn runtime_version(&self) -> Result {
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateStateClient::::runtime_version(&*client).await?)
})
.await
}
/// Read value from runtime storage.
pub async fn storage_value(
&self,
storage_key: StorageKey,
block_hash: Option,
) -> Result