Skip to content
Snippets Groups Projects
Commit 940d7e46 authored by Svyatoslav Nikolsky's avatar Svyatoslav Nikolsky Committed by Bastian Köcher
Browse files

remove abandoned exchange relay (#1217)

parent 2a6b065a
Branches
No related merge requests found
[package]
name = "exchange-relay"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
anyhow = "1.0"
async-std = "1.6.5"
async-trait = "0.1.40"
backoff = "0.2"
futures = "0.3.5"
log = "0.4.11"
num-traits = "0.2"
parking_lot = "0.11.0"
relay-utils = { path = "../utils" }
thiserror = "1.0.26"
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Exchange-relay errors.
use crate::exchange::{BlockHashOf, BlockNumberOf, TransactionHashOf};
use relay_utils::MaybeConnectionError;
use std::fmt::{Debug, Display};
use thiserror::Error;
/// Error type given pipeline.
pub type ErrorOf<P> = Error<BlockHashOf<P>, BlockNumberOf<P>, TransactionHashOf<P>>;
/// Exchange-relay error type.
#[derive(Error, Debug)]
pub enum Error<Hash: Display, HeaderNumber: Display, SourceTxHash: Display> {
/// Failed to check finality of the requested header on the target node.
#[error("Failed to check finality of header {0}/{1} on {2} node: {3:?}")]
Finality(HeaderNumber, Hash, &'static str, anyhow::Error),
/// Error retrieving block from the source node.
#[error("Error retrieving block {0} from {1} node: {2:?}")]
RetrievingBlock(Hash, &'static str, anyhow::Error),
/// Error retrieving transaction from the source node.
#[error("Error retrieving transaction {0} from {1} node: {2:?}")]
RetrievingTransaction(SourceTxHash, &'static str, anyhow::Error),
/// Failed to check existence of header from the target node.
#[error("Failed to check existence of header {0}/{1} on {2} node: {3:?}")]
CheckHeaderExistence(HeaderNumber, Hash, &'static str, anyhow::Error),
/// Failed to prepare proof for the transaction from the source node.
#[error("Error building transaction {0} proof on {1} node: {2:?}")]
BuildTransactionProof(String, &'static str, anyhow::Error, bool),
/// Failed to submit the transaction proof to the target node.
#[error("Error submitting transaction {0} proof to {1} node: {2:?}")]
SubmitTransactionProof(String, &'static str, anyhow::Error, bool),
/// Transaction filtering failed.
#[error("Transaction filtering has failed with {0:?}")]
TransactionFiltering(anyhow::Error, bool),
/// Utilities/metrics error.
#[error("{0}")]
Utils(#[from] relay_utils::Error),
}
impl<T: Display, U: Display, V: Display> MaybeConnectionError for Error<T, U, V> {
fn is_connection_error(&self) -> bool {
match *self {
Self::BuildTransactionProof(_, _, _, b) => b,
Self::SubmitTransactionProof(_, _, _, b) => b,
Self::TransactionFiltering(_, b) => b,
_ => false,
}
}
}
This diff is collapsed.
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Relaying proofs of exchange transactions.
use crate::{
error::Error,
exchange::{
relay_block_transactions, BlockNumberOf, RelayedBlockTransactions, SourceClient,
TargetClient, TransactionProofPipeline,
},
exchange_loop_metrics::ExchangeLoopMetrics,
};
use crate::error::ErrorOf;
use backoff::backoff::Backoff;
use futures::{future::FutureExt, select};
use num_traits::One;
use relay_utils::{
metrics::{GlobalMetrics, MetricsParams},
retry_backoff, FailedClient, MaybeConnectionError,
};
use std::future::Future;
/// Transactions proofs relay state.
#[derive(Debug)]
pub struct TransactionProofsRelayState<BlockNumber> {
/// Number of last header we have processed so far.
pub best_processed_header_number: BlockNumber,
}
/// Transactions proofs relay storage.
pub trait TransactionProofsRelayStorage: 'static + Clone + Send + Sync {
/// Associated block number.
type BlockNumber: 'static + Send + Sync;
/// Get relay state.
fn state(&self) -> TransactionProofsRelayState<Self::BlockNumber>;
/// Update relay state.
fn set_state(&mut self, state: &TransactionProofsRelayState<Self::BlockNumber>);
}
/// In-memory storage for auto-relay loop.
#[derive(Debug, Clone)]
pub struct InMemoryStorage<BlockNumber> {
best_processed_header_number: BlockNumber,
}
impl<BlockNumber> InMemoryStorage<BlockNumber> {
/// Created new in-memory storage with given best processed block number.
pub fn new(best_processed_header_number: BlockNumber) -> Self {
InMemoryStorage { best_processed_header_number }
}
}
impl<BlockNumber: 'static + Clone + Copy + Send + Sync> TransactionProofsRelayStorage
for InMemoryStorage<BlockNumber>
{
type BlockNumber = BlockNumber;
fn state(&self) -> TransactionProofsRelayState<BlockNumber> {
TransactionProofsRelayState {
best_processed_header_number: self.best_processed_header_number,
}
}
fn set_state(&mut self, state: &TransactionProofsRelayState<BlockNumber>) {
self.best_processed_header_number = state.best_processed_header_number;
}
}
/// Return prefix that will be used by default to expose Prometheus metrics of the exchange loop.
pub fn metrics_prefix<P: TransactionProofPipeline>() -> String {
format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME)
}
/// Run proofs synchronization.
pub async fn run<P: TransactionProofPipeline>(
storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), ErrorOf<P>> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(Some(metrics_prefix::<P>()), metrics_params)
.loop_metric(ExchangeLoopMetrics::new)?
.standalone_metric(GlobalMetrics::new)?
.expose()
.await?
.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
run_until_connection_lost(
storage.clone(),
source_client,
target_client,
metrics,
exit_signal.clone(),
)
})
.await
.map_err(Error::Utils)
}
/// Run proofs synchronization.
async fn run_until_connection_lost<P: TransactionProofPipeline>(
mut storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_exch: Option<ExchangeLoopMetrics>,
exit_signal: impl Future<Output = ()> + Send,
) -> Result<(), FailedClient> {
let mut retry_backoff = retry_backoff();
let mut state = storage.state();
let mut current_finalized_block = None;
let exit_signal = exit_signal.fuse();
futures::pin_mut!(exit_signal);
loop {
let iteration_result = run_loop_iteration(
&mut storage,
&source_client,
&target_client,
&mut state,
&mut current_finalized_block,
metrics_exch.as_ref(),
)
.await;
if let Err((is_connection_error, failed_client)) = iteration_result {
if is_connection_error {
return Err(failed_client)
}
let retry_timeout =
retry_backoff.next_backoff().unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY);
select! {
_ = async_std::task::sleep(retry_timeout).fuse() => {},
_ = exit_signal => return Ok(()),
}
} else {
retry_backoff.reset();
select! {
_ = source_client.tick().fuse() => {},
_ = exit_signal => return Ok(()),
}
}
}
}
/// Run exchange loop until we need to break.
async fn run_loop_iteration<P: TransactionProofPipeline>(
storage: &mut impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
source_client: &impl SourceClient<P>,
target_client: &impl TargetClient<P>,
state: &mut TransactionProofsRelayState<BlockNumberOf<P>>,
current_finalized_block: &mut Option<(P::Block, RelayedBlockTransactions)>,
exchange_loop_metrics: Option<&ExchangeLoopMetrics>,
) -> Result<(), (bool, FailedClient)> {
let best_finalized_header_id = match target_client.best_finalized_header_id().await {
Ok(best_finalized_header_id) => {
log::debug!(
target: "bridge",
"Got best finalized {} block from {} node: {:?}",
P::SOURCE_NAME,
P::TARGET_NAME,
best_finalized_header_id,
);
best_finalized_header_id
},
Err(err) => {
log::error!(
target: "bridge",
"Failed to retrieve best {} header id from {} node: {:?}. Going to retry...",
P::SOURCE_NAME,
P::TARGET_NAME,
err,
);
return Err((err.is_connection_error(), FailedClient::Target))
},
};
loop {
// if we already have some finalized block body, try to relay its transactions
if let Some((block, relayed_transactions)) = current_finalized_block.take() {
let result = relay_block_transactions(
source_client,
target_client,
&block,
relayed_transactions,
)
.await;
match result {
Ok(relayed_transactions) => {
log::info!(
target: "bridge",
"Relay has processed {} block #{}. Total/Relayed/Failed transactions: {}/{}/{}",
P::SOURCE_NAME,
state.best_processed_header_number,
relayed_transactions.processed,
relayed_transactions.relayed,
relayed_transactions.failed,
);
state.best_processed_header_number =
state.best_processed_header_number + One::one();
storage.set_state(state);
if let Some(exchange_loop_metrics) = exchange_loop_metrics {
exchange_loop_metrics.update::<P>(
state.best_processed_header_number,
best_finalized_header_id.0,
relayed_transactions,
);
}
// we have just updated state => proceed to next block retrieval
},
Err((failed_client, relayed_transactions)) => {
*current_finalized_block = Some((block, relayed_transactions));
return Err((true, failed_client))
},
}
}
// we may need to retrieve finalized block body from source node
if best_finalized_header_id.0 > state.best_processed_header_number {
let next_block_number = state.best_processed_header_number + One::one();
let result = source_client.block_by_number(next_block_number).await;
match result {
Ok(block) => {
*current_finalized_block = Some((block, RelayedBlockTransactions::default()));
// we have received new finalized block => go back to relay its transactions
continue
},
Err(err) => {
log::error!(
target: "bridge",
"Failed to retrieve canonical block #{} from {} node: {:?}. Going to retry...",
next_block_number,
P::SOURCE_NAME,
err,
);
return Err((err.is_connection_error(), FailedClient::Source))
},
}
}
// there are no any transactions we need to relay => wait for new data
return Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::exchange::tests::{
test_next_block, test_next_block_id, test_transaction_hash, TestTransactionProof,
TestTransactionsSource, TestTransactionsTarget,
};
use futures::{future::FutureExt, stream::StreamExt};
#[test]
fn exchange_loop_is_able_to_relay_proofs() {
let storage = InMemoryStorage { best_processed_header_number: 0 };
let target =
TestTransactionsTarget::new(Box::new(|_| unreachable!("no target ticks allowed")));
let target_data = target.data.clone();
let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded();
let source = TestTransactionsSource::new(Box::new(move |data| {
let transaction1_relayed = target_data
.lock()
.submitted_proofs
.contains(&TestTransactionProof(test_transaction_hash(0)));
let transaction2_relayed = target_data
.lock()
.submitted_proofs
.contains(&TestTransactionProof(test_transaction_hash(1)));
match (transaction1_relayed, transaction2_relayed) {
(true, true) => exit_sender.unbounded_send(()).unwrap(),
(true, false) => {
data.block = Ok(test_next_block());
target_data.lock().best_finalized_header_id = Ok(test_next_block_id());
target_data.lock().transactions_to_accept.insert(test_transaction_hash(1));
},
_ => (),
}
}));
let _ = async_std::task::block_on(run(
storage,
source,
target,
MetricsParams::disabled(),
exit_receiver.into_future().map(|(_, _)| ()),
));
}
}
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Metrics for currency-exchange relay loop.
use crate::exchange::{BlockNumberOf, RelayedBlockTransactions, TransactionProofPipeline};
use relay_utils::metrics::{
metric_name, register, Counter, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64,
};
/// Exchange transactions relay metrics.
#[derive(Clone)]
pub struct ExchangeLoopMetrics {
/// Best finalized block numbers - "processed" and "known".
best_block_numbers: GaugeVec<U64>,
/// Number of processed blocks ("total").
processed_blocks: Counter<U64>,
/// Number of processed transactions ("total", "relayed" and "failed").
processed_transactions: CounterVec<U64>,
}
impl ExchangeLoopMetrics {
/// Create and register exchange loop metrics.
pub fn new(registry: &Registry, prefix: Option<&str>) -> Result<Self, PrometheusError> {
Ok(ExchangeLoopMetrics {
best_block_numbers: register(
GaugeVec::new(
Opts::new(
metric_name(prefix, "best_block_numbers"),
"Best finalized block numbers",
),
&["type"],
)?,
registry,
)?,
processed_blocks: register(
Counter::new(
metric_name(prefix, "processed_blocks"),
"Total number of processed blocks",
)?,
registry,
)?,
processed_transactions: register(
CounterVec::new(
Opts::new(
metric_name(prefix, "processed_transactions"),
"Total number of processed transactions",
),
&["type"],
)?,
registry,
)?,
})
}
}
impl ExchangeLoopMetrics {
/// Update metrics when single block is relayed.
pub fn update<P: TransactionProofPipeline>(
&self,
best_processed_block_number: BlockNumberOf<P>,
best_known_block_number: BlockNumberOf<P>,
relayed_transactions: RelayedBlockTransactions,
) {
self.best_block_numbers
.with_label_values(&["processed"])
.set(best_processed_block_number.into());
self.best_block_numbers
.with_label_values(&["known"])
.set(best_known_block_number.into());
self.processed_blocks.inc();
self.processed_transactions
.with_label_values(&["total"])
.inc_by(relayed_transactions.processed as _);
self.processed_transactions
.with_label_values(&["relayed"])
.inc_by(relayed_transactions.relayed as _);
self.processed_transactions
.with_label_values(&["failed"])
.inc_by(relayed_transactions.failed as _);
}
}
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Relaying [`currency-exchange`](../pallet_bridge_currency_exchange/index.html) application
//! specific data. Currency exchange application allows exchanging tokens between bridged chains.
//! This module provides entrypoints for crafting and submitting (single and multiple)
//! proof-of-exchange-at-source-chain transaction(s) to target chain.
#![warn(missing_docs)]
pub mod error;
pub mod exchange;
pub mod exchange_loop;
pub mod exchange_loop_metrics;
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment