// Copyright 2019-2021 Parity Technologies (UK) Ltd. // This file is part of Parity Bridges Common. // Parity Bridges Common is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity Bridges Common is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . //! Substrate node client. use crate::{ chain::{Chain, ChainWithTransactions}, rpc::{ SubstrateAuthorClient, SubstrateChainClient, SubstrateFinalityClient, SubstrateFrameSystemClient, SubstrateStateClient, SubstrateSystemClient, }, transaction_stall_timeout, AccountKeyPairOf, ConnectionParams, Error, HashOf, HeaderIdOf, Result, SignParam, TransactionTracker, UnsignedTransaction, }; use async_std::sync::{Arc, Mutex, RwLock}; use async_trait::async_trait; use bp_runtime::{HeaderIdProvider, StorageDoubleMapKeyProvider, StorageMapKeyProvider}; use codec::{Decode, Encode}; use frame_support::weights::Weight; use futures::{SinkExt, StreamExt}; use jsonrpsee::{ core::DeserializeOwned, ws_client::{WsClient as RpcClient, WsClientBuilder as RpcClientBuilder}, }; use num_traits::{Saturating, Zero}; use pallet_transaction_payment::RuntimeDispatchInfo; use relay_utils::{relay_loop::RECONNECT_DELAY, STALL_TIMEOUT}; use sp_core::{ storage::{StorageData, StorageKey}, Bytes, Hasher, Pair, }; use sp_runtime::{ traits::Header as HeaderT, transaction_validity::{TransactionSource, TransactionValidity}, }; use sp_trie::StorageProof; use sp_version::RuntimeVersion; use std::future::Future; const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities"; const SUB_API_GRANDPA_GENERATE_KEY_OWNERSHIP_PROOF: &str = "GrandpaApi_generate_key_ownership_proof"; const SUB_API_TXPOOL_VALIDATE_TRANSACTION: &str = "TaggedTransactionQueue_validate_transaction"; const SUB_API_TX_PAYMENT_QUERY_INFO: &str = "TransactionPaymentApi_query_info"; const MAX_SUBSCRIPTION_CAPACITY: usize = 4096; /// The difference between best block number and number of its ancestor, that is enough /// for us to consider that ancestor an "ancient" block with dropped state. /// /// The relay does not assume that it is connected to the archive node, so it always tries /// to use the best available chain state. But sometimes it still may use state of some /// old block. If the state of that block is already dropped, relay will see errors when /// e.g. it tries to prove something. /// /// By default Substrate-based nodes are storing state for last 256 blocks. We'll use /// half of this value. pub const ANCIENT_BLOCK_THRESHOLD: u32 = 128; /// Returns `true` if we think that the state is already discarded for given block. pub fn is_ancient_block + PartialOrd + Saturating>(block: N, best: N) -> bool { best.saturating_sub(block) >= N::from(ANCIENT_BLOCK_THRESHOLD) } /// Opaque justifications subscription type. pub struct Subscription(pub(crate) Mutex>>); /// Opaque GRANDPA authorities set. pub type OpaqueGrandpaAuthoritiesSet = Vec; /// A simple runtime version. It only includes the `spec_version` and `transaction_version`. #[derive(Copy, Clone, Debug)] pub struct SimpleRuntimeVersion { /// Version of the runtime specification. pub spec_version: u32, /// All existing dispatches are fully compatible when this number doesn't change. pub transaction_version: u32, } impl SimpleRuntimeVersion { /// Create a new instance of `SimpleRuntimeVersion` from a `RuntimeVersion`. pub const fn from_runtime_version(runtime_version: &RuntimeVersion) -> Self { Self { spec_version: runtime_version.spec_version, transaction_version: runtime_version.transaction_version, } } } /// Chain runtime version in client #[derive(Clone, Debug)] pub enum ChainRuntimeVersion { /// Auto query from chain. Auto, /// Custom runtime version, defined by user. Custom(SimpleRuntimeVersion), } /// Substrate client type. /// /// Cloning `Client` is a cheap operation that only clones internal references. Different /// clones of the same client are guaranteed to use the same references. pub struct Client { // Lock order: `submit_signed_extrinsic_lock`, `data` /// Client connection params. params: Arc, /// Saved chain runtime version. chain_runtime_version: ChainRuntimeVersion, /// If several tasks are submitting their transactions simultaneously using /// `submit_signed_extrinsic` method, they may get the same transaction nonce. So one of /// transactions will be rejected from the pool. This lock is here to prevent situations like /// that. submit_signed_extrinsic_lock: Arc>, /// Genesis block hash. genesis_hash: HashOf, /// Shared dynamic data. data: Arc>, } /// Client data, shared by all `Client` clones. struct ClientData { /// Tokio runtime handle. tokio: Arc, /// Substrate RPC client. client: Arc, } #[async_trait] impl relay_utils::relay_loop::Client for Client { type Error = Error; async fn reconnect(&mut self) -> Result<()> { let mut data = self.data.write().await; let (tokio, client) = Self::build_client(&self.params).await?; data.tokio = tokio; data.client = client; Ok(()) } } impl Clone for Client { fn clone(&self) -> Self { Client { params: self.params.clone(), chain_runtime_version: self.chain_runtime_version.clone(), submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(), genesis_hash: self.genesis_hash, data: self.data.clone(), } } } impl std::fmt::Debug for Client { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { fmt.debug_struct("Client").field("genesis_hash", &self.genesis_hash).finish() } } impl Client { /// Returns client that is able to call RPCs on Substrate node over websocket connection. /// /// This function will keep connecting to given Substrate node until connection is established /// and is functional. If attempt fail, it will wait for `RECONNECT_DELAY` and retry again. pub async fn new(params: ConnectionParams) -> Self { let params = Arc::new(params); loop { match Self::try_connect(params.clone()).await { Ok(client) => return client, Err(error) => log::error!( target: "bridge", "Failed to connect to {} node: {:?}. Going to retry in {}s", C::NAME, error, RECONNECT_DELAY.as_secs(), ), } async_std::task::sleep(RECONNECT_DELAY).await; } } /// Try to connect to Substrate node over websocket. Returns Substrate RPC client if connection /// has been established or error otherwise. pub async fn try_connect(params: Arc) -> Result { let (tokio, client) = Self::build_client(¶ms).await?; let number: C::BlockNumber = Zero::zero(); let genesis_hash_client = client.clone(); let genesis_hash = tokio .spawn(async move { SubstrateChainClient::::block_hash(&*genesis_hash_client, Some(number)).await }) .await??; let chain_runtime_version = params.chain_runtime_version.clone(); Ok(Self { params, chain_runtime_version, submit_signed_extrinsic_lock: Arc::new(Mutex::new(())), genesis_hash, data: Arc::new(RwLock::new(ClientData { tokio, client })), }) } /// Build client to use in connection. async fn build_client( params: &ConnectionParams, ) -> Result<(Arc, Arc)> { let tokio = tokio::runtime::Runtime::new()?; let uri = format!( "{}://{}:{}", if params.secure { "wss" } else { "ws" }, params.host, params.port, ); log::info!(target: "bridge", "Connecting to {} node at {}", C::NAME, uri); let client = tokio .spawn(async move { RpcClientBuilder::default() .max_buffer_capacity_per_subscription(MAX_SUBSCRIPTION_CAPACITY) .build(&uri) .await }) .await??; Ok((Arc::new(tokio), Arc::new(client))) } } impl Client { /// Return simple runtime version, only include `spec_version` and `transaction_version`. pub async fn simple_runtime_version(&self) -> Result { Ok(match &self.chain_runtime_version { ChainRuntimeVersion::Auto => { let runtime_version = self.runtime_version().await?; SimpleRuntimeVersion::from_runtime_version(&runtime_version) }, ChainRuntimeVersion::Custom(version) => *version, }) } /// Returns true if client is connected to at least one peer and is in synced state. pub async fn ensure_synced(&self) -> Result<()> { self.jsonrpsee_execute(|client| async move { let health = SubstrateSystemClient::::health(&*client).await?; let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0); if is_synced { Ok(()) } else { Err(Error::ClientNotSynced(health)) } }) .await } /// Return hash of the genesis block. pub fn genesis_hash(&self) -> &C::Hash { &self.genesis_hash } /// Return hash of the best finalized block. pub async fn best_finalized_header_hash(&self) -> Result { self.jsonrpsee_execute(|client| async move { Ok(SubstrateChainClient::::finalized_head(&*client).await?) }) .await .map_err(|e| Error::FailedToReadBestFinalizedHeaderHash { chain: C::NAME.into(), error: e.boxed(), }) } /// Return number of the best finalized block. pub async fn best_finalized_header_number(&self) -> Result { Ok(*self.best_finalized_header().await?.number()) } /// Return header of the best finalized block. pub async fn best_finalized_header(&self) -> Result { self.header_by_hash(self.best_finalized_header_hash().await?).await } /// Returns the best Substrate header. pub async fn best_header(&self) -> Result where C::Header: DeserializeOwned, { self.jsonrpsee_execute(|client| async move { Ok(SubstrateChainClient::::header(&*client, None).await?) }) .await .map_err(|e| Error::FailedToReadBestHeader { chain: C::NAME.into(), error: e.boxed() }) } /// Get a Substrate block from its hash. pub async fn get_block(&self, block_hash: Option) -> Result { self.jsonrpsee_execute(move |client| async move { Ok(SubstrateChainClient::::block(&*client, block_hash).await?) }) .await } /// Get a Substrate header by its hash. pub async fn header_by_hash(&self, block_hash: C::Hash) -> Result where C::Header: DeserializeOwned, { self.jsonrpsee_execute(move |client| async move { Ok(SubstrateChainClient::::header(&*client, Some(block_hash)).await?) }) .await .map_err(|e| Error::FailedToReadHeaderByHash { chain: C::NAME.into(), hash: format!("{block_hash}"), error: e.boxed(), }) } /// Get a Substrate block hash by its number. pub async fn block_hash_by_number(&self, number: C::BlockNumber) -> Result { self.jsonrpsee_execute(move |client| async move { Ok(SubstrateChainClient::::block_hash(&*client, Some(number)).await?) }) .await } /// Get a Substrate header by its number. pub async fn header_by_number(&self, block_number: C::BlockNumber) -> Result where C::Header: DeserializeOwned, { let block_hash = Self::block_hash_by_number(self, block_number).await?; let header_by_hash = Self::header_by_hash(self, block_hash).await?; Ok(header_by_hash) } /// Return runtime version. pub async fn runtime_version(&self) -> Result { self.jsonrpsee_execute(move |client| async move { Ok(SubstrateStateClient::::runtime_version(&*client).await?) }) .await } /// Read value from runtime storage. pub async fn storage_value( &self, storage_key: StorageKey, block_hash: Option, ) -> Result> { self.raw_storage_value(storage_key, block_hash) .await? .map(|encoded_value| { T::decode(&mut &encoded_value.0[..]).map_err(Error::ResponseParseFailed) }) .transpose() } /// Read `MapStorage` value from runtime storage. pub async fn storage_map_value( &self, pallet_prefix: &str, key: &T::Key, block_hash: Option, ) -> Result> { let storage_key = T::final_key(pallet_prefix, key); self.raw_storage_value(storage_key, block_hash) .await? .map(|encoded_value| { T::Value::decode(&mut &encoded_value.0[..]).map_err(Error::ResponseParseFailed) }) .transpose() } /// Read `DoubleMapStorage` value from runtime storage. pub async fn storage_double_map_value( &self, pallet_prefix: &str, key1: &T::Key1, key2: &T::Key2, block_hash: Option, ) -> Result> { let storage_key = T::final_key(pallet_prefix, key1, key2); self.raw_storage_value(storage_key, block_hash) .await? .map(|encoded_value| { T::Value::decode(&mut &encoded_value.0[..]).map_err(Error::ResponseParseFailed) }) .transpose() } /// Read raw value from runtime storage. pub async fn raw_storage_value( &self, storage_key: StorageKey, block_hash: Option, ) -> Result> { let cloned_storage_key = storage_key.clone(); self.jsonrpsee_execute(move |client| async move { Ok(SubstrateStateClient::::storage(&*client, storage_key.clone(), block_hash) .await?) }) .await .map_err(|e| Error::FailedToReadRuntimeStorageValue { chain: C::NAME.into(), key: cloned_storage_key, error: e.boxed(), }) } /// Get the nonce of the given Substrate account. /// /// Note: It's the caller's responsibility to make sure `account` is a valid SS58 address. pub async fn next_account_index(&self, account: C::AccountId) -> Result { self.jsonrpsee_execute(move |client| async move { Ok(SubstrateFrameSystemClient::::account_next_index(&*client, account).await?) }) .await } /// Submit unsigned extrinsic for inclusion in a block. /// /// Note: The given transaction needs to be SCALE encoded beforehand. pub async fn submit_unsigned_extrinsic(&self, transaction: Bytes) -> Result { self.jsonrpsee_execute(move |client| async move { let tx_hash = SubstrateAuthorClient::::submit_extrinsic(&*client, transaction) .await .map_err(|e| { log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e); e })?; log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash); Ok(tx_hash) }) .await } async fn build_sign_params(&self, signer: AccountKeyPairOf) -> Result> where C: ChainWithTransactions, { let runtime_version = self.simple_runtime_version().await?; Ok(SignParam:: { spec_version: runtime_version.spec_version, transaction_version: runtime_version.transaction_version, genesis_hash: self.genesis_hash, signer, }) } /// Submit an extrinsic signed by given account. /// /// All calls of this method are synchronized, so there can't be more than one active /// `submit_signed_extrinsic()` call. This guarantees that no nonces collision may happen /// if all client instances are clones of the same initial `Client`. /// /// Note: The given transaction needs to be SCALE encoded beforehand. pub async fn submit_signed_extrinsic( &self, signer: &AccountKeyPairOf, prepare_extrinsic: impl FnOnce(HeaderIdOf, C::Nonce) -> Result> + Send + 'static, ) -> Result where C: ChainWithTransactions, C::AccountId: From<::Public>, { let _guard = self.submit_signed_extrinsic_lock.lock().await; let transaction_nonce = self.next_account_index(signer.public().into()).await?; let best_header = self.best_header().await?; let signing_data = self.build_sign_params(signer.clone()).await?; // By using parent of best block here, we are protecing again best-block reorganizations. // E.g. transaction may have been submitted when the best block was `A[num=100]`. Then it // has been changed to `B[num=100]`. Hash of `A` has been included into transaction // signature payload. So when signature will be checked, the check will fail and transaction // will be dropped from the pool. let best_header_id = best_header.parent_id().unwrap_or_else(|| best_header.id()); self.jsonrpsee_execute(move |client| async move { let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce)?; let signed_extrinsic = C::sign_transaction(signing_data, extrinsic)?.encode(); let tx_hash = SubstrateAuthorClient::::submit_extrinsic(&*client, Bytes(signed_extrinsic)) .await .map_err(|e| { log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e); e })?; log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash); Ok(tx_hash) }) .await } /// Does exactly the same as `submit_signed_extrinsic`, but keeps watching for extrinsic status /// after submission. pub async fn submit_and_watch_signed_extrinsic( &self, signer: &AccountKeyPairOf, prepare_extrinsic: impl FnOnce(HeaderIdOf, C::Nonce) -> Result> + Send + 'static, ) -> Result> where C: ChainWithTransactions, C::AccountId: From<::Public>, { let self_clone = self.clone(); let signing_data = self.build_sign_params(signer.clone()).await?; let _guard = self.submit_signed_extrinsic_lock.lock().await; let transaction_nonce = self.next_account_index(signer.public().into()).await?; let best_header = self.best_header().await?; let best_header_id = best_header.id(); let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY); let (tracker, subscription) = self .jsonrpsee_execute(move |client| async move { let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce)?; let stall_timeout = transaction_stall_timeout( extrinsic.era.mortality_period(), C::AVERAGE_BLOCK_INTERVAL, STALL_TIMEOUT, ); let signed_extrinsic = C::sign_transaction(signing_data, extrinsic)?.encode(); let tx_hash = C::Hasher::hash(&signed_extrinsic); let subscription = SubstrateAuthorClient::::submit_and_watch_extrinsic( &*client, Bytes(signed_extrinsic), ) .await .map_err(|e| { log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e); e })?; log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash); let tracker = TransactionTracker::new( self_clone, stall_timeout, tx_hash, Subscription(Mutex::new(receiver)), ); Ok((tracker, subscription)) }) .await?; self.data.read().await.tokio.spawn(Subscription::background_worker( C::NAME.into(), "extrinsic".into(), subscription, sender, )); Ok(tracker) } /// Returns pending extrinsics from transaction pool. pub async fn pending_extrinsics(&self) -> Result> { self.jsonrpsee_execute(move |client| async move { Ok(SubstrateAuthorClient::::pending_extrinsics(&*client).await?) }) .await } /// Validate transaction at given block state. pub async fn validate_transaction( &self, at_block: C::Hash, transaction: SignedTransaction, ) -> Result { self.jsonrpsee_execute(move |client| async move { let call = SUB_API_TXPOOL_VALIDATE_TRANSACTION.to_string(); let data = Bytes((TransactionSource::External, transaction, at_block).encode()); let encoded_response = SubstrateStateClient::::call(&*client, call, data, Some(at_block)).await?; let validity = TransactionValidity::decode(&mut &encoded_response.0[..]) .map_err(Error::ResponseParseFailed)?; Ok(validity) }) .await } /// Returns weight of the given transaction. pub async fn extimate_extrinsic_weight( &self, transaction: SignedTransaction, ) -> Result { self.jsonrpsee_execute(move |client| async move { let transaction_len = transaction.encoded_size() as u32; let call = SUB_API_TX_PAYMENT_QUERY_INFO.to_string(); let data = Bytes((transaction, transaction_len).encode()); let encoded_response = SubstrateStateClient::::call(&*client, call, data, None).await?; let dispatch_info = RuntimeDispatchInfo::::decode(&mut &encoded_response.0[..]) .map_err(Error::ResponseParseFailed)?; Ok(dispatch_info.weight) }) .await } /// Get the GRANDPA authority set at given block. pub async fn grandpa_authorities_set( &self, block: C::Hash, ) -> Result { self.jsonrpsee_execute(move |client| async move { let call = SUB_API_GRANDPA_AUTHORITIES.to_string(); let data = Bytes(Vec::new()); let encoded_response = SubstrateStateClient::::call(&*client, call, data, Some(block)).await?; let authority_list = encoded_response.0; Ok(authority_list) }) .await } /// Execute runtime call at given block, provided the input and output types. /// It also performs the input encode and output decode. pub async fn typed_state_call( &self, method_name: String, input: Input, at_block: Option, ) -> Result { let encoded_output = self .state_call(method_name.clone(), Bytes(input.encode()), at_block) .await .map_err(|e| Error::ErrorExecutingRuntimeCall { chain: C::NAME.into(), method: method_name, error: e.boxed(), })?; Output::decode(&mut &encoded_output.0[..]).map_err(Error::ResponseParseFailed) } /// Execute runtime call at given block. pub async fn state_call( &self, method: String, data: Bytes, at_block: Option, ) -> Result { self.jsonrpsee_execute(move |client| async move { SubstrateStateClient::::call(&*client, method, data, at_block) .await .map_err(Into::into) }) .await } /// Returns storage proof of given storage keys. pub async fn prove_storage( &self, keys: Vec, at_block: C::Hash, ) -> Result { self.jsonrpsee_execute(move |client| async move { SubstrateStateClient::::prove_storage(&*client, keys, Some(at_block)) .await .map(|proof| { StorageProof::new(proof.proof.into_iter().map(|b| b.0).collect::>()) }) .map_err(Into::into) }) .await } /// Return `tokenDecimals` property from the set of chain properties. pub async fn token_decimals(&self) -> Result> { self.jsonrpsee_execute(move |client| async move { let system_properties = SubstrateSystemClient::::properties(&*client).await?; Ok(system_properties.get("tokenDecimals").and_then(|v| v.as_u64())) }) .await } /// Return new finality justifications stream. pub async fn subscribe_finality_justifications>( &self, ) -> Result> { let subscription = self .jsonrpsee_execute(move |client| async move { Ok(FC::subscribe_justifications(&client).await?) }) .await?; let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY); self.data.read().await.tokio.spawn(Subscription::background_worker( C::NAME.into(), "justification".into(), subscription, sender, )); Ok(Subscription(Mutex::new(receiver))) } // TODO: remove warning after implementing // https://github.com/paritytech/parity-bridges-common/issues/39 #[allow(dead_code)] async fn generate_grandpa_key_ownership_proof( &self, at: HashOf, set_id: sp_consensus_grandpa::SetId, authority_id: sp_consensus_grandpa::AuthorityId, ) -> Result> { self.typed_state_call( SUB_API_GRANDPA_GENERATE_KEY_OWNERSHIP_PROOF.into(), (set_id, authority_id), Some(at), ) .await } /// Execute jsonrpsee future in tokio context. async fn jsonrpsee_execute(&self, make_jsonrpsee_future: MF) -> Result where MF: FnOnce(Arc) -> F + Send + 'static, F: Future> + Send + 'static, T: Send + 'static, { let data = self.data.read().await; let client = data.client.clone(); data.tokio.spawn(make_jsonrpsee_future(client)).await? } /// Returns `true` if version guard can be started. /// /// There's no reason to run version guard when version mode is set to `Auto`. It can /// lead to relay shutdown when chain is upgraded, even though we have explicitly /// said that we don't want to shutdown. pub fn can_start_version_guard(&self) -> bool { !matches!(self.chain_runtime_version, ChainRuntimeVersion::Auto) } } impl Subscription { /// Consumes subscription and returns future statuses stream. pub fn into_stream(self) -> impl futures::Stream { futures::stream::unfold(self, |this| async { let item = this.0.lock().await.next().await.unwrap_or(None); item.map(|i| (i, this)) }) } /// Return next item from the subscription. pub async fn next(&self) -> Result> { let mut receiver = self.0.lock().await; let item = receiver.next().await; Ok(item.unwrap_or(None)) } /// Background worker that is executed in tokio context as `jsonrpsee` requires. async fn background_worker( chain_name: String, item_type: String, mut subscription: jsonrpsee::core::client::Subscription, mut sender: futures::channel::mpsc::Sender>, ) { loop { match subscription.next().await { Some(Ok(item)) => if sender.send(Some(item)).await.is_err() { break }, Some(Err(e)) => { log::trace!( target: "bridge", "{} {} subscription stream has returned '{:?}'. Stream needs to be restarted.", chain_name, item_type, e, ); let _ = sender.send(None).await; break }, None => { log::trace!( target: "bridge", "{} {} subscription stream has returned None. Stream needs to be restarted.", chain_name, item_type, ); let _ = sender.send(None).await; break }, } } } }