// Copyright 2019-2021 Parity Technologies (UK) Ltd. // This file is part of Parity Bridges Common. // Parity Bridges Common is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity Bridges Common is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . //! The loop basically reads all missing headers and their finality proofs from the source client. //! The proof for the best possible header is then submitted to the target node. The only exception //! is the mandatory headers, which we always submit to the target node. For such headers, we //! assume that the persistent proof either exists, or will eventually become available. use crate::{ sync_loop_metrics::SyncLoopMetrics, FinalityProof, FinalitySyncPipeline, SourceHeader, }; use async_trait::async_trait; use backoff::backoff::Backoff; use futures::{select, Future, FutureExt, Stream, StreamExt}; use num_traits::{One, Saturating}; use relay_utils::{ metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient, HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker, }; use std::{ fmt::Debug, pin::Pin, time::{Duration, Instant}, }; /// Finality proof synchronization loop parameters. #[derive(Debug, Clone)] pub struct FinalitySyncParams { /// Interval at which we check updates on both clients. Normally should be larger than /// `min(source_block_time, target_block_time)`. /// /// This parameter may be used to limit transactions rate. Increase the value && you'll get /// infrequent updates => sparse headers => potential slow down of bridge applications, but /// pallet storage won't be super large. Decrease the value to near `source_block_time` and /// you'll get transaction for (almost) every block of the source chain => all source headers /// will be known to the target chain => bridge applications will run faster, but pallet /// storage may explode (but if pruning is there, then it's fine). pub tick: Duration, /// Number of finality proofs to keep in internal buffer between loop iterations. /// /// While in "major syncing" state, we still read finality proofs from the stream. They're /// stored in the internal buffer between loop iterations. When we're close to the tip of the /// chain, we may meet finality delays if headers are not finalized frequently. So instead of /// waiting for next finality proof to appear in the stream, we may use existing proof from /// that buffer. pub recent_finality_proofs_limit: usize, /// Timeout before we treat our transactions as lost and restart the whole sync process. pub stall_timeout: Duration, /// If true, only mandatory headers are relayed. pub only_mandatory_headers: bool, } /// Source client used in finality synchronization loop. #[async_trait] pub trait SourceClient: RelayClient { /// Stream of new finality proofs. The stream is allowed to miss proofs for some /// headers, even if those headers are mandatory. type FinalityProofsStream: Stream + Send; /// Get best finalized block number. async fn best_finalized_block_number(&self) -> Result; /// Get canonical header and its finality proof by number. async fn header_and_finality_proof( &self, number: P::Number, ) -> Result<(P::Header, Option), Self::Error>; /// Subscribe to new finality proofs. async fn finality_proofs(&self) -> Result; } /// Target client used in finality synchronization loop. #[async_trait] pub trait TargetClient: RelayClient { /// Transaction tracker to track submitted transactions. type TransactionTracker: TransactionTracker; /// Get best finalized source block number. async fn best_finalized_source_block_id( &self, ) -> Result, Self::Error>; /// Submit header finality proof. async fn submit_finality_proof( &self, header: P::Header, proof: P::FinalityProof, ) -> Result; } /// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs /// sync loop. pub fn metrics_prefix() -> String { format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME) } /// Run finality proofs synchronization loop. pub async fn run( source_client: impl SourceClient

, target_client: impl TargetClient

, sync_params: FinalitySyncParams, metrics_params: MetricsParams, exit_signal: impl Future + 'static + Send, ) -> Result<(), relay_utils::Error> { let exit_signal = exit_signal.shared(); relay_utils::relay_loop(source_client, target_client) .with_metrics(metrics_params) .loop_metric(SyncLoopMetrics::new( Some(&metrics_prefix::

()), "source", "source_at_target", )?)? .expose() .await? .run(metrics_prefix::

(), move |source_client, target_client, metrics| { run_until_connection_lost( source_client, target_client, sync_params.clone(), metrics, exit_signal.clone(), ) }) .await } /// Unjustified headers container. Ordered by header number. pub(crate) type UnjustifiedHeaders = Vec; /// Finality proofs container. Ordered by target header number. pub(crate) type FinalityProofs

= Vec<(

::Number,

::FinalityProof)>; /// Reference to finality proofs container. pub(crate) type FinalityProofsRef<'a, P> = &'a [(

::Number,

::FinalityProof)]; /// Error that may happen inside finality synchronization loop. #[derive(Debug)] pub(crate) enum Error { /// Source client request has failed with given error. Source(SourceError), /// Target client request has failed with given error. Target(TargetError), /// Finality proof for mandatory header is missing from the source node. MissingMandatoryFinalityProof(P::Number), } impl Error where P: FinalitySyncPipeline, SourceError: MaybeConnectionError, TargetError: MaybeConnectionError, { fn fail_if_connection_error(&self) -> Result<(), FailedClient> { match *self { Error::Source(ref error) if error.is_connection_error() => Err(FailedClient::Source), Error::Target(ref error) if error.is_connection_error() => Err(FailedClient::Target), _ => Ok(()), } } } /// Information about transaction that we have submitted. #[derive(Debug, Clone)] pub(crate) struct Transaction { /// Submitted transaction tracker. pub tracker: Tracker, /// The number of the header we have submitted. pub submitted_header_number: Number, } impl Transaction { pub async fn submit< C: TargetClient, P: FinalitySyncPipeline, >( target_client: &C, header: P::Header, justification: P::FinalityProof, ) -> Result { let submitted_header_number = header.number(); log::debug!( target: "bridge", "Going to submit finality proof of {} header #{:?} to {}", P::SOURCE_NAME, submitted_header_number, P::TARGET_NAME, ); let tracker = target_client.submit_finality_proof(header, justification).await?; Ok(Transaction { tracker, submitted_header_number }) } pub async fn track, P: FinalitySyncPipeline>( self, target_client: &C, ) -> Result<(), String> { match self.tracker.wait().await { TrackedTransactionStatus::Finalized(_) => { // The transaction has been finalized, but it may have been finalized in the // "failed" state. So let's check if the block number was actually updated. // If it wasn't then we are stalled. // // Please also note that we're returning an error if we fail to read required data // from the target client - that's the best we can do here to avoid actual stall. target_client .best_finalized_source_block_id() .await .map_err(|e| format!("failed to read best block from target node: {e:?}")) .and_then(|best_id_at_target| { if self.submitted_header_number > best_id_at_target.0 { return Err(format!( "best block at target after tx is {:?} and we've submitted {:?}", best_id_at_target.0, self.submitted_header_number, )) } Ok(()) }) }, TrackedTransactionStatus::Lost => Err("transaction failed".to_string()), } } } /// Finality proofs stream that may be restarted. pub(crate) struct RestartableFinalityProofsStream { /// Flag that the stream needs to be restarted. pub(crate) needs_restart: bool, /// The stream itself. stream: Pin>, } impl RestartableFinalityProofsStream { pub async fn create_raw_stream< C: SourceClient, P: FinalitySyncPipeline, >( source_client: &C, ) -> Result { source_client.finality_proofs().await.map_err(|error| { log::error!( target: "bridge", "Failed to subscribe to {} justifications: {:?}. Going to reconnect", P::SOURCE_NAME, error, ); FailedClient::Source }) } pub async fn restart_if_scheduled< C: SourceClient, P: FinalitySyncPipeline, >( &mut self, source_client: &C, ) -> Result<(), FailedClient> { if self.needs_restart { log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME); self.needs_restart = false; self.stream = Box::pin(Self::create_raw_stream(source_client).await?); } Ok(()) } pub fn next(&mut self) -> Option { match self.stream.next().now_or_never() { Some(Some(finality_proof)) => Some(finality_proof), Some(None) => { self.needs_restart = true; None }, None => None, } } } impl From for RestartableFinalityProofsStream { fn from(stream: S) -> Self { RestartableFinalityProofsStream { needs_restart: false, stream: Box::pin(stream) } } } /// Finality synchronization loop state. pub(crate) struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> { /// Synchronization loop progress. pub(crate) progress: &'a mut (Instant, Option), /// Finality proofs stream. pub(crate) finality_proofs_stream: &'a mut RestartableFinalityProofsStream, /// Recent finality proofs that we have read from the stream. pub(crate) recent_finality_proofs: &'a mut FinalityProofs

, /// Number of the last header, submitted to the target node. pub(crate) submitted_header_number: Option, } /// Run finality relay loop until connection to one of nodes is lost. pub(crate) async fn run_until_connection_lost( source_client: impl SourceClient

, target_client: impl TargetClient

, sync_params: FinalitySyncParams, metrics_sync: Option, exit_signal: impl Future, ) -> Result<(), FailedClient> { let last_transaction_tracker = futures::future::Fuse::terminated(); let exit_signal = exit_signal.fuse(); futures::pin_mut!(last_transaction_tracker, exit_signal); let mut finality_proofs_stream = RestartableFinalityProofsStream::create_raw_stream(&source_client).await?.into(); let mut recent_finality_proofs = Vec::new(); let mut progress = (Instant::now(), None); let mut retry_backoff = retry_backoff(); let mut last_submitted_header_number = None; loop { // run loop iteration let iteration_result = run_loop_iteration( &source_client, &target_client, FinalityLoopState { progress: &mut progress, finality_proofs_stream: &mut finality_proofs_stream, recent_finality_proofs: &mut recent_finality_proofs, submitted_header_number: last_submitted_header_number, }, &sync_params, &metrics_sync, ) .await; // deal with errors let next_tick = match iteration_result { Ok(Some(updated_transaction)) => { last_submitted_header_number = Some(updated_transaction.submitted_header_number); last_transaction_tracker.set(updated_transaction.track(&target_client).fuse()); retry_backoff.reset(); sync_params.tick }, Ok(None) => { retry_backoff.reset(); sync_params.tick }, Err(error) => { log::error!(target: "bridge", "Finality sync loop iteration has failed with error: {:?}", error); error.fail_if_connection_error()?; retry_backoff.next_backoff().unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY) }, }; finality_proofs_stream.restart_if_scheduled(&source_client).await?; // wait till exit signal, or new source block select! { transaction_result = last_transaction_tracker => { transaction_result.map_err(|e| { log::error!( target: "bridge", "Finality synchronization from {} to {} has stalled with error: {}. Going to restart", P::SOURCE_NAME, P::TARGET_NAME, e, ); // Restart the loop if we're stalled. FailedClient::Both })? }, _ = async_std::task::sleep(next_tick).fuse() => {}, _ = exit_signal => return Ok(()), } } } pub(crate) async fn run_loop_iteration( source_client: &SC, target_client: &TC, state: FinalityLoopState<'_, P, SC::FinalityProofsStream>, sync_params: &FinalitySyncParams, metrics_sync: &Option, ) -> Result>, Error> where P: FinalitySyncPipeline, SC: SourceClient

, TC: TargetClient

, { // read best source headers ids from source and target nodes let best_number_at_source = source_client.best_finalized_block_number().await.map_err(Error::Source)?; let best_id_at_target = target_client.best_finalized_source_block_id().await.map_err(Error::Target)?; let best_number_at_target = best_id_at_target.0; let different_hash_at_source = ensure_same_fork::(&best_id_at_target, source_client) .await .map_err(Error::Source)?; let using_same_fork = different_hash_at_source.is_none(); if let Some(ref different_hash_at_source) = different_hash_at_source { log::error!( target: "bridge", "Source node ({}) and pallet at target node ({}) have different headers at the same height {:?}: \ at-source {:?} vs at-target {:?}", P::SOURCE_NAME, P::TARGET_NAME, best_number_at_target, different_hash_at_source, best_id_at_target.1, ); } if let Some(ref metrics_sync) = *metrics_sync { metrics_sync.update_best_block_at_source(best_number_at_source); metrics_sync.update_best_block_at_target(best_number_at_target); metrics_sync.update_using_same_fork(using_same_fork); } *state.progress = print_sync_progress::

(*state.progress, best_number_at_source, best_number_at_target); // if we have already submitted header, then we just need to wait for it // if we're waiting too much, then we believe our transaction has been lost and restart sync if let Some(submitted_header_number) = state.submitted_header_number { if best_number_at_target >= submitted_header_number { // transaction has been mined && we can continue } else { return Ok(None) } } // submit new header if we have something new match select_header_to_submit( source_client, target_client, state.finality_proofs_stream, state.recent_finality_proofs, best_number_at_source, best_number_at_target, sync_params, ) .await? { Some((header, justification)) => { let transaction = Transaction::submit(target_client, header, justification) .await .map_err(Error::Target)?; Ok(Some(transaction)) }, None => Ok(None), } } pub(crate) async fn select_header_to_submit( source_client: &SC, target_client: &TC, finality_proofs_stream: &mut RestartableFinalityProofsStream, recent_finality_proofs: &mut FinalityProofs

, best_number_at_source: P::Number, best_number_at_target: P::Number, sync_params: &FinalitySyncParams, ) -> Result, Error> where P: FinalitySyncPipeline, SC: SourceClient

, TC: TargetClient

, { // to see that the loop is progressing log::trace!( target: "bridge", "Considering range of headers ({:?}; {:?}]", best_number_at_target, best_number_at_source, ); // read missing headers. if we see that the header schedules GRANDPA change, we need to // submit this header let selected_finality_proof = read_missing_headers::( source_client, target_client, best_number_at_source, best_number_at_target, ) .await?; let (mut unjustified_headers, mut selected_finality_proof) = match selected_finality_proof { SelectedFinalityProof::Mandatory(header, finality_proof) => return Ok(Some((header, finality_proof))), _ if sync_params.only_mandatory_headers => { // we are not reading finality proofs from the stream, so eventually it'll break // but we don't care about transient proofs at all, so it is acceptable return Ok(None) }, SelectedFinalityProof::Regular(unjustified_headers, header, finality_proof) => (unjustified_headers, Some((header, finality_proof))), SelectedFinalityProof::None(unjustified_headers) => (unjustified_headers, None), }; // all headers that are missing from the target client are non-mandatory // => even if we have already selected some header and its persistent finality proof, // we may try to select better header by reading non-persistent proofs from the stream read_finality_proofs_from_stream::(finality_proofs_stream, recent_finality_proofs); selected_finality_proof = select_better_recent_finality_proof::

( recent_finality_proofs, &mut unjustified_headers, selected_finality_proof, ); // remove obsolete 'recent' finality proofs + keep its size under certain limit let oldest_finality_proof_to_keep = selected_finality_proof .as_ref() .map(|(header, _)| header.number()) .unwrap_or(best_number_at_target); prune_recent_finality_proofs::

( oldest_finality_proof_to_keep, recent_finality_proofs, sync_params.recent_finality_proofs_limit, ); Ok(selected_finality_proof) } /// Ensures that both clients are on the same fork. /// /// Returns `Some(_)` with header has at the source client if headers are different. async fn ensure_same_fork>( best_id_at_target: &HeaderId, source_client: &SC, ) -> Result, SC::Error> { let header_at_source = source_client.header_and_finality_proof(best_id_at_target.0).await?.0; let header_hash_at_source = header_at_source.hash(); Ok(if best_id_at_target.1 == header_hash_at_source { None } else { Some(header_hash_at_source) }) } /// Finality proof that has been selected by the `read_missing_headers` function. pub(crate) enum SelectedFinalityProof { /// Mandatory header and its proof has been selected. We shall submit proof for this header. Mandatory(Header, FinalityProof), /// Regular header and its proof has been selected. We may submit this proof, or proof for /// some better header. Regular(UnjustifiedHeaders

, Header, FinalityProof), /// We haven't found any missing header with persistent proof at the target client. None(UnjustifiedHeaders
), } /// Read missing headers and their persistent finality proofs from the target client. /// /// If we have found some header with known proof, it is returned. /// Otherwise, `SelectedFinalityProof::None` is returned. /// /// Unless we have found mandatory header, all missing headers are collected and returned. pub(crate) async fn read_missing_headers< P: FinalitySyncPipeline, SC: SourceClient

, TC: TargetClient

, >( source_client: &SC, _target_client: &TC, best_number_at_source: P::Number, best_number_at_target: P::Number, ) -> Result, Error> { let mut unjustified_headers = Vec::new(); let mut selected_finality_proof = None; let mut header_number = best_number_at_target + One::one(); while header_number <= best_number_at_source { let (header, finality_proof) = source_client .header_and_finality_proof(header_number) .await .map_err(Error::Source)?; let is_mandatory = header.is_mandatory(); match (is_mandatory, finality_proof) { (true, Some(finality_proof)) => { log::trace!(target: "bridge", "Header {:?} is mandatory", header_number); return Ok(SelectedFinalityProof::Mandatory(header, finality_proof)) }, (true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())), (false, Some(finality_proof)) => { log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number); unjustified_headers.clear(); selected_finality_proof = Some((header, finality_proof)); }, (false, None) => { unjustified_headers.push(header); }, } header_number = header_number + One::one(); } log::trace!( target: "bridge", "Read {} {} headers. Selected finality proof for header: {:?}", best_number_at_source.saturating_sub(best_number_at_target), P::SOURCE_NAME, selected_finality_proof.as_ref().map(|(header, _)| header), ); Ok(match selected_finality_proof { Some((header, proof)) => SelectedFinalityProof::Regular(unjustified_headers, header, proof), None => SelectedFinalityProof::None(unjustified_headers), }) } /// Read finality proofs from the stream. pub(crate) fn read_finality_proofs_from_stream< P: FinalitySyncPipeline, FPS: Stream, >( finality_proofs_stream: &mut RestartableFinalityProofsStream, recent_finality_proofs: &mut FinalityProofs

, ) { let mut proofs_count = 0; let mut first_header_number = None; let mut last_header_number = None; while let Some(finality_proof) = finality_proofs_stream.next() { let target_header_number = finality_proof.target_header_number(); if first_header_number.is_none() { first_header_number = Some(target_header_number); } last_header_number = Some(target_header_number); proofs_count += 1; recent_finality_proofs.push((target_header_number, finality_proof)); } if proofs_count != 0 { log::trace!( target: "bridge", "Read {} finality proofs from {} finality stream for headers in range [{:?}; {:?}]", proofs_count, P::SOURCE_NAME, first_header_number, last_header_number, ); } } /// Try to select better header and its proof, given finality proofs that we /// have recently read from the stream. pub(crate) fn select_better_recent_finality_proof( recent_finality_proofs: FinalityProofsRef

, unjustified_headers: &mut UnjustifiedHeaders, selected_finality_proof: Option<(P::Header, P::FinalityProof)>, ) -> Option<(P::Header, P::FinalityProof)> { if unjustified_headers.is_empty() || recent_finality_proofs.is_empty() { log::trace!( target: "bridge", "Can not improve selected {} finality proof {:?}. No unjustified headers and recent proofs", P::SOURCE_NAME, selected_finality_proof.as_ref().map(|(h, _)| h.number()), ); return selected_finality_proof } const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed"; // we need proofs for headers in range unjustified_range_begin..=unjustified_range_end let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number(); let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number(); // we have proofs for headers in range buffered_range_begin..=buffered_range_end let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0; let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0; // we have two ranges => find intersection let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin); let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end); let intersection = intersection_begin..=intersection_end; // find last proof from intersection let selected_finality_proof_index = recent_finality_proofs .binary_search_by_key(intersection.end(), |(number, _)| *number) .unwrap_or_else(|index| index.saturating_sub(1)); let (selected_header_number, finality_proof) = &recent_finality_proofs[selected_finality_proof_index]; let has_selected_finality_proof = intersection.contains(selected_header_number); log::trace!( target: "bridge", "Trying to improve selected {} finality proof {:?}. Headers range: [{:?}; {:?}]. Proofs range: [{:?}; {:?}].\ Trying to improve to: {:?}. Result: {}", P::SOURCE_NAME, selected_finality_proof.as_ref().map(|(h, _)| h.number()), unjustified_range_begin, unjustified_range_end, buffered_range_begin, buffered_range_end, selected_header_number, if has_selected_finality_proof { "improved" } else { "not improved" }, ); if !has_selected_finality_proof { return selected_finality_proof } // now remove all obsolete headers and extract selected header let selected_header_position = unjustified_headers .binary_search_by_key(selected_header_number, |header| header.number()) .expect("unjustified_headers contain all headers from intersection; qed"); let selected_header = unjustified_headers.swap_remove(selected_header_position); Some((selected_header, finality_proof.clone())) } pub(crate) fn prune_recent_finality_proofs( justified_header_number: P::Number, recent_finality_proofs: &mut FinalityProofs

, recent_finality_proofs_limit: usize, ) { let justified_header_idx = recent_finality_proofs .binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number) .map(|idx| idx + 1) .unwrap_or_else(|idx| idx); let proofs_limit_idx = recent_finality_proofs.len().saturating_sub(recent_finality_proofs_limit); *recent_finality_proofs = recent_finality_proofs.split_off(std::cmp::max(justified_header_idx, proofs_limit_idx)); } fn print_sync_progress( progress_context: (Instant, Option), best_number_at_source: P::Number, best_number_at_target: P::Number, ) -> (Instant, Option) { let (prev_time, prev_best_number_at_target) = progress_context; let now = Instant::now(); let need_update = now - prev_time > Duration::from_secs(10) || prev_best_number_at_target .map(|prev_best_number_at_target| { best_number_at_target.saturating_sub(prev_best_number_at_target) > 10.into() }) .unwrap_or(true); if !need_update { return (prev_time, prev_best_number_at_target) } log::info!( target: "bridge", "Synced {:?} of {:?} headers", best_number_at_target, best_number_at_source, ); (now, Some(best_number_at_target)) }