// Copyright 2019-2021 Parity Technologies (UK) Ltd. // This file is part of Parity Bridges Common. // Parity Bridges Common is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity Bridges Common is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . //! The loop basically reads all missing headers and their finality proofs from the source client. //! The proof for the best possible header is then submitted to the target node. The only exception //! is the mandatory headers, which we always submit to the target node. For such headers, we //! assume that the persistent proof either exists, or will eventually become available. use crate::{sync_loop_metrics::SyncLoopMetrics, Error, FinalitySyncPipeline, SourceHeader}; use crate::{ base::SourceClientBase, finality_proofs::{FinalityProofsBuf, FinalityProofsStream}, headers::{JustifiedHeader, JustifiedHeaderSelector}, }; use async_trait::async_trait; use backoff::{backoff::Backoff, ExponentialBackoff}; use futures::{future::Fuse, select, Future, FutureExt}; use num_traits::Saturating; use relay_utils::{ metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient, HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker, }; use std::{ fmt::Debug, time::{Duration, Instant}, }; /// Type of headers that we relay. #[derive(Debug, Clone, Copy, PartialEq)] pub enum HeadersToRelay { /// Relay all headers. All, /// Relay only mandatory headers. Mandatory, /// Relay only free (including mandatory) headers. Free, } /// Finality proof synchronization loop parameters. #[derive(Debug, Clone)] pub struct FinalitySyncParams { /// Interval at which we check updates on both clients. Normally should be larger than /// `min(source_block_time, target_block_time)`. /// /// This parameter may be used to limit transactions rate. Increase the value && you'll get /// infrequent updates => sparse headers => potential slow down of bridge applications, but /// pallet storage won't be super large. Decrease the value to near `source_block_time` and /// you'll get transaction for (almost) every block of the source chain => all source headers /// will be known to the target chain => bridge applications will run faster, but pallet /// storage may explode (but if pruning is there, then it's fine). pub tick: Duration, /// Number of finality proofs to keep in internal buffer between loop iterations. /// /// While in "major syncing" state, we still read finality proofs from the stream. They're /// stored in the internal buffer between loop iterations. When we're close to the tip of the /// chain, we may meet finality delays if headers are not finalized frequently. So instead of /// waiting for next finality proof to appear in the stream, we may use existing proof from /// that buffer. pub recent_finality_proofs_limit: usize, /// Timeout before we treat our transactions as lost and restart the whole sync process. pub stall_timeout: Duration, /// If true, only mandatory headers are relayed. pub headers_to_relay: HeadersToRelay, } /// Source client used in finality synchronization loop. #[async_trait] pub trait SourceClient: SourceClientBase

{ /// Get best finalized block number. async fn best_finalized_block_number(&self) -> Result; /// Get canonical header and its finality proof by number. async fn header_and_finality_proof( &self, number: P::Number, ) -> Result<(P::Header, Option), Self::Error>; } /// Target client used in finality synchronization loop. #[async_trait] pub trait TargetClient: RelayClient { /// Transaction tracker to track submitted transactions. type TransactionTracker: TransactionTracker; /// Get best finalized source block number. async fn best_finalized_source_block_id( &self, ) -> Result, Self::Error>; /// Get free source headers submission interval, if it is configured in the /// target runtime. async fn free_source_headers_interval(&self) -> Result, Self::Error>; /// Submit header finality proof. async fn submit_finality_proof( &self, header: P::Header, proof: P::FinalityProof, ) -> Result; } /// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs /// sync loop. pub fn metrics_prefix() -> String { format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME) } /// Finality sync information. pub struct SyncInfo { /// Best finalized header at the source client. pub best_number_at_source: P::Number, /// Best source header, known to the target client. pub best_number_at_target: P::Number, /// Whether the target client follows the same fork as the source client do. pub is_using_same_fork: bool, /// Free headers interval. We assume that the submission of header `N`, divisible /// by `free_headers_interval` will be free for submitter. May be `None` if runtime /// is configured to not allow free headers. If it is `Some(_)`, it is guaranteed /// not to be zero. pub free_headers_interval: Option, } impl SyncInfo

{ /// Checks if both clients are on the same fork. async fn is_on_same_fork>( source_client: &SC, id_at_target: &HeaderId, ) -> Result { let header_at_source = source_client.header_and_finality_proof(id_at_target.0).await?.0; let header_hash_at_source = header_at_source.hash(); Ok(if id_at_target.1 == header_hash_at_source { true } else { log::error!( target: "bridge", "Source node ({}) and pallet at target node ({}) have different headers at the same height {:?}: \ at-source {:?} vs at-target {:?}", P::SOURCE_NAME, P::TARGET_NAME, id_at_target.0, header_hash_at_source, id_at_target.1, ); false }) } async fn new, TC: TargetClient

>( source_client: &SC, target_client: &TC, ) -> Result> { let best_number_at_source = source_client.best_finalized_block_number().await.map_err(Error::Source)?; let best_id_at_target = target_client.best_finalized_source_block_id().await.map_err(Error::Target)?; let best_number_at_target = best_id_at_target.0; let mut free_headers_interval = target_client.free_source_headers_interval().await.map_err(Error::Target)?; if free_headers_interval == Some(0.into()) { free_headers_interval = None; } let is_using_same_fork = Self::is_on_same_fork(source_client, &best_id_at_target) .await .map_err(Error::Source)?; Ok(Self { best_number_at_source, best_number_at_target, free_headers_interval, is_using_same_fork, }) } fn update_metrics(&self, metrics_sync: &Option) { if let Some(metrics_sync) = metrics_sync { metrics_sync.update_best_block_at_source(self.best_number_at_source); metrics_sync.update_best_block_at_target(self.best_number_at_target); metrics_sync.update_using_same_fork(self.is_using_same_fork); } } pub fn num_headers(&self) -> P::Number { self.best_number_at_source.saturating_sub(self.best_number_at_target) } } /// Information about transaction that we have submitted. #[derive(Debug, Clone)] pub struct Transaction { /// Submitted transaction tracker. tracker: Tracker, /// The number of the header we have submitted. header_number: Number, } impl Transaction { pub async fn submit< P: FinalitySyncPipeline, TC: TargetClient, >( target_client: &TC, header: P::Header, justification: P::FinalityProof, ) -> Result { let header_number = header.number(); log::debug!( target: "bridge", "Going to submit finality proof of {} header #{:?} to {}", P::SOURCE_NAME, header_number, P::TARGET_NAME, ); let tracker = target_client.submit_finality_proof(header, justification).await?; Ok(Transaction { tracker, header_number }) } async fn track< P: FinalitySyncPipeline, SC: SourceClient

, TC: TargetClient

, >( self, target_client: TC, ) -> Result<(), Error> { match self.tracker.wait().await { TrackedTransactionStatus::Finalized(_) => { // The transaction has been finalized, but it may have been finalized in the // "failed" state. So let's check if the block number was actually updated. target_client .best_finalized_source_block_id() .await .map_err(Error::Target) .and_then(|best_id_at_target| { if self.header_number > best_id_at_target.0 { return Err(Error::ProofSubmissionTxFailed { submitted_number: self.header_number, best_number_at_target: best_id_at_target.0, }) } Ok(()) }) }, TrackedTransactionStatus::Lost => Err(Error::ProofSubmissionTxLost), } } } /// Finality synchronization loop state. struct FinalityLoop, TC: TargetClient

> { source_client: SC, target_client: TC, sync_params: FinalitySyncParams, metrics_sync: Option, progress: (Instant, Option), retry_backoff: ExponentialBackoff, finality_proofs_stream: FinalityProofsStream, finality_proofs_buf: FinalityProofsBuf

, best_submitted_number: Option, } impl, TC: TargetClient

> FinalityLoop { pub fn new( source_client: SC, target_client: TC, sync_params: FinalitySyncParams, metrics_sync: Option, ) -> Self { Self { source_client, target_client, sync_params, metrics_sync, progress: (Instant::now(), None), retry_backoff: retry_backoff(), finality_proofs_stream: FinalityProofsStream::new(), finality_proofs_buf: FinalityProofsBuf::new(vec![]), best_submitted_number: None, } } fn update_progress(&mut self, info: &SyncInfo

) { let (prev_time, prev_best_number_at_target) = self.progress; let now = Instant::now(); let needs_update = now - prev_time > Duration::from_secs(10) || prev_best_number_at_target .map(|prev_best_number_at_target| { info.best_number_at_target.saturating_sub(prev_best_number_at_target) > 10.into() }) .unwrap_or(true); if !needs_update { return } log::info!( target: "bridge", "Synced {:?} of {:?} headers", info.best_number_at_target, info.best_number_at_source, ); self.progress = (now, Some(info.best_number_at_target)) } pub async fn select_header_to_submit( &mut self, info: &SyncInfo

, ) -> Result>, Error> { // to see that the loop is progressing log::trace!( target: "bridge", "Considering range of headers ({}; {}]", info.best_number_at_target, info.best_number_at_source ); // read missing headers let selector = JustifiedHeaderSelector::new::( &self.source_client, info, self.sync_params.headers_to_relay, ) .await?; // if we see that the header schedules GRANDPA change, we need to submit it if self.sync_params.headers_to_relay == HeadersToRelay::Mandatory { return Ok(selector.select_mandatory()) } // all headers that are missing from the target client are non-mandatory // => even if we have already selected some header and its persistent finality proof, // we may try to select better header by reading non-persistent proofs from the stream self.finality_proofs_buf.fill(&mut self.finality_proofs_stream); let maybe_justified_header = selector.select(info, self.sync_params.headers_to_relay, &self.finality_proofs_buf); // remove obsolete 'recent' finality proofs + keep its size under certain limit let oldest_finality_proof_to_keep = maybe_justified_header .as_ref() .map(|justified_header| justified_header.number()) .unwrap_or(info.best_number_at_target); self.finality_proofs_buf.prune( oldest_finality_proof_to_keep, Some(self.sync_params.recent_finality_proofs_limit), ); Ok(maybe_justified_header) } pub async fn run_iteration( &mut self, ) -> Result< Option>, Error, > { // read best source headers ids from source and target nodes let info = SyncInfo::new(&self.source_client, &self.target_client).await?; info.update_metrics(&self.metrics_sync); self.update_progress(&info); // if we have already submitted header, then we just need to wait for it // if we're waiting too much, then we believe our transaction has been lost and restart sync if Some(info.best_number_at_target) < self.best_submitted_number { return Ok(None) } // submit new header if we have something new match self.select_header_to_submit(&info).await? { Some(header) => { let transaction = Transaction::submit(&self.target_client, header.header, header.proof) .await .map_err(Error::Target)?; self.best_submitted_number = Some(transaction.header_number); Ok(Some(transaction)) }, None => Ok(None), } } async fn ensure_finality_proofs_stream(&mut self) -> Result<(), FailedClient> { if let Err(e) = self.finality_proofs_stream.ensure_stream(&self.source_client).await { if e.is_connection_error() { return Err(FailedClient::Source) } } Ok(()) } /// Run finality relay loop until connection to one of nodes is lost. async fn run_until_connection_lost( &mut self, exit_signal: impl Future, ) -> Result<(), FailedClient> { self.ensure_finality_proofs_stream().await?; let proof_submission_tx_tracker = Fuse::terminated(); let exit_signal = exit_signal.fuse(); futures::pin_mut!(exit_signal, proof_submission_tx_tracker); loop { // run loop iteration let next_tick = match self.run_iteration().await { Ok(Some(tx)) => { proof_submission_tx_tracker .set(tx.track::(self.target_client.clone()).fuse()); self.retry_backoff.reset(); self.sync_params.tick }, Ok(None) => { self.retry_backoff.reset(); self.sync_params.tick }, Err(error) => { log::error!(target: "bridge", "Finality sync loop iteration has failed with error: {:?}", error); error.fail_if_connection_error()?; self.retry_backoff .next_backoff() .unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY) }, }; self.ensure_finality_proofs_stream().await?; // wait till exit signal, or new source block select! { proof_submission_result = proof_submission_tx_tracker => { if let Err(e) = proof_submission_result { log::error!( target: "bridge", "Finality sync proof submission tx to {} has failed with error: {:?}.", P::TARGET_NAME, e, ); self.best_submitted_number = None; e.fail_if_connection_error()?; } }, _ = async_std::task::sleep(next_tick).fuse() => {}, _ = exit_signal => return Ok(()), } } } pub async fn run( source_client: SC, target_client: TC, sync_params: FinalitySyncParams, metrics_sync: Option, exit_signal: impl Future, ) -> Result<(), FailedClient> { let mut finality_loop = Self::new(source_client, target_client, sync_params, metrics_sync); finality_loop.run_until_connection_lost(exit_signal).await } } /// Run finality proofs synchronization loop. pub async fn run( source_client: impl SourceClient

, target_client: impl TargetClient

, sync_params: FinalitySyncParams, metrics_params: MetricsParams, exit_signal: impl Future + 'static + Send, ) -> Result<(), relay_utils::Error> { let exit_signal = exit_signal.shared(); relay_utils::relay_loop(source_client, target_client) .with_metrics(metrics_params) .loop_metric(SyncLoopMetrics::new( Some(&metrics_prefix::

()), "source", "source_at_target", )?)? .expose() .await? .run(metrics_prefix::

(), move |source_client, target_client, metrics| { FinalityLoop::run( source_client, target_client, sync_params.clone(), metrics, exit_signal.clone(), ) }) .await } #[cfg(test)] mod tests { use super::*; use crate::mock::*; use futures::{FutureExt, StreamExt}; use parking_lot::Mutex; use relay_utils::{FailedClient, HeaderId, TrackedTransactionStatus}; use std::{collections::HashMap, sync::Arc}; fn prepare_test_clients( exit_sender: futures::channel::mpsc::UnboundedSender<()>, state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static, source_headers: HashMap)>, ) -> (TestSourceClient, TestTargetClient) { let internal_state_function: Arc = Arc::new(move |data| { if state_function(data) { exit_sender.unbounded_send(()).unwrap(); } }); let clients_data = Arc::new(Mutex::new(ClientsData { source_best_block_number: 10, source_headers, source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)], target_best_block_id: HeaderId(5, 5), target_headers: vec![], target_transaction_tracker: TestTransactionTracker( TrackedTransactionStatus::Finalized(Default::default()), ), })); ( TestSourceClient { on_method_call: internal_state_function.clone(), data: clients_data.clone(), }, TestTargetClient { on_method_call: internal_state_function, data: clients_data }, ) } fn test_sync_params() -> FinalitySyncParams { FinalitySyncParams { tick: Duration::from_secs(0), recent_finality_proofs_limit: 1024, stall_timeout: Duration::from_secs(1), headers_to_relay: HeadersToRelay::All, } } fn run_sync_loop( state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static, ) -> (ClientsData, Result<(), FailedClient>) { let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded(); let (source_client, target_client) = prepare_test_clients( exit_sender, state_function, vec![ (5, (TestSourceHeader(false, 5, 5), None)), (6, (TestSourceHeader(false, 6, 6), None)), (7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))), (8, (TestSourceHeader(true, 8, 8), Some(TestFinalityProof(8)))), (9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))), (10, (TestSourceHeader(false, 10, 10), None)), ] .into_iter() .collect(), ); let sync_params = test_sync_params(); let clients_data = source_client.data.clone(); let result = async_std::task::block_on(FinalityLoop::run( source_client, target_client, sync_params, None, exit_receiver.into_future().map(|(_, _)| ()), )); let clients_data = clients_data.lock().clone(); (clients_data, result) } #[test] fn finality_sync_loop_works() { let (client_data, result) = run_sync_loop(|data| { // header#7 has persistent finality proof, but it isn't mandatory => it isn't submitted, // because header#8 has persistent finality proof && it is mandatory => it is submitted // header#9 has persistent finality proof, but it isn't mandatory => it is submitted, // because there are no more persistent finality proofs // // once this ^^^ is done, we generate more blocks && read proof for blocks 12 and 14 // from the stream if data.target_best_block_id.0 == 9 { data.source_best_block_number = 14; data.source_headers.insert(11, (TestSourceHeader(false, 11, 11), None)); data.source_headers .insert(12, (TestSourceHeader(false, 12, 12), Some(TestFinalityProof(12)))); data.source_headers.insert(13, (TestSourceHeader(false, 13, 13), None)); data.source_headers .insert(14, (TestSourceHeader(false, 14, 14), Some(TestFinalityProof(14)))); } // once this ^^^ is done, we generate more blocks && read persistent proof for block 16 if data.target_best_block_id.0 == 14 { data.source_best_block_number = 17; data.source_headers.insert(15, (TestSourceHeader(false, 15, 15), None)); data.source_headers .insert(16, (TestSourceHeader(false, 16, 16), Some(TestFinalityProof(16)))); data.source_headers.insert(17, (TestSourceHeader(false, 17, 17), None)); } data.target_best_block_id.0 == 16 }); assert_eq!(result, Ok(())); assert_eq!( client_data.target_headers, vec![ // before adding 11..14: finality proof for mandatory header#8 (TestSourceHeader(true, 8, 8), TestFinalityProof(8)), // before adding 11..14: persistent finality proof for non-mandatory header#9 (TestSourceHeader(false, 9, 9), TestFinalityProof(9)), // after adding 11..14: ephemeral finality proof for non-mandatory header#14 (TestSourceHeader(false, 14, 14), TestFinalityProof(14)), // after adding 15..17: persistent finality proof for non-mandatory header#16 (TestSourceHeader(false, 16, 16), TestFinalityProof(16)), ], ); } fn run_headers_to_relay_mode_test( headers_to_relay: HeadersToRelay, has_mandatory_headers: bool, ) -> Option> { let (exit_sender, _) = futures::channel::mpsc::unbounded(); let (source_client, target_client) = prepare_test_clients( exit_sender, |_| false, vec![ (6, (TestSourceHeader(false, 6, 6), Some(TestFinalityProof(6)))), (7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))), (8, (TestSourceHeader(has_mandatory_headers, 8, 8), Some(TestFinalityProof(8)))), (9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))), (10, (TestSourceHeader(false, 10, 10), Some(TestFinalityProof(10)))), ] .into_iter() .collect(), ); async_std::task::block_on(async { let mut finality_loop = FinalityLoop::new( source_client, target_client, FinalitySyncParams { tick: Duration::from_secs(0), recent_finality_proofs_limit: 0, stall_timeout: Duration::from_secs(0), headers_to_relay, }, None, ); let info = SyncInfo { best_number_at_source: 10, best_number_at_target: 5, free_headers_interval: Some(3), is_using_same_fork: true, }; finality_loop.select_header_to_submit(&info).await.unwrap() }) } #[test] fn select_header_to_submit_may_select_non_mandatory_header() { assert_eq!(run_headers_to_relay_mode_test(HeadersToRelay::Mandatory, false), None); assert_eq!( run_headers_to_relay_mode_test(HeadersToRelay::Free, false), Some(JustifiedHeader { header: TestSourceHeader(false, 9, 9), proof: TestFinalityProof(9) }), ); assert_eq!( run_headers_to_relay_mode_test(HeadersToRelay::All, false), Some(JustifiedHeader { header: TestSourceHeader(false, 10, 10), proof: TestFinalityProof(10) }), ); } #[test] fn select_header_to_submit_may_select_mandatory_header() { assert_eq!( run_headers_to_relay_mode_test(HeadersToRelay::Mandatory, true), Some(JustifiedHeader { header: TestSourceHeader(true, 8, 8), proof: TestFinalityProof(8) }), ); assert_eq!( run_headers_to_relay_mode_test(HeadersToRelay::Free, true), Some(JustifiedHeader { header: TestSourceHeader(true, 8, 8), proof: TestFinalityProof(8) }), ); assert_eq!( run_headers_to_relay_mode_test(HeadersToRelay::All, true), Some(JustifiedHeader { header: TestSourceHeader(true, 8, 8), proof: TestFinalityProof(8) }), ); } #[test] fn different_forks_at_source_and_at_target_are_detected() { let (exit_sender, _exit_receiver) = futures::channel::mpsc::unbounded(); let (source_client, target_client) = prepare_test_clients( exit_sender, |_| false, vec![ (5, (TestSourceHeader(false, 5, 42), None)), (6, (TestSourceHeader(false, 6, 6), None)), (7, (TestSourceHeader(false, 7, 7), None)), (8, (TestSourceHeader(false, 8, 8), None)), (9, (TestSourceHeader(false, 9, 9), None)), (10, (TestSourceHeader(false, 10, 10), None)), ] .into_iter() .collect(), ); let metrics_sync = SyncLoopMetrics::new(None, "source", "target").unwrap(); async_std::task::block_on(async { let mut finality_loop = FinalityLoop::new( source_client, target_client, test_sync_params(), Some(metrics_sync.clone()), ); finality_loop.run_iteration().await.unwrap() }); assert!(!metrics_sync.is_using_same_fork()); } }