Skip to content
parachains_loop.rs 35.7 KiB
Newer Older
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.

// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.

use crate::{parachains_loop_metrics::ParachainsLoopMetrics, ParachainsPipeline};

use async_trait::async_trait;
use bp_polkadot_core::{
	parachains::{ParaHash, ParaHeadsProof, ParaId},
	BlockNumber as RelayBlockNumber,
};
use futures::{
	future::{FutureExt, Shared},
	poll, select_biased,
use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf, ParachainBase};
use relay_utils::{
	metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient,
	TrackedTransactionStatus, TransactionTracker,
};
use std::{future::Future, pin::Pin, task::Poll};
Serban Iorga's avatar
Serban Iorga committed
/// Parachain header availability at a certain chain.
Serban Iorga's avatar
Serban Iorga committed
pub enum AvailableHeader<T> {
	/// The client can not report actual parachain head at this moment.
	///
	/// It is a "mild" error, which may appear when e.g. on-demand parachains relay is used.
	/// This variant must be treated as "we don't want to update parachain head value at the
	/// target chain at this moment".
	Unavailable,
Serban Iorga's avatar
Serban Iorga committed
	/// There's no parachain header at the relay chain.
	///
	/// Normally it means that the parachain is not registered there.
	Missing,
	/// Parachain head with given hash is available at the source chain.
	Available(T),
Serban Iorga's avatar
Serban Iorga committed
impl<T> AvailableHeader<T> {
	/// Return available header.
	pub fn as_available(&self) -> Option<&T> {
		match *self {
			AvailableHeader::Available(ref header) => Some(header),
			_ => None,
		}
	}
}

/// Conversion from an optional header into [`AvailableHeader`].
impl<T> From<Option<T>> for AvailableHeader<T> {
	/// Maps `Some(header)` to `AvailableHeader::Available(header)` and `None` to
	/// `AvailableHeader::Missing`. The `Unavailable` variant is never produced here.
	fn from(maybe_header: Option<T>) -> AvailableHeader<T> {
		match maybe_header {
			Some(header) => AvailableHeader::Available(header),
			None => AvailableHeader::Missing,
/// Source client used in parachain heads synchronization loop.
///
/// Every method reports failures as `Self::Error`.
#[async_trait]
pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
	/// Returns `Ok(true)` if client is in synced state.
	async fn ensure_synced(&self) -> Result<bool, Self::Error>;

	/// Get finalized relay chain header id by its number.
	async fn relay_header_id(
		&self,
		number: BlockNumberOf<P::SourceRelayChain>,
	) -> Result<HeaderIdOf<P::SourceRelayChain>, Self::Error>;

	/// Get parachain head id at given block.
	///
	/// The id is wrapped into [`AvailableHeader`], so the caller can tell apart a head that is
	/// available, a head that is missing and a head that the client can not report right now.
	async fn parachain_head(
		&self,
		at_block: HeaderIdOf<P::SourceRelayChain>,
	) -> Result<AvailableHeader<HeaderIdOf<P::SourceParachain>>, Self::Error>;
	/// Get parachain head proof at given block.
	///
	/// On success returns the proof together with a `ParaHash` (presumably the hash of the
	/// proved parachain head - TODO confirm).
	// NOTE(review): unlike the other methods of this trait, no `&self` receiver is declared
	// here - confirm that this is intended.
	async fn prove_parachain_head(
		at_block: HeaderIdOf<P::SourceRelayChain>,
	) -> Result<(ParaHeadsProof, ParaHash), Self::Error>;
}

/// Target client used in parachain heads synchronization loop.
///
/// Every method reports failures as `Self::Error`.
#[async_trait]
pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
	/// Transaction tracker to track submitted transactions.
	///
	/// Its `HeaderId` is the header id type of the target chain.
	type TransactionTracker: TransactionTracker<HeaderId = HeaderIdOf<P::TargetChain>>;
	/// Get best block id.
	async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error>;

	/// Get best finalized source relay chain block id for the target chain block `at_block`.
	///
	/// NOTE(review): the result is presumably constrained in some way when
	/// `free_source_relay_headers_interval` is `Some(_)`; the exact rule is not stated
	/// here - confirm and document it.
	async fn best_finalized_source_relay_chain_block(
		&self,
		at_block: &HeaderIdOf<P::TargetChain>,
	) -> Result<HeaderIdOf<P::SourceRelayChain>, Self::Error>;
	/// Get free source **relay** headers submission interval, if it is configured in the
	/// target runtime. We assume that the target chain will accept parachain header, proved
	/// at such relay header for free.
	async fn free_source_relay_headers_interval(
		&self,
	) -> Result<Option<BlockNumberOf<P::SourceRelayChain>>, Self::Error>;
	/// Get parachain head information at given target chain block.
	///
	/// Returns a pair of source relay chain header id and source parachain header id, or
	/// `None` (presumably when no parachain head is known to the target chain, and the relay
	/// header is presumably the one the head has been proved at - TODO confirm both).
	async fn parachain_head(
		&self,
		at_block: HeaderIdOf<P::TargetChain>,
	) -> Result<
		Option<(HeaderIdOf<P::SourceRelayChain>, HeaderIdOf<P::SourceParachain>)>,
		Self::Error,
	>;

	/// Submit parachain heads proof.
	///
	/// On success returns a `Self::TransactionTracker`.
	// NOTE(review): the method header (name and receiver) is not present in front of the
	// parameters below - confirm the full signature before relying on this declaration.
		at_source_block: HeaderIdOf<P::SourceRelayChain>,
		para_head_hash: ParaHash,
		proof: ParaHeadsProof,
	) -> Result<Self::TransactionTracker, Self::Error>;
}

/// Build the default prefix under which Prometheus metrics of the parachains sync loop
/// are exposed.
///
/// The prefix is made of the source relay chain name, the target chain name and the id of
/// the source parachain.
pub fn metrics_prefix<P: ParachainsPipeline>() -> String {
	let source_relay_name = P::SourceRelayChain::NAME;
	let target_name = P::TargetChain::NAME;
	let parachain_id = P::SourceParachain::PARACHAIN_ID;

	format!("{}_to_{}_Parachains_{}", source_relay_name, target_name, parachain_id)
}

/// Run parachain heads synchronization.
///
/// Builds a relay loop over `source_client` and `target_client`, attaches `metrics_params`,
/// registers [`ParachainsLoopMetrics`] under [`metrics_prefix`] of the pipeline, exposes the
/// metrics and then runs the loop with [`run_until_connection_lost`] as its body.
///
/// Returns a `relay_utils::Error` if creating or exposing the metrics fails, or if the loop
/// itself returns an error.
pub async fn run<P: ParachainsPipeline>(
	source_client: impl SourceClient<P>,
	target_client: impl TargetClient<P>,
	metrics_params: MetricsParams,
	exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), relay_utils::Error>
where
	P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
{
	// make the exit signal cloneable, so that a copy can be handed over to every
	// `run_until_connection_lost` call made by the loop
	let exit_signal = exit_signal.shared();
	relay_utils::relay_loop(source_client, target_client)
		.with_metrics(metrics_params)
		.loop_metric(ParachainsLoopMetrics::new(Some(&metrics_prefix::<P>()))?)?
		.expose()
		.await?
		.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
			// NOTE(review): `only_free_headers` is used below, but it is not declared in the
			// parameter list above - confirm the signature of this function.
			run_until_connection_lost(
				source_client,
				target_client,
				metrics,
				only_free_headers,
				exit_signal.clone(),
			)
		})
		.await
}

/// Run parachain heads synchronization.
async fn run_until_connection_lost<P: ParachainsPipeline>(
	source_client: impl SourceClient<P>,
	target_client: impl TargetClient<P>,
	metrics: Option<ParachainsLoopMetrics>,
	exit_signal: impl Future<Output = ()> + Send,
) -> Result<(), FailedClient>
where
	P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
{
	let exit_signal = exit_signal.fuse();
	let min_block_interval = std::cmp::min(
		P::SourceRelayChain::AVERAGE_BLOCK_INTERVAL,
		P::TargetChain::AVERAGE_BLOCK_INTERVAL,
	);

	// free parachain header = header, available (proved) at free relay chain block. Let's
	// read interval of free source relay chain blocks from target client
	let free_source_relay_headers_interval = if only_free_headers {
		let free_source_relay_headers_interval =
			target_client.free_source_relay_headers_interval().await.map_err(|e| {
				log::warn!(
					target: "bridge",
					"Failed to read free {} headers interval at {}: {:?}",
Loading full blame...