// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.

// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.

//! The loop basically reads all missing headers and their finality proofs from the source client.
//! The proof for the best possible header is then submitted to the target node. The only exception
//! is the mandatory headers, which we always submit to the target node. For such headers, we
//! assume that the persistent proof either exists, or will eventually become available.

use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader};

use async_trait::async_trait;
use backoff::backoff::Backoff;
use futures::{select, Future, FutureExt, Stream, StreamExt};
use headers_relay::sync_loop_metrics::SyncLoopMetrics;
use num_traits::{One, Saturating};
use relay_utils::{
	metrics::{start as metrics_start, GlobalMetrics, MetricsParams},
	relay_loop::Client as RelayClient,
	retry_backoff, FailedClient, MaybeConnectionError,
};
use std::{
	pin::Pin,
	time::{Duration, Instant},
};

/// Finality proof synchronization loop parameters.
#[derive(Debug, Clone)]
pub struct FinalitySyncParams {
	/// Interval at which we check updates on both clients. Normally should be larger than
	/// `min(source_block_time, target_block_time)`.
	///
	/// This parameter may be used to limit the transaction rate. Increase the value and you'll get
	/// infrequent updates => sparse headers => potential slowdown of bridge applications, but the
	/// pallet storage won't grow too large. Decrease the value to near `source_block_time` and
	/// you'll get a transaction for (almost) every block of the source chain => all source headers
	/// will be known to the target chain => bridge applications will run faster, but the pallet
	/// storage may explode (although with pruning enabled, it's fine).
	pub tick: Duration,
	/// Number of finality proofs to keep in the internal buffer between loop wakeups.
	///
	/// While in the "major syncing" state, we still read finality proofs from the stream. They're
	/// stored in the internal buffer between loop wakeups. When we're close to the tip of the
	/// chain, we may meet finality delays if headers are not finalized frequently. So instead of
	/// waiting for the next finality proof to appear in the stream, we may use an existing proof
	/// from that buffer.
	pub recent_finality_proofs_limit: usize,
	/// Timeout before we treat our transactions as lost and restart the whole sync process.
	pub stall_timeout: Duration,
}
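// Hedged illustration (not part of the original API surface): one plausible way to fill in
// `FinalitySyncParams`. The concrete numbers below are assumptions chosen for the example only,
// not recommended production values.
#[test]
fn example_finality_sync_params_can_be_constructed() {
	let params = FinalitySyncParams {
		tick: Duration::from_secs(15),
		recent_finality_proofs_limit: 4096,
		stall_timeout: Duration::from_secs(5 * 60),
	};
	// `tick` is expected to be much smaller than `stall_timeout`, otherwise a single missed
	// wakeup could be misread as a stalled transaction.
	assert!(params.tick < params.stall_timeout);
}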
/// Source client used in finality synchronization loop.
#[async_trait]
pub trait SourceClient<P: FinalitySyncPipeline>: RelayClient {
	/// Stream of new finality proofs. The stream is allowed to miss proofs for some
	/// headers, even if those headers are mandatory.
	type FinalityProofsStream: Stream<Item = P::FinalityProof>;

	/// Get best finalized block number.
	async fn best_finalized_block_number(&self) -> Result<P::Number, Self::Error>;

	/// Get canonical header and its finality proof by number.
	async fn header_and_finality_proof(
		&self,
		number: P::Number,
	) -> Result<(P::Header, Option<P::FinalityProof>), Self::Error>;

	/// Subscribe to new finality proofs.
	async fn finality_proofs(&self) -> Result<Self::FinalityProofsStream, Self::Error>;
}

/// Target client used in finality synchronization loop.
#[async_trait]
pub trait TargetClient<P: FinalitySyncPipeline>: RelayClient {
	/// Get best finalized source block number.
	async fn best_finalized_source_block_number(&self) -> Result<P::Number, Self::Error>;

	/// Submit header finality proof.
	async fn submit_finality_proof(
		&self,
		header: P::Header,
		proof: P::FinalityProof,
	) -> Result<(), Self::Error>;
}

/// Run finality proofs synchronization loop.
pub fn run<P: FinalitySyncPipeline>(
	source_client: impl SourceClient<P>,
	target_client: impl TargetClient<P>,
	sync_params: FinalitySyncParams,
	metrics_params: Option<MetricsParams>,
	exit_signal: impl Future<Output = ()>,
) {
	let exit_signal = exit_signal.shared();

	let metrics_global = GlobalMetrics::default();
	let metrics_sync = SyncLoopMetrics::default();
	let metrics_enabled = metrics_params.is_some();
	metrics_start(
		format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME),
		metrics_params,
		&metrics_global,
		&metrics_sync,
	);

	relay_utils::relay_loop::run(
		relay_utils::relay_loop::RECONNECT_DELAY,
		source_client,
		target_client,
		|source_client, target_client| {
			run_until_connection_lost(
				source_client,
				target_client,
				sync_params.clone(),
				if metrics_enabled { Some(metrics_global.clone()) } else { None },
				if metrics_enabled { Some(metrics_sync.clone()) } else { None },
				exit_signal.clone(),
			)
		},
	);
}
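// Hedged usage sketch: wiring `run` together, assuming `source_client`/`target_client` are
// existing `SourceClient<P>`/`TargetClient<P>` implementations for some pipeline `P` (those
// implementations live outside this module and are assumptions here):
//
// run(
// 	source_client,
// 	target_client,
// 	FinalitySyncParams {
// 		tick: Duration::from_secs(15),
// 		recent_finality_proofs_limit: 4096,
// 		stall_timeout: Duration::from_secs(5 * 60),
// 	},
// 	None,                        // metrics are optional
// 	futures::future::pending(),  // run until the process is killed
// );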

/// Unjustified headers container. Ordered by header number.
pub(crate) type UnjustifiedHeaders<P> = Vec<<P as FinalitySyncPipeline>::Header>;
/// Finality proofs container. Ordered by target header number.
pub(crate) type FinalityProofs<P> = Vec<(
	<P as FinalitySyncPipeline>::Number,
	<P as FinalitySyncPipeline>::FinalityProof,
)>;

/// Error that may happen inside finality synchronization loop.
#[derive(Debug)]
enum Error<P: FinalitySyncPipeline, SourceError, TargetError> {
	/// Source client request has failed with given error.
	Source(SourceError),
	/// Target client request has failed with given error.
	Target(TargetError),
	/// Finality proof for mandatory header is missing from the source node.
	MissingMandatoryFinalityProof(P::Number),
	/// The synchronization has stalled.
	Stalled,
}

impl<P, SourceError, TargetError> Error<P, SourceError, TargetError>
where
	P: FinalitySyncPipeline,
	SourceError: MaybeConnectionError,
	TargetError: MaybeConnectionError,
{
	fn fail_if_connection_error(&self) -> Result<(), FailedClient> {
		match *self {
			Error::Source(ref error) if error.is_connection_error() => Err(FailedClient::Source),
			Error::Target(ref error) if error.is_connection_error() => Err(FailedClient::Target),
			Error::Stalled => Err(FailedClient::Both),
			_ => Ok(()),
		}
	}
}

/// Information about transaction that we have submitted.
#[derive(Debug, Clone)]
struct Transaction<Number> {
	/// Time when we have submitted this transaction.
	pub time: Instant,
	/// The number of the header we have submitted.
	pub submitted_header_number: Number,
}

/// Finality proofs stream that may be restarted.
struct RestartableFinalityProofsStream<S> {
	/// Flag that the stream needs to be restarted.
	needs_restart: bool,
	/// The stream itself.
	stream: Pin<Box<S>>,
}

/// Finality synchronization loop state.
struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> {
	/// Synchronization loop progress.
	progress: &'a mut (Instant, Option<P::Number>),
	/// Finality proofs stream.
	finality_proofs_stream: &'a mut RestartableFinalityProofsStream<FinalityProofsStream>,
	/// Recent finality proofs that we have read from the stream.
	recent_finality_proofs: &'a mut FinalityProofs<P>,
	/// Last transaction that we have submitted to the target node.
	last_transaction: Option<Transaction<P::Number>>,
}

async fn run_until_connection_lost<P: FinalitySyncPipeline>(
	source_client: impl SourceClient<P>,
	target_client: impl TargetClient<P>,
	sync_params: FinalitySyncParams,
	metrics_global: Option<GlobalMetrics>,
	metrics_sync: Option<SyncLoopMetrics>,
	exit_signal: impl Future<Output = ()>,
) -> Result<(), FailedClient> {
	let restart_finality_proofs_stream = || async {
		source_client.finality_proofs().await.map_err(|error| {
			log::error!(
				target: "bridge",
				"Failed to subscribe to {} justifications: {:?}. Going to reconnect",
				P::SOURCE_NAME,
				error,
			);

			FailedClient::Source
		})
	};

	let exit_signal = exit_signal.fuse();
	futures::pin_mut!(exit_signal);

	let mut finality_proofs_stream = RestartableFinalityProofsStream {
		needs_restart: false,
		stream: Box::pin(restart_finality_proofs_stream().await?),
	};
	let mut recent_finality_proofs = Vec::new();

	let mut progress = (Instant::now(), None);
	let mut retry_backoff = retry_backoff();
	let mut last_transaction = None;

	loop {
		// run loop iteration
		let iteration_result = run_loop_iteration(
			&source_client,
			&target_client,
			FinalityLoopState {
				progress: &mut progress,
				finality_proofs_stream: &mut finality_proofs_stream,
				recent_finality_proofs: &mut recent_finality_proofs,
				last_transaction: last_transaction.clone(),
			},
			&sync_params,
			&metrics_sync,
		)
		.await;

		// update global metrics
		if let Some(ref metrics_global) = metrics_global {
			metrics_global.update().await;
		}

		// deal with errors
		let next_tick = match iteration_result {
			Ok(updated_last_transaction) => {
				last_transaction = updated_last_transaction;
				retry_backoff.reset();
				sync_params.tick
			}
			Err(error) => {
				log::error!(target: "bridge", "Finality sync loop iteration has failed with error: {:?}", error);
				error.fail_if_connection_error()?;
				retry_backoff
					.next_backoff()
					.unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY)
			}
		};
		if finality_proofs_stream.needs_restart {
			finality_proofs_stream.needs_restart = false;
			finality_proofs_stream.stream = Box::pin(restart_finality_proofs_stream().await?);
		}

		// wait till exit signal, or new source block
		select! {
			_ = async_std::task::sleep(next_tick).fuse() => {},
			_ = exit_signal => return Ok(()),
		}
	}
}

async fn run_loop_iteration<P, SC, TC>(
	source_client: &SC,
	target_client: &TC,
	state: FinalityLoopState<'_, P, SC::FinalityProofsStream>,
	sync_params: &FinalitySyncParams,
	metrics_sync: &Option<SyncLoopMetrics>,
) -> Result<Option<Transaction<P::Number>>, Error<P, SC::Error, TC::Error>>
where
	P: FinalitySyncPipeline,
	SC: SourceClient<P>,
	TC: TargetClient<P>,
{
	// read best source header ids from the source and target nodes
	let best_number_at_source = source_client
		.best_finalized_block_number()
		.await
		.map_err(Error::Source)?;
	let best_number_at_target = target_client
		.best_finalized_source_block_number()
		.await
		.map_err(Error::Target)?;
	if let Some(ref metrics_sync) = *metrics_sync {
		metrics_sync.update_best_block_at_source(best_number_at_source);
		metrics_sync.update_best_block_at_target(best_number_at_target);
	}
	*state.progress =
		print_sync_progress::<P>(*state.progress, best_number_at_source, best_number_at_target);

	// if we have already submitted a header, then we just need to wait for it;
	// if we've been waiting for too long, we assume the transaction has been lost and restart sync
	if let Some(last_transaction) = state.last_transaction {
		if best_number_at_target >= last_transaction.submitted_header_number {
			// transaction has been mined and we can continue
		} else if last_transaction.time.elapsed() > sync_params.stall_timeout {
			log::error!(
				target: "bridge",
				"Finality synchronization from {} to {} has stalled. Going to restart",
				P::SOURCE_NAME,
				P::TARGET_NAME,
			);

			return Err(Error::Stalled);
		} else {
			return Ok(Some(last_transaction));
		}
	}
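	// Hypothetical timeline for the check above (numbers are illustrative only): we submit a
	// proof for header #100 at time T with `stall_timeout = 5 min`. At T+2min the target chain
	// still reports #90, so we simply keep waiting. At T+6min it is still below #100, so we
	// declare the transaction lost, return `Error::Stalled` and let the caller restart the sync.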

	// submit new header if we have something new
	match select_header_to_submit(
		source_client,
		target_client,
		state.finality_proofs_stream,
		state.recent_finality_proofs,
		best_number_at_source,
		best_number_at_target,
		sync_params,
	)
	.await?
	{
		Some((header, justification)) => {
			let new_transaction = Transaction {
				time: Instant::now(),
				submitted_header_number: header.number(),
			};

			log::debug!(
				target: "bridge",
				"Going to submit finality proof of {} header #{:?} to {}",
				P::SOURCE_NAME,
				new_transaction.submitted_header_number,
				P::TARGET_NAME,
			);

			target_client
				.submit_finality_proof(header, justification)
				.await
				.map_err(Error::Target)?;
			Ok(Some(new_transaction))
		}
		None => Ok(None),
	}
}

async fn select_header_to_submit<P, SC, TC>(
	source_client: &SC,
	_target_client: &TC,
	finality_proofs_stream: &mut RestartableFinalityProofsStream<SC::FinalityProofsStream>,
	recent_finality_proofs: &mut FinalityProofs<P>,
	best_number_at_source: P::Number,
	best_number_at_target: P::Number,
	sync_params: &FinalitySyncParams,
) -> Result<Option<(P::Header, P::FinalityProof)>, Error<P, SC::Error, TC::Error>>
where
	P: FinalitySyncPipeline,
	SC: SourceClient<P>,
	TC: TargetClient<P>,
{
	let mut selected_finality_proof = None;
	let mut unjustified_headers = Vec::new();

	// to see that the loop is progressing
	log::trace!(
		target: "bridge",
		"Considering range of headers ({:?}; {:?}]",
		best_number_at_target,
		best_number_at_source,
	);

	// read missing headers. if we see that a header schedules a GRANDPA change, we need to
	// submit this header
	let mut header_number = best_number_at_target + One::one();
	while header_number <= best_number_at_source {
		let (header, finality_proof) = source_client
			.header_and_finality_proof(header_number)
			.await
			.map_err(Error::Source)?;
		let is_mandatory = header.is_mandatory();

		match (is_mandatory, finality_proof) {
			(true, Some(finality_proof)) => {
				log::trace!(target: "bridge", "Header {:?} is mandatory", header_number);
				return Ok(Some((header, finality_proof)));
			}
			(true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())),
			(false, Some(finality_proof)) => {
				log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number);
				selected_finality_proof = Some((header, finality_proof));
				prune_unjustified_headers::<P>(header_number, &mut unjustified_headers);
			}
			(false, None) => {
				unjustified_headers.push(header);
			}
		}

		header_number = header_number + One::one();
	}

	// see if we can improve finality by using recent finality proofs
	if !unjustified_headers.is_empty() && !recent_finality_proofs.is_empty() {
		const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed";

		// we need proofs for headers in range unjustified_range_begin..=unjustified_range_end
		let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number();
		let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number();

		// we have proofs for headers in range buffered_range_begin..=buffered_range_end
		let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0;
		let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0;

		// we have two ranges => find intersection
		let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin);
		let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end);
		let intersection = intersection_begin..=intersection_end;

		// find last proof from intersection
		let selected_finality_proof_index = recent_finality_proofs
			.binary_search_by_key(intersection.end(), |(number, _)| *number)
			.unwrap_or_else(|index| index.saturating_sub(1));
		let (selected_header_number, finality_proof) =
			&recent_finality_proofs[selected_finality_proof_index];
		if intersection.contains(selected_header_number) {
			// now remove all obsolete headers and extract selected header
			let selected_header =
				prune_unjustified_headers::<P>(*selected_header_number, &mut unjustified_headers)
					.expect("unjustified_headers contain all headers from intersection; qed");
			selected_finality_proof = Some((selected_header, finality_proof.clone()));
		}
	}
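	// Worked illustration of the block above (numbers are hypothetical): unjustified headers
	// cover #10..=#20 and buffered proofs cover #15..=#30, so the intersection is #15..=#20.
	// `binary_search_by_key(&20, ...)` either finds a buffered proof for #20, or returns the
	// insertion point whose predecessor is the best proof not above #20 (say, #18). Since #18
	// lies inside #15..=#20, that header and its proof become the selected pair.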

	// read all proofs from the stream, possibly selecting a more recent proof to submit
	loop {
		let next_proof = finality_proofs_stream.stream.next();
		let finality_proof = match next_proof.now_or_never() {
			Some(Some(finality_proof)) => finality_proof,
			Some(None) => {
				finality_proofs_stream.needs_restart = true;
				break;
			}
			None => break,
		};

		let finality_proof_target_header_number = match finality_proof.target_header_number() {
			Some(target_header_number) => target_header_number,
			None => continue,
		};

		let justified_header = prune_unjustified_headers::<P>(
			finality_proof_target_header_number,
			&mut unjustified_headers,
		);
		if let Some(justified_header) = justified_header {
			recent_finality_proofs.clear();
			selected_finality_proof = Some((justified_header, finality_proof));
		} else {
			// the number of proofs read during a single wakeup is expected to be low, so we aren't
			// pruning the `recent_finality_proofs` collection too often
			recent_finality_proofs.push((finality_proof_target_header_number, finality_proof));
		}
	}

	// remove obsolete 'recent' finality proofs + keep the collection size under the configured limit
	let oldest_finality_proof_to_keep = selected_finality_proof
		.as_ref()
		.map(|(header, _)| header.number())
		.unwrap_or(best_number_at_target);
	prune_recent_finality_proofs::<P>(
		oldest_finality_proof_to_keep,
		recent_finality_proofs,
		sync_params.recent_finality_proofs_limit,
	);

	Ok(selected_finality_proof)
}

/// Remove headers from the `unjustified_headers` collection with number lower than or equal to
/// `justified_header_number`.
///
/// Returns the header that matches `justified_header_number` (if any).
pub(crate) fn prune_unjustified_headers<P: FinalitySyncPipeline>(
	justified_header_number: P::Number,
	unjustified_headers: &mut UnjustifiedHeaders<P>,
) -> Option<P::Header> {
	prune_ordered_vec(justified_header_number, unjustified_headers, usize::MAX, |header| {
		header.number()
	})
}

/// Remove proofs for headers with number lower than or equal to `justified_header_number` from the
/// `recent_finality_proofs` collection and keep its size under `recent_finality_proofs_limit`.
pub(crate) fn prune_recent_finality_proofs<P: FinalitySyncPipeline>(
	justified_header_number: P::Number,
	recent_finality_proofs: &mut FinalityProofs<P>,
	recent_finality_proofs_limit: usize,
) {
	prune_ordered_vec(
		justified_header_number,
		recent_finality_proofs,
		recent_finality_proofs_limit,
		|(header_number, _)| *header_number,
	);
}

fn prune_ordered_vec<T, Number: Ord>(
	header_number: Number,
	ordered_vec: &mut Vec<T>,
	maximal_vec_size: usize,
	extract_header_number: impl Fn(&T) -> Number,
) -> Option<T> {
	let position = ordered_vec.binary_search_by_key(&header_number, extract_header_number);

	// first extract the element we're interested in
	let extracted_element = match position {
		Ok(position) => {
			let updated_vec = ordered_vec.split_off(position + 1);
			let extracted_element = ordered_vec.pop().expect(
				"binary_search_by_key has returned Ok(); so there's element at `position`;\
					we're splitting vec at `position+1`; so we have pruned at least 1 element;\
					qed",
			);
			*ordered_vec = updated_vec;
			Some(extracted_element)
		}
		Err(position) => {
			*ordered_vec = ordered_vec.split_off(position);
			None
		}
	};

	// now limit the vec by size
	let split_index = ordered_vec.len().saturating_sub(maximal_vec_size);
	*ordered_vec = ordered_vec.split_off(split_index);

	extracted_element
}

fn print_sync_progress<P: FinalitySyncPipeline>(
	progress_context: (Instant, Option<P::Number>),
	best_number_at_source: P::Number,
	best_number_at_target: P::Number,
) -> (Instant, Option<P::Number>) {
	let (prev_time, prev_best_number_at_target) = progress_context;
	let now = Instant::now();

	let need_update = now - prev_time > Duration::from_secs(10)
		|| prev_best_number_at_target
			.map(|prev_best_number_at_target| {
				best_number_at_target.saturating_sub(prev_best_number_at_target) > 10.into()
			})
			.unwrap_or(true);
	if !need_update {
		return (prev_time, prev_best_number_at_target);
	}

	log::info!(
		target: "bridge",
		"Synced {:?} of {:?} headers",
		best_number_at_target,
		best_number_at_source,
	);
	(now, Some(best_number_at_target))
}
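// Hedged sketch: minimal tests for the pruning helper above. They exercise `prune_ordered_vec`
// directly with plain `u64` numbers; the values and test names are illustrative assumptions,
// not part of the original test suite.
#[cfg(test)]
mod prune_ordered_vec_examples {
	use super::prune_ordered_vec;

	#[test]
	fn extracts_matching_element_and_prunes_lower_numbers() {
		let mut ordered_vec = vec![10u64, 13, 15, 17, 19];
		// 15 is present => it is extracted and everything at or below it is removed
		assert_eq!(prune_ordered_vec(15u64, &mut ordered_vec, usize::MAX, |n| *n), Some(15));
		assert_eq!(ordered_vec, vec![17, 19]);
	}

	#[test]
	fn prunes_lower_numbers_when_element_is_missing() {
		let mut ordered_vec = vec![10u64, 13, 15, 17, 19];
		// 14 is absent => nothing is extracted, but 10 and 13 are still pruned
		assert_eq!(prune_ordered_vec(14u64, &mut ordered_vec, usize::MAX, |n| *n), None);
		assert_eq!(ordered_vec, vec![15, 17, 19]);
	}

	#[test]
	fn limits_vec_size() {
		let mut ordered_vec = vec![10u64, 13, 15, 17, 19];
		// nothing is at or below 5, but the collection is truncated to the 2 newest entries
		assert_eq!(prune_ordered_vec(5u64, &mut ordered_vec, 2, |n| *n), None);
		assert_eq!(ordered_vec, vec![17, 19]);
	}
}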