Skip to content
parachains_loop.rs 30.4 KiB
Newer Older
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.

// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.

use crate::{parachains_loop_metrics::ParachainsLoopMetrics, ParachainsPipeline};

use async_trait::async_trait;
use bp_parachains::BestParaHeadHash;
use bp_polkadot_core::{
	parachains::{ParaHash, ParaHeadsProof, ParaId},
	BlockNumber as RelayBlockNumber,
};
use futures::{future::FutureExt, select};
use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf};
use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient};
use std::{
	collections::{BTreeMap, BTreeSet},
	future::Future,
	time::{Duration, Instant},
};

/// Parachain heads synchronization params.
#[derive(Clone, Debug)]
pub struct ParachainSyncParams {
	/// Parachains that we're relaying here.
	pub parachains: Vec<ParaId>,
	/// Parachain heads update strategy.
	pub strategy: ParachainSyncStrategy,
	/// Stall timeout. If we have submitted transaction and we see no state updates for this
	/// period, we consider our transaction lost.
	pub stall_timeout: Duration,
}

/// Parachain heads update strategy.
#[derive(Clone, Copy, Debug)]
pub enum ParachainSyncStrategy {
	/// Update whenever any parachain head is updated.
	Any,
	/// Wait till all parachain heads are updated.
	All,
}

/// Parachain head hash, available at the source (relay) chain.
#[derive(Clone, Copy, Debug)]
pub enum ParaHashAtSource {
	/// There's no parachain head at the source chain.
	///
	/// Normally it means that the parachain is not registered there.
	None,
	/// Parachain head with given hash is available at the source chain.
	Some(ParaHash),
	/// The source client refuses to report parachain head hash at this moment.
	///
	/// It is a "mild" error, which may appear when e.g. on-demand parachains relay is used.
	/// This variant must be treated as "we don't want to update parachain head value at the
	/// target chain at this moment".
	Unavailable,
}

/// Source client used in parachain heads synchronization loop.
#[async_trait]
pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
	/// Returns `Ok(true)` if client is in synced state.
	async fn ensure_synced(&self) -> Result<bool, Self::Error>;

	/// Get parachain head hash at given block.
	///
	/// The implementation may call `ParachainsLoopMetrics::update_best_parachain_block_at_source`
	/// on provided `metrics` object to update corresponding metric value.
	async fn parachain_head(
		&self,
		at_block: HeaderIdOf<P::SourceChain>,
		metrics: Option<&ParachainsLoopMetrics>,
		para_id: ParaId,
	) -> Result<ParaHashAtSource, Self::Error>;

	/// Get parachain heads proof.
	async fn prove_parachain_heads(
		&self,
		at_block: HeaderIdOf<P::SourceChain>,
		parachains: &[ParaId],
	) -> Result<ParaHeadsProof, Self::Error>;
}

/// Target client used in parachain heads synchronization loop.
#[async_trait]
pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
	/// Get best block id.
	async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error>;

	/// Get best finalized source block id.
	async fn best_finalized_source_block(
		&self,
		at_block: &HeaderIdOf<P::TargetChain>,
	) -> Result<HeaderIdOf<P::SourceChain>, Self::Error>;

	/// Get parachain head hash at given block.
	///
	/// The implementation may call `ParachainsLoopMetrics::update_best_parachain_block_at_target`
	/// on provided `metrics` object to update corresponding metric value.
	async fn parachain_head(
		&self,
		at_block: HeaderIdOf<P::TargetChain>,
		metrics: Option<&ParachainsLoopMetrics>,
		para_id: ParaId,
	) -> Result<Option<BestParaHeadHash>, Self::Error>;

	/// Submit parachain heads proof.
	async fn submit_parachain_heads_proof(
		&self,
		at_source_block: HeaderIdOf<P::SourceChain>,
		updated_parachains: Vec<ParaId>,
		proof: ParaHeadsProof,
	) -> Result<(), Self::Error>;
}

/// Return prefix that will be used by default to expose Prometheus metrics of the parachains
/// sync loop.
pub fn metrics_prefix<P: ParachainsPipeline>() -> String {
	format!("{}_to_{}_Parachains", P::SourceChain::NAME, P::TargetChain::NAME)
}

/// Run parachain heads synchronization.
pub async fn run<P: ParachainsPipeline>(
	source_client: impl SourceClient<P>,
	target_client: impl TargetClient<P>,
	sync_params: ParachainSyncParams,
	metrics_params: MetricsParams,
	exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), relay_utils::Error>
where
	P::SourceChain: Chain<BlockNumber = RelayBlockNumber>,
{
	let exit_signal = exit_signal.shared();
	relay_utils::relay_loop(source_client, target_client)
		.with_metrics(metrics_params)
		.loop_metric(ParachainsLoopMetrics::new(Some(&metrics_prefix::<P>()))?)?
		.expose()
		.await?
		.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
			run_until_connection_lost(
				source_client,
				target_client,
				sync_params.clone(),
				metrics,
				exit_signal.clone(),
			)
		})
		.await
}

/// Run parachain heads synchronization.
async fn run_until_connection_lost<P: ParachainsPipeline>(
	source_client: impl SourceClient<P>,
	target_client: impl TargetClient<P>,
	sync_params: ParachainSyncParams,
	metrics: Option<ParachainsLoopMetrics>,
	exit_signal: impl Future<Output = ()> + Send,
) -> Result<(), FailedClient>
where
	P::SourceChain: Chain<BlockNumber = RelayBlockNumber>,
{
	let exit_signal = exit_signal.fuse();
	let min_block_interval = std::cmp::min(
		P::SourceChain::AVERAGE_BLOCK_INTERVAL,
		P::TargetChain::AVERAGE_BLOCK_INTERVAL,
	);

	let mut tx_tracker: Option<TransactionTracker<P>> = None;

	futures::pin_mut!(exit_signal);

	// Note that the internal loop breaks with `FailedClient` error even if error is non-connection.
	// It is Ok for now, but it may need to be fixed in the future to use exponential backoff for
	// regular errors.

	loop {
		// either wait for new block, or exit signal
		select! {
			_ = async_std::task::sleep(min_block_interval).fuse() => {},
			_ = exit_signal => return Ok(()),
		}

		// if source client is not yet synced, we'll need to sleep. Otherwise we risk submitting too
		// much redundant transactions
		match source_client.ensure_synced().await {
			Ok(true) => (),
			Ok(false) => {
Loading full blame...