Skip to content
relay_loop.rs 7.35 KiB
Newer Older
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.

// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.

hacpy's avatar
hacpy committed
use crate::{
	metrics::{Metric, MetricsAddress, MetricsParams},
hacpy's avatar
hacpy committed
	FailedClient, MaybeConnectionError,
};

use async_trait::async_trait;
use std::{fmt::Debug, future::Future, net::SocketAddr, time::Duration};
use substrate_prometheus_endpoint::{init_prometheus, Registry};

/// Default pause between reconnect attempts.
pub const RECONNECT_DELAY: Duration = Duration::from_secs(10);

/// Basic blockchain client from relay perspective.
#[async_trait]
pub trait Client: 'static + Clone + Send + Sync {
	/// Type of error these clients returns.
	type Error: 'static + Debug + MaybeConnectionError + Send + Sync;

	/// Try to reconnect to source node.
	async fn reconnect(&mut self) -> Result<(), Self::Error>;

	/// Try to reconnect to the source node in an infinite loop until it succeeds.
	async fn reconnect_until_success(&mut self, delay: Duration) {
		loop {
			match self.reconnect().await {
				Ok(()) => break,
				Err(error) => {
					log::warn!(
						target: "bridge",
						"Failed to reconnect to client. Going to retry in {}s: {:?}",
						delay.as_secs(),
						error,
					);

					async_std::task::sleep(delay).await;
				},
			}
		}
	}
#[async_trait]
impl Client for () {
	type Error = crate::StringifiedMaybeConnectionError;

	async fn reconnect(&mut self) -> Result<(), Self::Error> {
		Ok(())
	}
}

/// Returns generic loop that may be customized and started.
pub fn relay_loop<SC, TC>(source_client: SC, target_client: TC) -> Loop<SC, TC, ()> {
hacpy's avatar
hacpy committed
	Loop { reconnect_delay: RECONNECT_DELAY, source_client, target_client, loop_metric: None }
hacpy's avatar
hacpy committed
/// Returns generic relay loop metrics that may be customized and used in one or several relay
/// loops.
pub fn relay_metrics(params: MetricsParams) -> LoopMetrics<(), (), ()> {
	LoopMetrics {
		relay_loop: Loop {
			reconnect_delay: RECONNECT_DELAY,
			source_client: (),
			target_client: (),
			loop_metric: None,
		},
		registry: params.registry,
/// Generic relay loop.
pub struct Loop<SC, TC, LM> {
	source_client: SC,
	target_client: TC,
	loop_metric: Option<LM>,
}

/// Relay loop metrics builder.
pub struct LoopMetrics<SC, TC, LM> {
	relay_loop: Loop<SC, TC, ()>,
	registry: Registry,
	loop_metric: Option<LM>,
}

impl<SC, TC, LM> Loop<SC, TC, LM> {
	/// Customize delay between reconnect attempts.
	pub fn reconnect_delay(mut self, reconnect_delay: Duration) -> Self {
		self.reconnect_delay = reconnect_delay;
		self
	}

	/// Start building loop metrics using given prefix.
	pub fn with_metrics(self, params: MetricsParams) -> LoopMetrics<SC, TC, ()> {
		LoopMetrics {
			relay_loop: Loop {
				source_client: self.source_client,
				target_client: self.target_client,
				loop_metric: None,
			},
			registry: params.registry,
			loop_metric: None,
		}
	}

	/// Run relay loop.
	///
hacpy's avatar
hacpy committed
	/// This function represents an outer loop, which in turn calls provided `run_loop` function to
	/// do actual job. When `run_loop` returns, this outer loop reconnects to failed client (source,
	/// target or both) and calls `run_loop` again.
	pub async fn run<R, F>(mut self, loop_name: String, run_loop: R) -> Result<(), Error>
		R: 'static + Send + Fn(SC, TC, Option<LM>) -> F,
		F: 'static + Send + Future<Output = Result<(), FailedClient>>,
		SC: 'static + Client,
		TC: 'static + Client,
		LM: 'static + Send + Clone,
		let run_loop_task = async move {
			crate::initialize::initialize_loop(loop_name);

			loop {
				let loop_metric = self.loop_metric.clone();
hacpy's avatar
hacpy committed
				let future_result =
					run_loop(self.source_client.clone(), self.target_client.clone(), loop_metric);
				let result = future_result.await;

				match result {
					Ok(()) => break,
					Err(failed_client) => {
						log::debug!(target: "bridge", "Restarting relay loop");

						reconnect_failed_client(
							failed_client,
							self.reconnect_delay,
							&mut self.source_client,
							&mut self.target_client,
						)
		async_std::task::spawn(run_loop_task).await
	}
}

impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
	/// Add relay loop metrics.
	///
	/// Loop metrics will be passed to the loop callback.
	pub fn loop_metric<NewLM: Metric>(
		metric: NewLM,
	) -> Result<LoopMetrics<SC, TC, NewLM>, Error> {
		metric.register(&self.registry)?;

		Ok(LoopMetrics {
			relay_loop: self.relay_loop,
			registry: self.registry,
			loop_metric: Some(metric),
	/// Convert into `MetricsParams` structure so that metrics registry may be extended later.
	pub fn into_params(self) -> MetricsParams {
		MetricsParams { address: self.address, registry: self.registry }
	}

	/// Expose metrics using address passed at creation.
	/// If passed `address` is `None`, metrics are not exposed.
	pub async fn expose(self) -> Result<Loop<SC, TC, LM>, Error> {
			let socket_addr = SocketAddr::new(
				address
					.host
					.parse()
					.map_err(|err| Error::ExposingMetricsInvalidHost(address.host.clone(), err))?,
			);

			let registry = self.registry;
			async_std::task::spawn(async move {
				let runtime =
					match tokio::runtime::Builder::new_current_thread().enable_all().build() {
						Ok(runtime) => runtime,
						Err(err) => {
							log::trace!(
								target: "bridge-metrics",
								"Failed to create tokio runtime. Prometheus metrics are not available: {:?}",
					log::trace!(
						target: "bridge-metrics",
						"Starting prometheus endpoint at: {:?}",
						socket_addr,
					);
					let result = init_prometheus(socket_addr, registry).await;
					log::trace!(
						target: "bridge-metrics",
						"Prometheus endpoint has exited with result: {:?}",
						result,
					);
				});
			reconnect_delay: self.relay_loop.reconnect_delay,
			source_client: self.relay_loop.source_client,
			target_client: self.relay_loop.target_client,
			loop_metric: self.loop_metric,
		})
/// Deal with the clients that have returned connection error.
pub async fn reconnect_failed_client(
	failed_client: FailedClient,
	reconnect_delay: Duration,
	source_client: &mut impl Client,
	target_client: &mut impl Client,
) {
	if failed_client == FailedClient::Source || failed_client == FailedClient::Both {
		source_client.reconnect_until_success(reconnect_delay).await;
	}
	if failed_client == FailedClient::Target || failed_client == FailedClient::Both {
		target_client.reconnect_until_success(reconnect_delay).await;