Skip to content
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate node client.
use crate::{
chain::{Chain, ChainWithTransactions},
rpc::{
SubstrateAuthorClient, SubstrateChainClient, SubstrateFinalityClient,
SubstrateFrameSystemClient, SubstrateStateClient, SubstrateSystemClient,
},
transaction_stall_timeout, AccountKeyPairOf, ChainWithGrandpa, ConnectionParams, Error, HashOf,
HeaderIdOf, Result, SignParam, TransactionTracker, UnsignedTransaction,
};
use async_std::sync::{Arc, Mutex, RwLock};
use async_trait::async_trait;
use bp_runtime::{HeaderIdProvider, StorageDoubleMapKeyProvider, StorageMapKeyProvider};
use codec::{Decode, Encode};
use frame_support::weights::Weight;
use futures::{SinkExt, StreamExt};
use jsonrpsee::{
core::DeserializeOwned,
ws_client::{WsClient as RpcClient, WsClientBuilder as RpcClientBuilder},
};
use num_traits::{Saturating, Zero};
use pallet_transaction_payment::RuntimeDispatchInfo;
use relay_utils::{relay_loop::RECONNECT_DELAY, STALL_TIMEOUT};
use sp_core::{
storage::{StorageData, StorageKey},
Bytes, Hasher, Pair,
};
use sp_runtime::{
traits::Header as HeaderT,
transaction_validity::{TransactionSource, TransactionValidity},
};
use sp_trie::StorageProof;
use sp_version::RuntimeVersion;
use std::future::Future;
const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities";
const SUB_API_GRANDPA_GENERATE_KEY_OWNERSHIP_PROOF: &str =
"GrandpaApi_generate_key_ownership_proof";
const SUB_API_TXPOOL_VALIDATE_TRANSACTION: &str = "TaggedTransactionQueue_validate_transaction";
const SUB_API_TX_PAYMENT_QUERY_INFO: &str = "TransactionPaymentApi_query_info";
const MAX_SUBSCRIPTION_CAPACITY: usize = 4096;
/// The difference between best block number and number of its ancestor, that is enough
/// for us to consider that ancestor an "ancient" block with dropped state.
///
/// The relay does not assume that it is connected to the archive node, so it always tries
/// to use the best available chain state. But sometimes it still may use state of some
/// old block. If the state of that block is already dropped, relay will see errors when
/// e.g. it tries to prove something.
///
/// By default Substrate-based nodes are storing state for last 256 blocks. We'll use
/// half of this value.
pub const ANCIENT_BLOCK_THRESHOLD: u32 = 128;
/// Returns `true` if we think that the state is already discarded for given block.
pub fn is_ancient_block<N: From<u32> + PartialOrd + Saturating>(block: N, best: N) -> bool {
best.saturating_sub(block) >= N::from(ANCIENT_BLOCK_THRESHOLD)
}
/// Opaque justifications subscription type.
pub struct Subscription<T>(pub(crate) Mutex<futures::channel::mpsc::Receiver<Option<T>>>);
/// Opaque GRANDPA authorities set.
pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>;
/// A simple runtime version. It only includes the `spec_version` and `transaction_version`.
#[derive(Copy, Clone, Debug)]
pub struct SimpleRuntimeVersion {
/// Version of the runtime specification.
pub spec_version: u32,
/// All existing dispatches are fully compatible when this number doesn't change.
pub transaction_version: u32,
}
impl SimpleRuntimeVersion {
/// Create a new instance of `SimpleRuntimeVersion` from a `RuntimeVersion`.
pub const fn from_runtime_version(runtime_version: &RuntimeVersion) -> Self {
Self {
spec_version: runtime_version.spec_version,
transaction_version: runtime_version.transaction_version,
}
}
}
/// Chain runtime version in client
#[derive(Clone, Debug)]
pub enum ChainRuntimeVersion {
/// Auto query from chain.
Auto,
/// Custom runtime version, defined by user.
Custom(SimpleRuntimeVersion),
}
/// Substrate client type.
///
/// Cloning `Client` is a cheap operation that only clones internal references. Different
/// clones of the same client are guaranteed to use the same references.
pub struct Client<C: Chain> {
// Lock order: `submit_signed_extrinsic_lock`, `data`
/// Client connection params.
params: Arc<ConnectionParams>,
/// Saved chain runtime version.
chain_runtime_version: ChainRuntimeVersion,
/// If several tasks are submitting their transactions simultaneously using
/// `submit_signed_extrinsic` method, they may get the same transaction nonce. So one of
/// transactions will be rejected from the pool. This lock is here to prevent situations like
/// that.
submit_signed_extrinsic_lock: Arc<Mutex<()>>,
/// Genesis block hash.
genesis_hash: HashOf<C>,
/// Shared dynamic data.
data: Arc<RwLock<ClientData>>,
}
/// Client data, shared by all `Client` clones.
struct ClientData {
/// Tokio runtime handle.
tokio: Arc<tokio::runtime::Runtime>,
/// Substrate RPC client.
client: Arc<RpcClient>,
}
/// Already encoded value.
struct PreEncoded(Vec<u8>);
impl Encode for PreEncoded {
fn encode(&self) -> Vec<u8> {
self.0.clone()
}
}
#[async_trait]
impl<C: Chain> relay_utils::relay_loop::Client for Client<C> {
type Error = Error;
async fn reconnect(&mut self) -> Result<()> {
let mut data = self.data.write().await;
let (tokio, client) = Self::build_client(&self.params).await?;
data.tokio = tokio;
data.client = client;
Ok(())
}
}
impl<C: Chain> Clone for Client<C> {
fn clone(&self) -> Self {
Client {
params: self.params.clone(),
chain_runtime_version: self.chain_runtime_version.clone(),
submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(),
genesis_hash: self.genesis_hash,
data: self.data.clone(),
}
}
}
impl<C: Chain> std::fmt::Debug for Client<C> {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
fmt.debug_struct("Client").field("genesis_hash", &self.genesis_hash).finish()
}
}
impl<C: Chain> Client<C> {
/// Returns client that is able to call RPCs on Substrate node over websocket connection.
///
/// This function will keep connecting to given Substrate node until connection is established
/// and is functional. If attempt fail, it will wait for `RECONNECT_DELAY` and retry again.
pub async fn new(params: ConnectionParams) -> Self {
let params = Arc::new(params);
loop {
match Self::try_connect(params.clone()).await {
Ok(client) => return client,
Err(error) => log::error!(
target: "bridge",
"Failed to connect to {} node: {:?}. Going to retry in {}s",
C::NAME,
error,
RECONNECT_DELAY.as_secs(),
),
}
async_std::task::sleep(RECONNECT_DELAY).await;
}
}
/// Try to connect to Substrate node over websocket. Returns Substrate RPC client if connection
/// has been established or error otherwise.
pub async fn try_connect(params: Arc<ConnectionParams>) -> Result<Self> {
let (tokio, client) = Self::build_client(&params).await?;
let number: C::BlockNumber = Zero::zero();
let genesis_hash_client = client.clone();
let genesis_hash = tokio
.spawn(async move {
SubstrateChainClient::<C>::block_hash(&*genesis_hash_client, Some(number)).await
})
.await??;
let chain_runtime_version = params.chain_runtime_version.clone();
Ok(Self {
params,
chain_runtime_version,
submit_signed_extrinsic_lock: Arc::new(Mutex::new(())),
genesis_hash,
data: Arc::new(RwLock::new(ClientData { tokio, client })),
})
}
/// Build client to use in connection.
async fn build_client(
params: &ConnectionParams,
) -> Result<(Arc<tokio::runtime::Runtime>, Arc<RpcClient>)> {
let tokio = tokio::runtime::Runtime::new()?;
let uri = format!(
"{}://{}:{}",
if params.secure { "wss" } else { "ws" },
params.host,
params.port,
);
log::info!(target: "bridge", "Connecting to {} node at {}", C::NAME, uri);
let client = tokio
.spawn(async move {
RpcClientBuilder::default()
.max_buffer_capacity_per_subscription(MAX_SUBSCRIPTION_CAPACITY)
.build(&uri)
.await
})
.await??;
Ok((Arc::new(tokio), Arc::new(client)))
}
}
impl<C: Chain> Client<C> {
/// Return simple runtime version, only include `spec_version` and `transaction_version`.
pub async fn simple_runtime_version(&self) -> Result<SimpleRuntimeVersion> {
Ok(match &self.chain_runtime_version {
ChainRuntimeVersion::Auto => {
let runtime_version = self.runtime_version().await?;
SimpleRuntimeVersion::from_runtime_version(&runtime_version)
},
ChainRuntimeVersion::Custom(version) => *version,
})
}
/// Returns true if client is connected to at least one peer and is in synced state.
pub async fn ensure_synced(&self) -> Result<()> {
self.jsonrpsee_execute(|client| async move {
let health = SubstrateSystemClient::<C>::health(&*client).await?;
let is_synced = !health.is_syncing && (!health.should_have_peers || health.peers > 0);
if is_synced {
Ok(())
} else {
Err(Error::ClientNotSynced(health))
}
})
.await
}
/// Return hash of the genesis block.
pub fn genesis_hash(&self) -> &C::Hash {
&self.genesis_hash
}
/// Return hash of the best finalized block.
pub async fn best_finalized_header_hash(&self) -> Result<C::Hash> {
self.jsonrpsee_execute(|client| async move {
Ok(SubstrateChainClient::<C>::finalized_head(&*client).await?)
})
.await
.map_err(|e| Error::FailedToReadBestFinalizedHeaderHash {
chain: C::NAME.into(),
error: e.boxed(),
})
}
/// Return number of the best finalized block.
pub async fn best_finalized_header_number(&self) -> Result<C::BlockNumber> {
Ok(*self.best_finalized_header().await?.number())
}
/// Return header of the best finalized block.
pub async fn best_finalized_header(&self) -> Result<C::Header> {
self.header_by_hash(self.best_finalized_header_hash().await?).await
}
/// Returns the best Substrate header.
pub async fn best_header(&self) -> Result<C::Header>
where
C::Header: DeserializeOwned,
{
self.jsonrpsee_execute(|client| async move {
Ok(SubstrateChainClient::<C>::header(&*client, None).await?)
})
.await
.map_err(|e| Error::FailedToReadBestHeader { chain: C::NAME.into(), error: e.boxed() })
}
/// Get a Substrate block from its hash.
pub async fn get_block(&self, block_hash: Option<C::Hash>) -> Result<C::SignedBlock> {
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateChainClient::<C>::block(&*client, block_hash).await?)
})
.await
}
/// Get a Substrate header by its hash.
pub async fn header_by_hash(&self, block_hash: C::Hash) -> Result<C::Header>
where
C::Header: DeserializeOwned,
{
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateChainClient::<C>::header(&*client, Some(block_hash)).await?)
})
.await
.map_err(|e| Error::FailedToReadHeaderByHash {
chain: C::NAME.into(),
hash: format!("{block_hash}"),
error: e.boxed(),
})
}
/// Get a Substrate block hash by its number.
pub async fn block_hash_by_number(&self, number: C::BlockNumber) -> Result<C::Hash> {
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateChainClient::<C>::block_hash(&*client, Some(number)).await?)
})
.await
}
/// Get a Substrate header by its number.
pub async fn header_by_number(&self, block_number: C::BlockNumber) -> Result<C::Header>
where
C::Header: DeserializeOwned,
{
let block_hash = Self::block_hash_by_number(self, block_number).await?;
let header_by_hash = Self::header_by_hash(self, block_hash).await?;
Ok(header_by_hash)
}
/// Return runtime version.
pub async fn runtime_version(&self) -> Result<RuntimeVersion> {
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateStateClient::<C>::runtime_version(&*client).await?)
})
.await
}
/// Read value from runtime storage.
pub async fn storage_value<T: Send + Decode + 'static>(
&self,
storage_key: StorageKey,
block_hash: Option<C::Hash>,
) -> Result<Option<T>> {
self.raw_storage_value(storage_key, block_hash)
.await?
.map(|encoded_value| {
T::decode(&mut &encoded_value.0[..]).map_err(Error::ResponseParseFailed)
})
.transpose()
}
/// Read `MapStorage` value from runtime storage.
pub async fn storage_map_value<T: StorageMapKeyProvider>(
&self,
pallet_prefix: &str,
key: &T::Key,
block_hash: Option<C::Hash>,
) -> Result<Option<T::Value>> {
let storage_key = T::final_key(pallet_prefix, key);
self.raw_storage_value(storage_key, block_hash)
.await?
.map(|encoded_value| {
T::Value::decode(&mut &encoded_value.0[..]).map_err(Error::ResponseParseFailed)
})
.transpose()
}
/// Read `DoubleMapStorage` value from runtime storage.
pub async fn storage_double_map_value<T: StorageDoubleMapKeyProvider>(
&self,
pallet_prefix: &str,
key1: &T::Key1,
key2: &T::Key2,
block_hash: Option<C::Hash>,
) -> Result<Option<T::Value>> {
let storage_key = T::final_key(pallet_prefix, key1, key2);
self.raw_storage_value(storage_key, block_hash)
.await?
.map(|encoded_value| {
T::Value::decode(&mut &encoded_value.0[..]).map_err(Error::ResponseParseFailed)
})
.transpose()
}
/// Read raw value from runtime storage.
pub async fn raw_storage_value(
&self,
storage_key: StorageKey,
block_hash: Option<C::Hash>,
) -> Result<Option<StorageData>> {
let cloned_storage_key = storage_key.clone();
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateStateClient::<C>::storage(&*client, storage_key.clone(), block_hash)
.await?)
})
.await
.map_err(|e| Error::FailedToReadRuntimeStorageValue {
chain: C::NAME.into(),
key: cloned_storage_key,
error: e.boxed(),
})
}
/// Get the nonce of the given Substrate account.
///
/// Note: It's the caller's responsibility to make sure `account` is a valid SS58 address.
pub async fn next_account_index(&self, account: C::AccountId) -> Result<C::Nonce> {
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateFrameSystemClient::<C>::account_next_index(&*client, account).await?)
})
.await
}
/// Submit unsigned extrinsic for inclusion in a block.
///
/// Note: The given transaction needs to be SCALE encoded beforehand.
pub async fn submit_unsigned_extrinsic(&self, transaction: Bytes) -> Result<C::Hash> {
// one last check that the transaction is valid. Most of checks happen in the relay loop and
// it is the "final" check before submission.
let best_header_hash = self.best_header().await?.hash();
self.validate_transaction(best_header_hash, PreEncoded(transaction.0.clone()))
.await
.map_err(|e| {
log::error!(target: "bridge", "Pre-submit {} transaction validation failed: {:?}", C::NAME, e);
e
})??;
self.jsonrpsee_execute(move |client| async move {
let tx_hash = SubstrateAuthorClient::<C>::submit_extrinsic(&*client, transaction)
.await
.map_err(|e| {
log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e);
e
})?;
log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
Ok(tx_hash)
})
.await
}
async fn build_sign_params(&self, signer: AccountKeyPairOf<C>) -> Result<SignParam<C>>
where
C: ChainWithTransactions,
{
let runtime_version = self.simple_runtime_version().await?;
Ok(SignParam::<C> {
spec_version: runtime_version.spec_version,
transaction_version: runtime_version.transaction_version,
genesis_hash: self.genesis_hash,
signer,
})
}
/// Submit an extrinsic signed by given account.
///
/// All calls of this method are synchronized, so there can't be more than one active
/// `submit_signed_extrinsic()` call. This guarantees that no nonces collision may happen
/// if all client instances are clones of the same initial `Client`.
///
/// Note: The given transaction needs to be SCALE encoded beforehand.
pub async fn submit_signed_extrinsic(
&self,
signer: &AccountKeyPairOf<C>,
prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, C::Nonce) -> Result<UnsignedTransaction<C>>
+ Send
+ 'static,
) -> Result<C::Hash>
where
C: ChainWithTransactions,
C::AccountId: From<<C::AccountKeyPair as Pair>::Public>,
{
let _guard = self.submit_signed_extrinsic_lock.lock().await;
let transaction_nonce = self.next_account_index(signer.public().into()).await?;
let best_header = self.best_header().await?;
let signing_data = self.build_sign_params(signer.clone()).await?;
// By using parent of best block here, we are protecing again best-block reorganizations.
// E.g. transaction may have been submitted when the best block was `A[num=100]`. Then it
// has been changed to `B[num=100]`. Hash of `A` has been included into transaction
// signature payload. So when signature will be checked, the check will fail and transaction
// will be dropped from the pool.
let best_header_id = best_header.parent_id().unwrap_or_else(|| best_header.id());
let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce)?;
let signed_extrinsic = C::sign_transaction(signing_data, extrinsic)?.encode();
// one last check that the transaction is valid. Most of checks happen in the relay loop and
// it is the "final" check before submission.
self.validate_transaction(best_header_id.1, PreEncoded(signed_extrinsic.clone()))
.await
.map_err(|e| {
log::error!(target: "bridge", "Pre-submit {} transaction validation failed: {:?}", C::NAME, e);
e
})??;
self.jsonrpsee_execute(move |client| async move {
let tx_hash =
SubstrateAuthorClient::<C>::submit_extrinsic(&*client, Bytes(signed_extrinsic))
.await
.map_err(|e| {
log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e);
e
})?;
log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
Ok(tx_hash)
})
.await
}
/// Does exactly the same as `submit_signed_extrinsic`, but keeps watching for extrinsic status
/// after submission.
pub async fn submit_and_watch_signed_extrinsic(
&self,
signer: &AccountKeyPairOf<C>,
prepare_extrinsic: impl FnOnce(HeaderIdOf<C>, C::Nonce) -> Result<UnsignedTransaction<C>>
+ Send
+ 'static,
) -> Result<TransactionTracker<C, Self>>
where
C: ChainWithTransactions,
C::AccountId: From<<C::AccountKeyPair as Pair>::Public>,
{
let self_clone = self.clone();
let signing_data = self.build_sign_params(signer.clone()).await?;
let _guard = self.submit_signed_extrinsic_lock.lock().await;
let transaction_nonce = self.next_account_index(signer.public().into()).await?;
let best_header = self.best_header().await?;
let best_header_id = best_header.id();
let extrinsic = prepare_extrinsic(best_header_id, transaction_nonce)?;
let stall_timeout = transaction_stall_timeout(
extrinsic.era.mortality_period(),
C::AVERAGE_BLOCK_INTERVAL,
STALL_TIMEOUT,
);
let signed_extrinsic = C::sign_transaction(signing_data, extrinsic)?.encode();
// one last check that the transaction is valid. Most of checks happen in the relay loop and
// it is the "final" check before submission.
self.validate_transaction(best_header_id.1, PreEncoded(signed_extrinsic.clone()))
.await
.map_err(|e| {
log::error!(target: "bridge", "Pre-submit {} transaction validation failed: {:?}", C::NAME, e);
e
})??;
let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
let (tracker, subscription) = self
.jsonrpsee_execute(move |client| async move {
let tx_hash = C::Hasher::hash(&signed_extrinsic);
let subscription = SubstrateAuthorClient::<C>::submit_and_watch_extrinsic(
&*client,
Bytes(signed_extrinsic),
)
.await
.map_err(|e| {
log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e);
e
})?;
log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
let tracker = TransactionTracker::new(
self_clone,
stall_timeout,
tx_hash,
Subscription(Mutex::new(receiver)),
);
Ok((tracker, subscription))
})
.await?;
self.data.read().await.tokio.spawn(Subscription::background_worker(
C::NAME.into(),
"extrinsic".into(),
subscription,
sender,
));
Ok(tracker)
}
/// Returns pending extrinsics from transaction pool.
pub async fn pending_extrinsics(&self) -> Result<Vec<Bytes>> {
self.jsonrpsee_execute(move |client| async move {
Ok(SubstrateAuthorClient::<C>::pending_extrinsics(&*client).await?)
})
.await
}
/// Validate transaction at given block state.
pub async fn validate_transaction<SignedTransaction: Encode + Send + 'static>(
&self,
at_block: C::Hash,
transaction: SignedTransaction,
) -> Result<TransactionValidity> {
self.jsonrpsee_execute(move |client| async move {
let call = SUB_API_TXPOOL_VALIDATE_TRANSACTION.to_string();
let data = Bytes((TransactionSource::External, transaction, at_block).encode());
let encoded_response =
SubstrateStateClient::<C>::call(&*client, call, data, Some(at_block)).await?;
let validity = TransactionValidity::decode(&mut &encoded_response.0[..])
.map_err(Error::ResponseParseFailed)?;
Ok(validity)
})
.await
}
/// Returns weight of the given transaction.
pub async fn extimate_extrinsic_weight<SignedTransaction: Encode + Send + 'static>(
&self,
transaction: SignedTransaction,
) -> Result<Weight> {
self.jsonrpsee_execute(move |client| async move {
let transaction_len = transaction.encoded_size() as u32;
let call = SUB_API_TX_PAYMENT_QUERY_INFO.to_string();
let data = Bytes((transaction, transaction_len).encode());
let encoded_response =
SubstrateStateClient::<C>::call(&*client, call, data, None).await?;
let dispatch_info =
RuntimeDispatchInfo::<C::Balance>::decode(&mut &encoded_response.0[..])
.map_err(Error::ResponseParseFailed)?;
Ok(dispatch_info.weight)
})
.await
}
/// Get the GRANDPA authority set at given block.
pub async fn grandpa_authorities_set(
&self,
block: C::Hash,
) -> Result<OpaqueGrandpaAuthoritiesSet> {
self.jsonrpsee_execute(move |client| async move {
let call = SUB_API_GRANDPA_AUTHORITIES.to_string();
let data = Bytes(Vec::new());
let encoded_response =
SubstrateStateClient::<C>::call(&*client, call, data, Some(block)).await?;
let authority_list = encoded_response.0;
Ok(authority_list)
})
.await
}
/// Execute runtime call at given block, provided the input and output types.
/// It also performs the input encode and output decode.
pub async fn typed_state_call<Input: codec::Encode, Output: codec::Decode>(
&self,
method_name: String,
input: Input,
at_block: Option<C::Hash>,
) -> Result<Output> {
let encoded_output = self
.state_call(method_name.clone(), Bytes(input.encode()), at_block)
.await
.map_err(|e| Error::ErrorExecutingRuntimeCall {
chain: C::NAME.into(),
method: method_name,
error: e.boxed(),
})?;
Output::decode(&mut &encoded_output.0[..]).map_err(Error::ResponseParseFailed)
}
/// Execute runtime call at given block.
pub async fn state_call(
&self,
method: String,
data: Bytes,
at_block: Option<C::Hash>,
) -> Result<Bytes> {
self.jsonrpsee_execute(move |client| async move {
SubstrateStateClient::<C>::call(&*client, method, data, at_block)
.await
.map_err(Into::into)
})
.await
}
/// Returns storage proof of given storage keys.
pub async fn prove_storage(
&self,
keys: Vec<StorageKey>,
at_block: C::Hash,
) -> Result<StorageProof> {
self.jsonrpsee_execute(move |client| async move {
SubstrateStateClient::<C>::prove_storage(&*client, keys, Some(at_block))
.await
.map(|proof| {
StorageProof::new(proof.proof.into_iter().map(|b| b.0).collect::<Vec<_>>())
})
.map_err(Into::into)
})
.await
}
/// Return `tokenDecimals` property from the set of chain properties.
pub async fn token_decimals(&self) -> Result<Option<u64>> {
self.jsonrpsee_execute(move |client| async move {
let system_properties = SubstrateSystemClient::<C>::properties(&*client).await?;
Ok(system_properties.get("tokenDecimals").and_then(|v| v.as_u64()))
})
.await
}
/// Return new finality justifications stream.
pub async fn subscribe_finality_justifications<FC: SubstrateFinalityClient<C>>(
&self,
) -> Result<Subscription<Bytes>> {
let subscription = self
.jsonrpsee_execute(move |client| async move {
Ok(FC::subscribe_justifications(&client).await?)
})
.await?;
let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
self.data.read().await.tokio.spawn(Subscription::background_worker(
C::NAME.into(),
"justification".into(),
subscription,
sender,
));
Ok(Subscription(Mutex::new(receiver)))
}
/// Generates a proof of key ownership for the given authority in the given set.
pub async fn generate_grandpa_key_ownership_proof(
&self,
at: HashOf<C>,
set_id: sp_consensus_grandpa::SetId,
authority_id: sp_consensus_grandpa::AuthorityId,
) -> Result<Option<sp_consensus_grandpa::OpaqueKeyOwnershipProof>>
where
C: ChainWithGrandpa,
{
self.typed_state_call(
SUB_API_GRANDPA_GENERATE_KEY_OWNERSHIP_PROOF.into(),
(set_id, authority_id),
Some(at),
)
.await
}
/// Execute jsonrpsee future in tokio context.
async fn jsonrpsee_execute<MF, F, T>(&self, make_jsonrpsee_future: MF) -> Result<T>
where
MF: FnOnce(Arc<RpcClient>) -> F + Send + 'static,
F: Future<Output = Result<T>> + Send + 'static,
T: Send + 'static,
{
let data = self.data.read().await;
let client = data.client.clone();
data.tokio.spawn(make_jsonrpsee_future(client)).await?
}
/// Returns `true` if version guard can be started.
///
/// There's no reason to run version guard when version mode is set to `Auto`. It can
/// lead to relay shutdown when chain is upgraded, even though we have explicitly
/// said that we don't want to shutdown.
pub fn can_start_version_guard(&self) -> bool {
!matches!(self.chain_runtime_version, ChainRuntimeVersion::Auto)
}
}
impl<T: DeserializeOwned> Subscription<T> {
/// Consumes subscription and returns future statuses stream.
pub fn into_stream(self) -> impl futures::Stream<Item = T> {
futures::stream::unfold(self, |this| async {
let item = this.0.lock().await.next().await.unwrap_or(None);
item.map(|i| (i, this))
})
}
/// Return next item from the subscription.
pub async fn next(&self) -> Result<Option<T>> {
let mut receiver = self.0.lock().await;
let item = receiver.next().await;
Ok(item.unwrap_or(None))
}
/// Background worker that is executed in tokio context as `jsonrpsee` requires.
async fn background_worker(
chain_name: String,
item_type: String,
mut subscription: jsonrpsee::core::client::Subscription<T>,
mut sender: futures::channel::mpsc::Sender<Option<T>>,
) {
loop {
match subscription.next().await {
Some(Ok(item)) =>
if sender.send(Some(item)).await.is_err() {
break
},
Some(Err(e)) => {
log::trace!(
target: "bridge",
"{} {} subscription stream has returned '{:?}'. Stream needs to be restarted.",
chain_name,
item_type,
e,
);
let _ = sender.send(None).await;
break
},
None => {
log::trace!(
target: "bridge",
"{} {} subscription stream has returned None. Stream needs to be restarted.",
chain_name,
item_type,
);
let _ = sender.send(None).await;
break
},
}
}
}
}
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Substrate node RPC errors.
use bp_polkadot_core::parachains::ParaId;
use jsonrpsee::core::Error as RpcError;
use relay_utils::MaybeConnectionError;
use sc_rpc_api::system::Health;
use sp_core::storage::StorageKey;
use sp_runtime::transaction_validity::TransactionValidityError;
use thiserror::Error;
/// Result type used by Substrate client.
pub type Result<T> = std::result::Result<T, Error>;
/// Errors that can occur only when interacting with
/// a Substrate node through RPC.
#[derive(Error, Debug)]
pub enum Error {
/// IO error.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// An error that can occur when making a request to
/// an JSON-RPC server.
#[error("RPC error: {0}")]
RpcError(#[from] RpcError),
/// The response from the server could not be SCALE decoded.
#[error("Response parse failed: {0}")]
ResponseParseFailed(#[from] codec::Error),
/// Account does not exist on the chain.
#[error("Account does not exist on the chain.")]
AccountDoesNotExist,
/// Runtime storage is missing some mandatory value.
#[error("Mandatory storage value is missing from the runtime storage.")]
MissingMandatoryStorageValue,
/// Required parachain head is not present at the relay chain.
#[error("Parachain {0:?} head {1} is missing from the relay chain storage.")]
MissingRequiredParachainHead(ParaId, u64),
/// Failed to find finality proof for the given header.
#[error("Failed to find finality proof for header {0}.")]
FinalityProofNotFound(u64),
/// The client we're connected to is not synced, so we can't rely on its state.
#[error("Substrate client is not synced {0}.")]
ClientNotSynced(Health),
/// Failed to read best finalized header hash from given chain.
#[error("Failed to read best finalized header hash of {chain}: {error:?}.")]
FailedToReadBestFinalizedHeaderHash {
/// Name of the chain where the error has happened.
chain: String,
/// Underlying error.
error: Box<Error>,
},
/// Failed to read best finalized header from given chain.
#[error("Failed to read best header of {chain}: {error:?}.")]
FailedToReadBestHeader {
/// Name of the chain where the error has happened.
chain: String,
/// Underlying error.
error: Box<Error>,
},
/// Failed to read header by hash from given chain.
#[error("Failed to read header {hash} of {chain}: {error:?}.")]
FailedToReadHeaderByHash {
/// Name of the chain where the error has happened.
chain: String,
/// Hash of the header we've tried to read.
hash: String,
/// Underlying error.
error: Box<Error>,
},
/// Failed to execute runtime call at given chain.
#[error("Failed to execute runtime call {method} at {chain}: {error:?}.")]
ErrorExecutingRuntimeCall {
/// Name of the chain where the error has happened.
chain: String,
/// Runtime method name.
method: String,
/// Underlying error.
error: Box<Error>,
},
/// Failed to read sotrage value at given chain.
#[error("Failed to read storage value {key:?} at {chain}: {error:?}.")]
FailedToReadRuntimeStorageValue {
/// Name of the chain where the error has happened.
chain: String,
/// Runtime storage key
key: StorageKey,
/// Underlying error.
error: Box<Error>,
},
/// The bridge pallet is halted and all transactions will be rejected.
#[error("Bridge pallet is halted.")]
BridgePalletIsHalted,
/// The bridge pallet is not yet initialized and all transactions will be rejected.
#[error("Bridge pallet is not initialized.")]
BridgePalletIsNotInitialized,
/// There's no best head of the parachain at the `pallet-bridge-parachains` at the target side.
#[error("No head of the ParaId({0}) at the bridge parachains pallet at {1}.")]
NoParachainHeadAtTarget(u32, String),
/// An error has happened when we have tried to parse storage proof.
#[error("Error when parsing storage proof: {0:?}.")]
StorageProofError(bp_runtime::StorageProofError),
/// The Substrate transaction is invalid.
#[error("Substrate transaction is invalid: {0:?}")]
TransactionInvalid(#[from] TransactionValidityError),
/// Custom logic error.
#[error("{0}")]
Custom(String),
}
impl From<tokio::task::JoinError> for Error {
fn from(error: tokio::task::JoinError) -> Self {
Error::Custom(format!("Failed to wait tokio task: {error}"))
}
}
impl Error {
/// Box the error.
pub fn boxed(self) -> Box<Self> {
Box::new(self)
}
}
impl MaybeConnectionError for Error {
fn is_connection_error(&self) -> bool {
match *self {
Error::RpcError(RpcError::Transport(_)) |
Error::RpcError(RpcError::RestartNeeded(_)) |
Error::ClientNotSynced(_) => true,
Error::FailedToReadBestFinalizedHeaderHash { ref error, .. } =>
error.is_connection_error(),
Error::FailedToReadBestHeader { ref error, .. } => error.is_connection_error(),
Error::FailedToReadHeaderByHash { ref error, .. } => error.is_connection_error(),
Error::ErrorExecutingRuntimeCall { ref error, .. } => error.is_connection_error(),
Error::FailedToReadRuntimeStorageValue { ref error, .. } => error.is_connection_error(),
_ => false,
}
}
}
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Pallet provides a set of guard functions that are running in background threads
//! and are aborting process if some condition fails.
use crate::{error::Error, Chain, Client};
use async_trait::async_trait;
use sp_version::RuntimeVersion;
use std::{
fmt::Display,
time::{Duration, Instant},
};
/// Guards environment.
#[async_trait]
pub trait Environment<C>: Send + Sync + 'static {
/// Error type.
type Error: Display + Send + Sync + 'static;
/// Return current runtime version.
async fn runtime_version(&mut self) -> Result<RuntimeVersion, Self::Error>;
/// Return current time.
fn now(&self) -> Instant {
Instant::now()
}
/// Sleep given amount of time.
async fn sleep(&mut self, duration: Duration) {
async_std::task::sleep(duration).await
}
/// Abort current process. Called when guard condition check fails.
async fn abort(&mut self) {
std::process::abort();
}
}
/// Abort when runtime spec version is different from specified.
pub fn abort_on_spec_version_change<C: Chain>(
mut env: impl Environment<C>,
expected_spec_version: u32,
) {
async_std::task::spawn(async move {
log::info!(
target: "bridge-guard",
"Starting spec_version guard for {}. Expected spec_version: {}",
C::NAME,
expected_spec_version,
);
loop {
let actual_spec_version = env.runtime_version().await;
match actual_spec_version {
Ok(version) if version.spec_version == expected_spec_version => (),
Ok(version) => {
log::error!(
target: "bridge-guard",
"{} runtime spec version has changed from {} to {}. Aborting relay",
C::NAME,
expected_spec_version,
version.spec_version,
);
env.abort().await;
},
Err(error) => log::warn!(
target: "bridge-guard",
"Failed to read {} runtime version: {}. Relay may need to be stopped manually",
C::NAME,
error,
),
}
env.sleep(conditions_check_delay::<C>()).await;
}
});
}
/// Delay between conditions check.
fn conditions_check_delay<C: Chain>() -> Duration {
C::AVERAGE_BLOCK_INTERVAL * (10 + rand::random::<u32>() % 10)
}
#[async_trait]
impl<C: Chain> Environment<C> for Client<C> {
type Error = Error;
async fn runtime_version(&mut self) -> Result<RuntimeVersion, Self::Error> {
Client::<C>::runtime_version(self).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_chain::TestChain;
use futures::{
channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
future::FutureExt,
stream::StreamExt,
SinkExt,
};
struct TestEnvironment {
runtime_version_rx: UnboundedReceiver<RuntimeVersion>,
slept_tx: UnboundedSender<()>,
aborted_tx: UnboundedSender<()>,
}
#[async_trait]
impl Environment<TestChain> for TestEnvironment {
type Error = Error;
async fn runtime_version(&mut self) -> Result<RuntimeVersion, Self::Error> {
Ok(self.runtime_version_rx.next().await.unwrap_or_default())
}
async fn sleep(&mut self, _duration: Duration) {
let _ = self.slept_tx.send(()).await;
}
async fn abort(&mut self) {
let _ = self.aborted_tx.send(()).await;
// simulate process abort :)
async_std::task::sleep(Duration::from_secs(60)).await;
}
}
#[test]
fn aborts_when_spec_version_is_changed() {
async_std::task::block_on(async {
let (
(mut runtime_version_tx, runtime_version_rx),
(slept_tx, mut slept_rx),
(aborted_tx, mut aborted_rx),
) = (unbounded(), unbounded(), unbounded());
abort_on_spec_version_change(
TestEnvironment { runtime_version_rx, slept_tx, aborted_tx },
0,
);
// client responds with wrong version
runtime_version_tx
.send(RuntimeVersion { spec_version: 42, ..Default::default() })
.await
.unwrap();
// then the `abort` function is called
aborted_rx.next().await;
// and we do not reach the `sleep` function call
assert!(slept_rx.next().now_or_never().is_none());
});
}
#[test]
fn does_not_aborts_when_spec_version_is_unchanged() {
async_std::task::block_on(async {
let (
(mut runtime_version_tx, runtime_version_rx),
(slept_tx, mut slept_rx),
(aborted_tx, mut aborted_rx),
) = (unbounded(), unbounded(), unbounded());
abort_on_spec_version_change(
TestEnvironment { runtime_version_rx, slept_tx, aborted_tx },
42,
);
// client responds with the same version
runtime_version_tx
.send(RuntimeVersion { spec_version: 42, ..Default::default() })
.await
.unwrap();
// then the `sleep` function is called
slept_rx.next().await;
// and the `abort` function is not called
assert!(aborted_rx.next().now_or_never().is_none());
});
}
}
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Tools to interact with Substrate node using RPC methods.
#![warn(missing_docs)]
mod chain;
mod client;
mod error;
mod rpc;
mod sync_header;
mod transaction_tracker;
pub mod calls;
pub mod guard;
pub mod metrics;
pub mod test_chain;
use std::time::Duration;
pub use crate::{
chain::{
AccountKeyPairOf, BlockWithJustification, CallOf, Chain, ChainWithBalances,
ChainWithGrandpa, ChainWithMessages, ChainWithTransactions, ChainWithUtilityPallet,
FullRuntimeUtilityPallet, MockedRuntimeUtilityPallet, Parachain, RelayChain, SignParam,
TransactionStatusOf, UnsignedTransaction, UtilityPallet,
},
client::{
is_ancient_block, ChainRuntimeVersion, Client, OpaqueGrandpaAuthoritiesSet,
SimpleRuntimeVersion, Subscription, ANCIENT_BLOCK_THRESHOLD,
},
error::{Error, Result},
rpc::{SubstrateBeefyFinalityClient, SubstrateFinalityClient, SubstrateGrandpaFinalityClient},
sync_header::SyncHeader,
transaction_tracker::TransactionTracker,
};
pub use bp_runtime::{
AccountIdOf, AccountPublicOf, BalanceOf, BlockNumberOf, Chain as ChainBase, HashOf, HeaderIdOf,
HeaderOf, NonceOf, Parachain as ParachainBase, SignatureOf, TransactionEra, TransactionEraOf,
UnderlyingChainProvider,
};
/// Substrate-over-websocket connection params.
#[derive(Debug, Clone)]
pub struct ConnectionParams {
/// Websocket server host name.
pub host: String,
/// Websocket server TCP port.
pub port: u16,
/// Use secure websocket connection.
pub secure: bool,
/// Defined chain runtime version
pub chain_runtime_version: ChainRuntimeVersion,
}
impl Default for ConnectionParams {
fn default() -> Self {
ConnectionParams {
host: "localhost".into(),
port: 9944,
secure: false,
chain_runtime_version: ChainRuntimeVersion::Auto,
}
}
}
/// Returns stall timeout for relay loop.
///
/// Relay considers himself stalled if he has submitted transaction to the node, but it has not
/// been mined for this period.
pub fn transaction_stall_timeout(
mortality_period: Option<u32>,
average_block_interval: Duration,
default_stall_timeout: Duration,
) -> Duration {
// 1 extra block for transaction to reach the pool && 1 for relayer to awake after it is mined
mortality_period
.map(|mortality_period| average_block_interval.saturating_mul(mortality_period + 1 + 1))
.unwrap_or(default_stall_timeout)
}
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::{chain::Chain, client::Client, Error as SubstrateError};
use async_std::sync::{Arc, RwLock};
use async_trait::async_trait;
use codec::Decode;
use num_traits::One;
use relay_utils::metrics::{
metric_name, register, F64SharedRef, Gauge, Metric, PrometheusError, Registry,
StandaloneMetric, F64,
};
use sp_core::storage::{StorageData, StorageKey};
use sp_runtime::{traits::UniqueSaturatedInto, FixedPointNumber, FixedU128};
use std::{marker::PhantomData, time::Duration};
/// Storage value update interval (in blocks).
const UPDATE_INTERVAL_IN_BLOCKS: u32 = 5;
/// Fied-point storage value and the way it is decoded from the raw storage value.
pub trait FloatStorageValue: 'static + Clone + Send + Sync {
/// Type of the value.
type Value: FixedPointNumber;
/// Try to decode value from the raw storage value.
fn decode(
&self,
maybe_raw_value: Option<StorageData>,
) -> Result<Option<Self::Value>, SubstrateError>;
}
/// Implementation of `FloatStorageValue` that expects encoded `FixedU128` value and returns `1` if
/// value is missing from the storage.
#[derive(Clone, Debug, Default)]
pub struct FixedU128OrOne;
impl FloatStorageValue for FixedU128OrOne {
type Value = FixedU128;
fn decode(
&self,
maybe_raw_value: Option<StorageData>,
) -> Result<Option<Self::Value>, SubstrateError> {
maybe_raw_value
.map(|raw_value| {
FixedU128::decode(&mut &raw_value.0[..])
.map_err(SubstrateError::ResponseParseFailed)
.map(Some)
})
.unwrap_or_else(|| Ok(Some(FixedU128::one())))
}
}
/// Metric that represents fixed-point runtime storage value as float gauge.
#[derive(Clone, Debug)]
pub struct FloatStorageValueMetric<C: Chain, V: FloatStorageValue> {
value_converter: V,
client: Client<C>,
storage_key: StorageKey,
metric: Gauge<F64>,
shared_value_ref: F64SharedRef,
_phantom: PhantomData<V>,
}
impl<C: Chain, V: FloatStorageValue> FloatStorageValueMetric<C, V> {
/// Create new metric.
pub fn new(
value_converter: V,
client: Client<C>,
storage_key: StorageKey,
name: String,
help: String,
) -> Result<Self, PrometheusError> {
let shared_value_ref = Arc::new(RwLock::new(None));
Ok(FloatStorageValueMetric {
value_converter,
client,
storage_key,
metric: Gauge::new(metric_name(None, &name), help)?,
shared_value_ref,
_phantom: Default::default(),
})
}
/// Get shared reference to metric value.
pub fn shared_value_ref(&self) -> F64SharedRef {
self.shared_value_ref.clone()
}
}
impl<C: Chain, V: FloatStorageValue> Metric for FloatStorageValueMetric<C, V> {
fn register(&self, registry: &Registry) -> Result<(), PrometheusError> {
register(self.metric.clone(), registry).map(drop)
}
}
#[async_trait]
impl<C: Chain, V: FloatStorageValue> StandaloneMetric for FloatStorageValueMetric<C, V> {
fn update_interval(&self) -> Duration {
C::AVERAGE_BLOCK_INTERVAL * UPDATE_INTERVAL_IN_BLOCKS
}
async fn update(&self) {
let value = self
.client
.raw_storage_value(self.storage_key.clone(), None)
.await
.and_then(|maybe_storage_value| {
self.value_converter.decode(maybe_storage_value).map(|maybe_fixed_point_value| {
maybe_fixed_point_value.map(|fixed_point_value| {
fixed_point_value.into_inner().unique_saturated_into() as f64 /
V::Value::DIV.unique_saturated_into() as f64
})
})
})
.map_err(|e| e.to_string());
relay_utils::metrics::set_gauge_value(&self.metric, value.clone());
*self.shared_value_ref.write().await = value.ok().and_then(|x| x);
}
}
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Contains several Substrate-specific metrics that may be exposed by relay.
pub use float_storage_value::{FixedU128OrOne, FloatStorageValue, FloatStorageValueMetric};
mod float_storage_value;
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! The most generic Substrate node RPC interface.
use async_trait::async_trait;
use crate::{Chain, ChainWithGrandpa, TransactionStatusOf};
use jsonrpsee::{
core::{client::Subscription, RpcResult},
proc_macros::rpc,
ws_client::WsClient,
};
use pallet_transaction_payment_rpc_runtime_api::FeeDetails;
use sc_rpc_api::{state::ReadProof, system::Health};
use sp_core::{
storage::{StorageData, StorageKey},
Bytes,
};
use sp_rpc::number::NumberOrHex;
use sp_version::RuntimeVersion;
/// RPC methods of Substrate `system` namespace, that we are using.
#[rpc(client, client_bounds(C: Chain), namespace = "system")]
pub(crate) trait SubstrateSystem<C> {
/// Return node health.
#[method(name = "health")]
async fn health(&self) -> RpcResult<Health>;
/// Return system properties.
#[method(name = "properties")]
async fn properties(&self) -> RpcResult<sc_chain_spec::Properties>;
}
/// RPC methods of Substrate `chain` namespace, that we are using.
#[rpc(client, client_bounds(C: Chain), namespace = "chain")]
pub(crate) trait SubstrateChain<C> {
/// Get block hash by its number.
#[method(name = "getBlockHash")]
async fn block_hash(&self, block_number: Option<C::BlockNumber>) -> RpcResult<C::Hash>;
/// Return block header by its hash.
#[method(name = "getHeader")]
async fn header(&self, block_hash: Option<C::Hash>) -> RpcResult<C::Header>;
/// Return best finalized block hash.
#[method(name = "getFinalizedHead")]
async fn finalized_head(&self) -> RpcResult<C::Hash>;
/// Return signed block (with justifications) by its hash.
#[method(name = "getBlock")]
async fn block(&self, block_hash: Option<C::Hash>) -> RpcResult<C::SignedBlock>;
}
/// RPC methods of Substrate `author` namespace, that we are using.
#[rpc(client, client_bounds(C: Chain), namespace = "author")]
pub(crate) trait SubstrateAuthor<C> {
/// Submit extrinsic to the transaction pool.
#[method(name = "submitExtrinsic")]
async fn submit_extrinsic(&self, extrinsic: Bytes) -> RpcResult<C::Hash>;
/// Return vector of pending extrinsics from the transaction pool.
#[method(name = "pendingExtrinsics")]
async fn pending_extrinsics(&self) -> RpcResult<Vec<Bytes>>;
/// Submit and watch for extrinsic state.
#[subscription(name = "submitAndWatchExtrinsic", unsubscribe = "unwatchExtrinsic", item = TransactionStatusOf<C>)]
async fn submit_and_watch_extrinsic(&self, extrinsic: Bytes);
}
/// RPC methods of Substrate `state` namespace, that we are using.
#[rpc(client, client_bounds(C: Chain), namespace = "state")]
pub(crate) trait SubstrateState<C> {
/// Get current runtime version.
#[method(name = "getRuntimeVersion")]
async fn runtime_version(&self) -> RpcResult<RuntimeVersion>;
/// Call given runtime method.
#[method(name = "call")]
async fn call(
&self,
method: String,
data: Bytes,
at_block: Option<C::Hash>,
) -> RpcResult<Bytes>;
/// Get value of the runtime storage.
#[method(name = "getStorage")]
async fn storage(
&self,
key: StorageKey,
at_block: Option<C::Hash>,
) -> RpcResult<Option<StorageData>>;
/// Get proof of the runtime storage value.
#[method(name = "getReadProof")]
async fn prove_storage(
&self,
keys: Vec<StorageKey>,
hash: Option<C::Hash>,
) -> RpcResult<ReadProof<C::Hash>>;
}
/// RPC methods that we are using for a certain finality gadget.
#[async_trait]
pub trait SubstrateFinalityClient<C: Chain> {
/// Subscribe to finality justifications.
async fn subscribe_justifications(client: &WsClient) -> RpcResult<Subscription<Bytes>>;
}
/// RPC methods of Substrate `grandpa` namespace, that we are using.
#[rpc(client, client_bounds(C: ChainWithGrandpa), namespace = "grandpa")]
pub(crate) trait SubstrateGrandpa<C> {
/// Subscribe to GRANDPA justifications.
#[subscription(name = "subscribeJustifications", unsubscribe = "unsubscribeJustifications", item = Bytes)]
async fn subscribe_justifications(&self);
}
/// RPC finality methods of Substrate `grandpa` namespace, that we are using.
pub struct SubstrateGrandpaFinalityClient;
#[async_trait]
impl<C: ChainWithGrandpa> SubstrateFinalityClient<C> for SubstrateGrandpaFinalityClient {
async fn subscribe_justifications(client: &WsClient) -> RpcResult<Subscription<Bytes>> {
SubstrateGrandpaClient::<C>::subscribe_justifications(client).await
}
}
// TODO: Use `ChainWithBeefy` instead of `Chain` after #1606 is merged
/// RPC methods of Substrate `beefy` namespace, that we are using.
#[rpc(client, client_bounds(C: Chain), namespace = "beefy")]
pub(crate) trait SubstrateBeefy<C> {
/// Subscribe to BEEFY justifications.
#[subscription(name = "subscribeJustifications", unsubscribe = "unsubscribeJustifications", item = Bytes)]
async fn subscribe_justifications(&self);
}
/// RPC finality methods of Substrate `beefy` namespace, that we are using.
pub struct SubstrateBeefyFinalityClient;
// TODO: Use `ChainWithBeefy` instead of `Chain` after #1606 is merged
#[async_trait]
impl<C: Chain> SubstrateFinalityClient<C> for SubstrateBeefyFinalityClient {
async fn subscribe_justifications(client: &WsClient) -> RpcResult<Subscription<Bytes>> {
SubstrateBeefyClient::<C>::subscribe_justifications(client).await
}
}
/// RPC methods of Substrate `system` frame pallet, that we are using.
#[rpc(client, client_bounds(C: Chain), namespace = "system")]
pub(crate) trait SubstrateFrameSystem<C> {
/// Return index of next account transaction.
#[method(name = "accountNextIndex")]
async fn account_next_index(&self, account_id: C::AccountId) -> RpcResult<C::Nonce>;
}
/// RPC methods of Substrate `pallet_transaction_payment` frame pallet, that we are using.
#[rpc(client, client_bounds(C: Chain), namespace = "payment")]
pub(crate) trait SubstrateTransactionPayment<C> {
/// Query transaction fee details.
#[method(name = "queryFeeDetails")]
async fn fee_details(
&self,
extrinsic: Bytes,
at_block: Option<C::Hash>,
) -> RpcResult<FeeDetails<NumberOrHex>>;
}
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use bp_header_chain::ConsensusLogReader;
use finality_relay::SourceHeader as FinalitySourceHeader;
use sp_runtime::traits::Header as HeaderT;
/// Generic wrapper for `sp_runtime::traits::Header` based headers, that
/// implements `finality_relay::SourceHeader` and may be used in headers sync directly.
#[derive(Clone, Debug, PartialEq)]
pub struct SyncHeader<Header>(Header);
impl<Header> SyncHeader<Header> {
/// Extracts wrapped header from self.
pub fn into_inner(self) -> Header {
self.0
}
}
impl<Header> std::ops::Deref for SyncHeader<Header> {
type Target = Header;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<Header> From<Header> for SyncHeader<Header> {
fn from(header: Header) -> Self {
Self(header)
}
}
impl<Header: HeaderT, R: ConsensusLogReader> FinalitySourceHeader<Header::Hash, Header::Number, R>
for SyncHeader<Header>
{
fn hash(&self) -> Header::Hash {
self.0.hash()
}
fn number(&self) -> Header::Number {
*self.0.number()
}
fn is_mandatory(&self) -> bool {
R::schedules_authorities_change(self.digest())
}
}
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Pallet provides a set of guard functions that are running in background threads
//! and are aborting process if some condition fails.
//! Test chain implementation to use in tests.
#![cfg(any(feature = "test-helpers", test))]
use crate::{Chain, ChainWithBalances};
use bp_runtime::ChainId;
use frame_support::weights::Weight;
use std::time::Duration;
/// Chain that may be used in tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TestChain;
impl bp_runtime::Chain for TestChain {
const ID: ChainId = *b"test";
type BlockNumber = u32;
type Hash = sp_core::H256;
type Hasher = sp_runtime::traits::BlakeTwo256;
type Header = sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>;
type AccountId = u32;
type Balance = u32;
type Nonce = u32;
type Signature = sp_runtime::testing::TestSignature;
fn max_extrinsic_size() -> u32 {
unreachable!()
}
fn max_extrinsic_weight() -> Weight {
unreachable!()
}
}
impl Chain for TestChain {
const NAME: &'static str = "Test";
const BEST_FINALIZED_HEADER_ID_METHOD: &'static str = "TestMethod";
const FREE_HEADERS_INTERVAL_METHOD: &'static str = "TestMethod";
const AVERAGE_BLOCK_INTERVAL: Duration = Duration::from_millis(0);
type SignedBlock = sp_runtime::generic::SignedBlock<
sp_runtime::generic::Block<Self::Header, sp_runtime::OpaqueExtrinsic>,
>;
type Call = ();
}
impl ChainWithBalances for TestChain {
fn account_info_storage_key(_account_id: &u32) -> sp_core::storage::StorageKey {
unreachable!()
}
}
/// Primitives-level parachain that may be used in tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TestParachainBase;
impl bp_runtime::Chain for TestParachainBase {
const ID: ChainId = *b"tstp";
type BlockNumber = u32;
type Hash = sp_core::H256;
type Hasher = sp_runtime::traits::BlakeTwo256;
type Header = sp_runtime::generic::Header<u32, sp_runtime::traits::BlakeTwo256>;
type AccountId = u32;
type Balance = u32;
type Nonce = u32;
type Signature = sp_runtime::testing::TestSignature;
fn max_extrinsic_size() -> u32 {
unreachable!()
}
fn max_extrinsic_weight() -> Weight {
unreachable!()
}
}
impl bp_runtime::Parachain for TestParachainBase {
const PARACHAIN_ID: u32 = 1000;
const MAX_HEADER_SIZE: u32 = 1_024;
}
/// Parachain that may be used in tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TestParachain;
impl bp_runtime::UnderlyingChainProvider for TestParachain {
type Chain = TestParachainBase;
}
impl Chain for TestParachain {
const NAME: &'static str = "TestParachain";
const BEST_FINALIZED_HEADER_ID_METHOD: &'static str = "TestParachainMethod";
const FREE_HEADERS_INTERVAL_METHOD: &'static str = "TestParachainMethod";
const AVERAGE_BLOCK_INTERVAL: Duration = Duration::from_millis(0);
type SignedBlock = sp_runtime::generic::SignedBlock<
sp_runtime::generic::Block<Self::Header, sp_runtime::OpaqueExtrinsic>,
>;
type Call = ();
}
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Helper for tracking transaction invalidation events.
use crate::{Chain, Client, Error, HashOf, HeaderIdOf, Subscription, TransactionStatusOf};
use async_trait::async_trait;
use futures::{future::Either, Future, FutureExt, Stream, StreamExt};
use relay_utils::{HeaderId, TrackedTransactionStatus};
use sp_runtime::traits::Header as _;
use std::time::Duration;
/// Transaction tracker environment.
#[async_trait]
pub trait Environment<C: Chain>: Send + Sync {
/// Returns header id by its hash.
async fn header_id_by_hash(&self, hash: HashOf<C>) -> Result<HeaderIdOf<C>, Error>;
}
#[async_trait]
impl<C: Chain> Environment<C> for Client<C> {
async fn header_id_by_hash(&self, hash: HashOf<C>) -> Result<HeaderIdOf<C>, Error> {
self.header_by_hash(hash).await.map(|h| HeaderId(*h.number(), hash))
}
}
/// Substrate transaction tracker implementation.
///
/// Substrate node provides RPC API to submit and watch for transaction events. This way
/// we may know when transaction is included into block, finalized or rejected. There are
/// some edge cases, when we can't fully trust this mechanism - e.g. transaction may broadcasted
/// and then dropped out of node transaction pool (some other cases are also possible - node
/// restarts, connection lost, ...). Then we can't know for sure - what is currently happening
/// with our transaction. Is the transaction really lost? Is it still alive on the chain network?
///
/// We have several options to handle such cases:
///
/// 1) hope that the transaction is still alive and wait for its mining until it is spoiled;
///
/// 2) assume that the transaction is lost and resubmit another transaction instantly;
///
/// 3) wait for some time (if transaction is mortal - then until block where it dies; if it is
/// immortal - then for some time that we assume is long enough to mine it) and assume that it is
/// lost.
///
/// This struct implements third option as it seems to be the most optimal.
pub struct TransactionTracker<C: Chain, E> {
environment: E,
transaction_hash: HashOf<C>,
stall_timeout: Duration,
subscription: Subscription<TransactionStatusOf<C>>,
}
impl<C: Chain, E: Environment<C>> TransactionTracker<C, E> {
/// Create transaction tracker.
pub fn new(
environment: E,
stall_timeout: Duration,
transaction_hash: HashOf<C>,
subscription: Subscription<TransactionStatusOf<C>>,
) -> Self {
Self { environment, stall_timeout, transaction_hash, subscription }
}
/// Wait for final transaction status and return it along with last known internal invalidation
/// status.
async fn do_wait(
self,
wait_for_stall_timeout: impl Future<Output = ()>,
wait_for_stall_timeout_rest: impl Future<Output = ()>,
) -> (TrackedTransactionStatus<HeaderIdOf<C>>, Option<InvalidationStatus<HeaderIdOf<C>>>) {
// sometimes we want to wait for the rest of the stall timeout even if
// `wait_for_invalidation` has been "select"ed first => it is shared
let wait_for_invalidation = watch_transaction_status::<_, C, _>(
self.environment,
self.transaction_hash,
self.subscription.into_stream(),
);
futures::pin_mut!(wait_for_stall_timeout, wait_for_invalidation);
match futures::future::select(wait_for_stall_timeout, wait_for_invalidation).await {
Either::Left((_, _)) => {
log::trace!(
target: "bridge",
"{} transaction {:?} is considered lost after timeout (no status response from the node)",
C::NAME,
self.transaction_hash,
);
(TrackedTransactionStatus::Lost, None)
},
Either::Right((invalidation_status, _)) => match invalidation_status {
InvalidationStatus::Finalized(at_block) =>
(TrackedTransactionStatus::Finalized(at_block), Some(invalidation_status)),
InvalidationStatus::Invalid =>
(TrackedTransactionStatus::Lost, Some(invalidation_status)),
InvalidationStatus::Lost => {
// wait for the rest of stall timeout - this way we'll be sure that the
// transaction is actually dead if it has been crafted properly
wait_for_stall_timeout_rest.await;
// if someone is still watching for our transaction, then we're reporting
// an error here (which is treated as "transaction lost")
log::trace!(
target: "bridge",
"{} transaction {:?} is considered lost after timeout",
C::NAME,
self.transaction_hash,
);
(TrackedTransactionStatus::Lost, Some(invalidation_status))
},
},
}
}
}
#[async_trait]
impl<C: Chain, E: Environment<C>> relay_utils::TransactionTracker for TransactionTracker<C, E> {
type HeaderId = HeaderIdOf<C>;
async fn wait(self) -> TrackedTransactionStatus<HeaderIdOf<C>> {
let wait_for_stall_timeout = async_std::task::sleep(self.stall_timeout).shared();
let wait_for_stall_timeout_rest = wait_for_stall_timeout.clone();
self.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest).await.0
}
}
/// Transaction invalidation status.
///
/// Note that in places where the `TransactionTracker` is used, the finalization event will be
/// ignored - relay loops are detecting the mining/finalization using their own
/// techniques. That's why we're using `InvalidationStatus` here.
#[derive(Debug, PartialEq)]
enum InvalidationStatus<BlockId> {
/// Transaction has been included into block and finalized at given block.
Finalized(BlockId),
/// Transaction has been invalidated.
Invalid,
/// We have lost track of transaction status.
Lost,
}
/// Watch for transaction status until transaction is finalized or we lose track of its status.
async fn watch_transaction_status<
E: Environment<C>,
C: Chain,
S: Stream<Item = TransactionStatusOf<C>>,
>(
environment: E,
transaction_hash: HashOf<C>,
subscription: S,
) -> InvalidationStatus<HeaderIdOf<C>> {
futures::pin_mut!(subscription);
loop {
match subscription.next().await {
Some(TransactionStatusOf::<C>::Finalized((block_hash, _))) => {
// the only "successful" outcome of this method is when the block with transaction
// has been finalized
log::trace!(
target: "bridge",
"{} transaction {:?} has been finalized at block: {:?}",
C::NAME,
transaction_hash,
block_hash,
);
let header_id = match environment.header_id_by_hash(block_hash).await {
Ok(header_id) => header_id,
Err(e) => {
log::error!(
target: "bridge",
"Failed to read header {:?} when watching for {} transaction {:?}: {:?}",
block_hash,
C::NAME,
transaction_hash,
e,
);
// that's the best option we have here
return InvalidationStatus::Lost
},
};
return InvalidationStatus::Finalized(header_id)
},
Some(TransactionStatusOf::<C>::Invalid) => {
// if node says that the transaction is invalid, there are still chances that
// it is not actually invalid - e.g. if the block where transaction has been
// revalidated is retracted and transaction (at some other node pool) becomes
// valid again on other fork. But let's assume that the chances of this event
// are almost zero - there's a lot of things that must happen for this to be the
// case.
log::trace!(
target: "bridge",
"{} transaction {:?} has been invalidated",
C::NAME,
transaction_hash,
);
return InvalidationStatus::Invalid
},
Some(TransactionStatusOf::<C>::Future) |
Some(TransactionStatusOf::<C>::Ready) |
Some(TransactionStatusOf::<C>::Broadcast(_)) => {
// nothing important (for us) has happened
},
Some(TransactionStatusOf::<C>::InBlock(block_hash)) => {
// TODO: read matching system event (ExtrinsicSuccess or ExtrinsicFailed), log it
// here and use it later (on finality) for reporting invalid transaction
// https://github.com/paritytech/parity-bridges-common/issues/1464
log::trace!(
target: "bridge",
"{} transaction {:?} has been included in block: {:?}",
C::NAME,
transaction_hash,
block_hash,
);
},
Some(TransactionStatusOf::<C>::Retracted(block_hash)) => {
log::trace!(
target: "bridge",
"{} transaction {:?} at block {:?} has been retracted",
C::NAME,
transaction_hash,
block_hash,
);
},
Some(TransactionStatusOf::<C>::FinalityTimeout(block_hash)) => {
// finality is lagging? let's wait a bit more and report a stall
log::trace!(
target: "bridge",
"{} transaction {:?} block {:?} has not been finalized for too long",
C::NAME,
transaction_hash,
block_hash,
);
return InvalidationStatus::Lost
},
Some(TransactionStatusOf::<C>::Usurped(new_transaction_hash)) => {
// this may be result of our transaction resubmitter work or some manual
// intervention. In both cases - let's start stall timeout, because the meaning
// of transaction may have changed
log::trace!(
target: "bridge",
"{} transaction {:?} has been usurped by new transaction: {:?}",
C::NAME,
transaction_hash,
new_transaction_hash,
);
return InvalidationStatus::Lost
},
Some(TransactionStatusOf::<C>::Dropped) => {
// the transaction has been removed from the pool because of its limits. Let's wait
// a bit and report a stall
log::trace!(
target: "bridge",
"{} transaction {:?} has been dropped from the pool",
C::NAME,
transaction_hash,
);
return InvalidationStatus::Lost
},
None => {
// the status of transaction is unknown to us (the subscription has been closed?).
// Let's wait a bit and report a stall
return InvalidationStatus::Lost
},
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_chain::TestChain;
use futures::{FutureExt, SinkExt};
use sc_transaction_pool_api::TransactionStatus;
struct TestEnvironment(Result<HeaderIdOf<TestChain>, Error>);
#[async_trait]
impl Environment<TestChain> for TestEnvironment {
async fn header_id_by_hash(
&self,
_hash: HashOf<TestChain>,
) -> Result<HeaderIdOf<TestChain>, Error> {
self.0.as_ref().map_err(|_| Error::BridgePalletIsNotInitialized).cloned()
}
}
async fn on_transaction_status(
status: TransactionStatus<HashOf<TestChain>, HashOf<TestChain>>,
) -> Option<(
TrackedTransactionStatus<HeaderIdOf<TestChain>>,
InvalidationStatus<HeaderIdOf<TestChain>>,
)> {
let (mut sender, receiver) = futures::channel::mpsc::channel(1);
let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Duration::from_secs(0),
Default::default(),
Subscription(async_std::sync::Mutex::new(receiver)),
);
let wait_for_stall_timeout = futures::future::pending();
let wait_for_stall_timeout_rest = futures::future::ready(());
sender.send(Some(status)).await.unwrap();
tx_tracker
.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest)
.now_or_never()
.map(|(ts, is)| (ts, is.unwrap()))
}
#[async_std::test]
async fn returns_finalized_on_finalized() {
assert_eq!(
on_transaction_status(TransactionStatus::Finalized(Default::default())).await,
Some((
TrackedTransactionStatus::Finalized(Default::default()),
InvalidationStatus::Finalized(Default::default())
)),
);
}
#[async_std::test]
async fn returns_lost_on_finalized_and_environment_error() {
assert_eq!(
watch_transaction_status::<_, TestChain, _>(
TestEnvironment(Err(Error::BridgePalletIsNotInitialized)),
Default::default(),
futures::stream::iter([TransactionStatus::Finalized(Default::default())])
)
.now_or_never(),
Some(InvalidationStatus::Lost),
);
}
#[async_std::test]
async fn returns_invalid_on_invalid() {
assert_eq!(
on_transaction_status(TransactionStatus::Invalid).await,
Some((TrackedTransactionStatus::Lost, InvalidationStatus::Invalid)),
);
}
#[async_std::test]
async fn waits_on_future() {
assert_eq!(on_transaction_status(TransactionStatus::Future).await, None,);
}
#[async_std::test]
async fn waits_on_ready() {
assert_eq!(on_transaction_status(TransactionStatus::Ready).await, None,);
}
#[async_std::test]
async fn waits_on_broadcast() {
assert_eq!(
on_transaction_status(TransactionStatus::Broadcast(Default::default())).await,
None,
);
}
#[async_std::test]
async fn waits_on_in_block() {
assert_eq!(
on_transaction_status(TransactionStatus::InBlock(Default::default())).await,
None,
);
}
#[async_std::test]
async fn waits_on_retracted() {
assert_eq!(
on_transaction_status(TransactionStatus::Retracted(Default::default())).await,
None,
);
}
#[async_std::test]
async fn lost_on_finality_timeout() {
assert_eq!(
on_transaction_status(TransactionStatus::FinalityTimeout(Default::default())).await,
Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)),
);
}
#[async_std::test]
async fn lost_on_usurped() {
assert_eq!(
on_transaction_status(TransactionStatus::Usurped(Default::default())).await,
Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)),
);
}
#[async_std::test]
async fn lost_on_dropped() {
assert_eq!(
on_transaction_status(TransactionStatus::Dropped).await,
Some((TrackedTransactionStatus::Lost, InvalidationStatus::Lost)),
);
}
#[async_std::test]
async fn lost_on_subscription_error() {
assert_eq!(
watch_transaction_status::<_, TestChain, _>(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Default::default(),
futures::stream::iter([])
)
.now_or_never(),
Some(InvalidationStatus::Lost),
);
}
#[async_std::test]
async fn lost_on_timeout_when_waiting_for_invalidation_status() {
let (_sender, receiver) = futures::channel::mpsc::channel(1);
let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Duration::from_secs(0),
Default::default(),
Subscription(async_std::sync::Mutex::new(receiver)),
);
let wait_for_stall_timeout = futures::future::ready(()).shared();
let wait_for_stall_timeout_rest = wait_for_stall_timeout.clone();
let wait_result = tx_tracker
.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest)
.now_or_never();
assert_eq!(wait_result, Some((TrackedTransactionStatus::Lost, None)));
}
}
[package]
name = "equivocation-detector"
version = "0.1.0"
authors = ["Parity Technologies <[email protected]>"]
edition = "2021"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
description = "Equivocation detector"
[lints]
workspace = true
[dependencies]
async-std = { version = "1.6.5", features = ["attributes"] }
async-trait = "0.1"
bp-header-chain = { path = "../../primitives/header-chain" }
finality-relay = { path = "../finality" }
frame-support = { git = "https://github.com/paritytech/polkadot-sdk", branch = "master" }
futures = "0.3.30"
log = { workspace = true }
num-traits = "0.2"
relay-utils = { path = "../utils" }