Skip to content
parachains_loop.rs 36 KiB
Newer Older
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.

// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.

use crate::{parachains_loop_metrics::ParachainsLoopMetrics, ParachainsPipeline};

use async_trait::async_trait;
use bp_parachains::BestParaHeadHash;
use bp_polkadot_core::{
	parachains::{ParaHash, ParaHeadsProof, ParaId},
	BlockNumber as RelayBlockNumber,
};
use futures::{
	future::{FutureExt, Shared},
	poll, select_biased,
use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf};
use relay_utils::{
	metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient,
	TrackedTransactionStatus, TransactionTracker,
};
use std::{
	collections::{BTreeMap, BTreeSet},
	future::Future,
	pin::Pin,
	task::Poll,
	time::Duration,
};

/// Parachain heads synchronization params.
#[derive(Clone, Debug)]
pub struct ParachainSyncParams {
	/// Parachains that we're relaying here.
	pub parachains: Vec<ParaId>,
	/// Parachain heads update strategy.
	pub strategy: ParachainSyncStrategy,
	/// Stall timeout. If we have submitted transaction and we see no state updates for this
	/// period, we consider our transaction lost.
	pub stall_timeout: Duration,
}

/// Parachain heads update strategy.
#[derive(Clone, Copy, Debug)]
pub enum ParachainSyncStrategy {
	/// Update whenever any parachain head is updated.
	Any,
	/// Wait till all parachain heads are updated.
	All,
}

Serban Iorga's avatar
Serban Iorga committed
/// Parachain header availability at a certain chain.
Serban Iorga's avatar
Serban Iorga committed
pub enum AvailableHeader<T> {
	/// The client refuses to report parachain head at this moment.
	///
	/// It is a "mild" error, which may appear when e.g. on-demand parachains relay is used.
	/// This variant must be treated as "we don't want to update parachain head value at the
	/// target chain at this moment".
	Unavailable,
Serban Iorga's avatar
Serban Iorga committed
	/// There's no parachain header at the relay chain.
	///
	/// Normally it means that the parachain is not registered there.
	Missing,
	/// Parachain head with given hash is available at the source chain.
	Available(T),
Serban Iorga's avatar
Serban Iorga committed
impl<T> AvailableHeader<T> {
	/// Transform contained value.
	pub fn map<F, U>(self, f: F) -> AvailableHeader<U>
	where
		F: FnOnce(T) -> U,
	{
		match self {
			AvailableHeader::Unavailable => AvailableHeader::Unavailable,
			AvailableHeader::Missing => AvailableHeader::Missing,
			AvailableHeader::Available(val) => AvailableHeader::Available(f(val)),
/// Source client used in parachain heads synchronization loop.
#[async_trait]
pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
	/// Returns `Ok(true)` if client is in synced state.
	async fn ensure_synced(&self) -> Result<bool, Self::Error>;

	/// Get parachain head hash at given block.
	///
	/// The implementation may call `ParachainsLoopMetrics::update_best_parachain_block_at_source`
	/// on provided `metrics` object to update corresponding metric value.
	async fn parachain_head(
		&self,
		at_block: HeaderIdOf<P::SourceChain>,
		metrics: Option<&ParachainsLoopMetrics>,
		para_id: ParaId,
Serban Iorga's avatar
Serban Iorga committed
	) -> Result<AvailableHeader<ParaHash>, Self::Error>;

	/// Get parachain heads proof.
	///
	/// The number and order of entries in the resulting parachain head hashes vector must match the
	/// number and order of parachains in the `parachains` vector. The incorrect implementation will
	/// result in panic.
	async fn prove_parachain_heads(
		&self,
		at_block: HeaderIdOf<P::SourceChain>,
		parachains: &[ParaId],
	) -> Result<(ParaHeadsProof, Vec<ParaHash>), Self::Error>;
}

/// Target client used in parachain heads synchronization loop.
#[async_trait]
pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
	/// Transaction tracker to track submitted transactions.
	type TransactionTracker: TransactionTracker<HeaderId = HeaderIdOf<P::TargetChain>>;
	/// Get best block id.
	async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error>;

	/// Get best finalized source block id.
	async fn best_finalized_source_block(
		&self,
		at_block: &HeaderIdOf<P::TargetChain>,
	) -> Result<HeaderIdOf<P::SourceChain>, Self::Error>;

	/// Get parachain head hash at given block.
	///
	/// The implementation may call `ParachainsLoopMetrics::update_best_parachain_block_at_target`
	/// on provided `metrics` object to update corresponding metric value.
	async fn parachain_head(
		&self,
		at_block: HeaderIdOf<P::TargetChain>,
		metrics: Option<&ParachainsLoopMetrics>,
		para_id: ParaId,
	) -> Result<Option<BestParaHeadHash>, Self::Error>;

	/// Submit parachain heads proof.
	async fn submit_parachain_heads_proof(
		&self,
		at_source_block: HeaderIdOf<P::SourceChain>,
		updated_parachains: Vec<(ParaId, ParaHash)>,
		proof: ParaHeadsProof,
	) -> Result<Self::TransactionTracker, Self::Error>;
}

/// Return prefix that will be used by default to expose Prometheus metrics of the parachains
/// sync loop.
pub fn metrics_prefix<P: ParachainsPipeline>() -> String {
	format!("{}_to_{}_Parachains", P::SourceChain::NAME, P::TargetChain::NAME)
}

/// Run parachain heads synchronization.
pub async fn run<P: ParachainsPipeline>(
	source_client: impl SourceClient<P>,
	target_client: impl TargetClient<P>,
	sync_params: ParachainSyncParams,
	metrics_params: MetricsParams,
	exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), relay_utils::Error>
where
	P::SourceChain: Chain<BlockNumber = RelayBlockNumber>,
{
	let exit_signal = exit_signal.shared();
	relay_utils::relay_loop(source_client, target_client)
		.with_metrics(metrics_params)
		.loop_metric(ParachainsLoopMetrics::new(Some(&metrics_prefix::<P>()))?)?
		.expose()
		.await?
		.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
			run_until_connection_lost(
				source_client,
				target_client,
				sync_params.clone(),
				metrics,
				exit_signal.clone(),
			)
		})
		.await
}

/// Run parachain heads synchronization.
async fn run_until_connection_lost<P: ParachainsPipeline>(
	source_client: impl SourceClient<P>,
	target_client: impl TargetClient<P>,
	sync_params: ParachainSyncParams,
	metrics: Option<ParachainsLoopMetrics>,
	exit_signal: impl Future<Output = ()> + Send,
) -> Result<(), FailedClient>
Loading full blame...