// Copyright 2019-2021 Parity Technologies (UK) Ltd. // This file is part of Parity Bridges Common. // Parity Bridges Common is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Parity Bridges Common is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . use crate::{parachains_loop_metrics::ParachainsLoopMetrics, ParachainsPipeline}; use async_trait::async_trait; use bp_polkadot_core::{ parachains::{ParaHash, ParaHeadsProof, ParaId}, BlockNumber as RelayBlockNumber, }; use futures::{ future::{FutureExt, Shared}, poll, select_biased, }; use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf, ParachainBase}; use relay_utils::{ metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, TrackedTransactionStatus, TransactionTracker, }; use std::{future::Future, pin::Pin, task::Poll}; /// Parachain header availability at a certain chain. #[derive(Clone, Copy, Debug)] pub enum AvailableHeader { /// The client can not report actual parachain head at this moment. /// /// It is a "mild" error, which may appear when e.g. on-demand parachains relay is used. /// This variant must be treated as "we don't want to update parachain head value at the /// target chain at this moment". Unavailable, /// There's no parachain header at the relay chain. /// /// Normally it means that the parachain is not registered there. Missing, /// Parachain head with given hash is available at the source chain. Available(T), } impl AvailableHeader { /// Return available header. pub fn as_available(&self) -> Option<&T> { match *self { AvailableHeader::Available(ref header) => Some(header), _ => None, } } } impl From> for AvailableHeader { fn from(maybe_header: Option) -> AvailableHeader { match maybe_header { Some(header) => AvailableHeader::Available(header), None => AvailableHeader::Missing, } } } /// Source client used in parachain heads synchronization loop. #[async_trait] pub trait SourceClient: RelayClient { /// Returns `Ok(true)` if client is in synced state. async fn ensure_synced(&self) -> Result; /// Get parachain head id at given block. async fn parachain_head( &self, at_block: HeaderIdOf, ) -> Result>, Self::Error>; /// Get parachain head proof at given block. async fn prove_parachain_head( &self, at_block: HeaderIdOf, ) -> Result<(ParaHeadsProof, ParaHash), Self::Error>; } /// Target client used in parachain heads synchronization loop. #[async_trait] pub trait TargetClient: RelayClient { /// Transaction tracker to track submitted transactions. type TransactionTracker: TransactionTracker>; /// Get best block id. async fn best_block(&self) -> Result, Self::Error>; /// Get best finalized source relay chain block id. If `free_source_relay_headers_interval` /// is `Some(_)`, the returned async fn best_finalized_source_relay_chain_block( &self, at_block: &HeaderIdOf, ) -> Result, Self::Error>; /// Get free source **relay** headers submission interval, if it is configured in the /// target runtime. We assume that the target chain will accept parachain header, proved /// at such relay header for free. async fn free_source_relay_headers_interval( &self, ) -> Result>, Self::Error>; /// Get parachain head id at given block. async fn parachain_head( &self, at_block: HeaderIdOf, ) -> Result< Option<(HeaderIdOf, HeaderIdOf)>, Self::Error, >; /// Submit parachain heads proof. async fn submit_parachain_head_proof( &self, at_source_block: HeaderIdOf, para_head_hash: ParaHash, proof: ParaHeadsProof, is_free_execution_expected: bool, ) -> Result; } /// Return prefix that will be used by default to expose Prometheus metrics of the parachains /// sync loop. pub fn metrics_prefix() -> String { format!( "{}_to_{}_Parachains_{}", P::SourceRelayChain::NAME, P::TargetChain::NAME, P::SourceParachain::PARACHAIN_ID ) } /// Run parachain heads synchronization. pub async fn run( source_client: impl SourceClient

, target_client: impl TargetClient

, metrics_params: MetricsParams, only_free_headers: bool, exit_signal: impl Future + 'static + Send, ) -> Result<(), relay_utils::Error> where P::SourceRelayChain: Chain, { let exit_signal = exit_signal.shared(); relay_utils::relay_loop(source_client, target_client) .with_metrics(metrics_params) .loop_metric(ParachainsLoopMetrics::new(Some(&metrics_prefix::

()))?)? .expose() .await? .run(metrics_prefix::

(), move |source_client, target_client, metrics| { run_until_connection_lost( source_client, target_client, metrics, only_free_headers, exit_signal.clone(), ) }) .await } /// Run parachain heads synchronization. async fn run_until_connection_lost( source_client: impl SourceClient

, target_client: impl TargetClient

, metrics: Option, only_free_headers: bool, exit_signal: impl Future + Send, ) -> Result<(), FailedClient> where P::SourceRelayChain: Chain, { let exit_signal = exit_signal.fuse(); let min_block_interval = std::cmp::min( P::SourceRelayChain::AVERAGE_BLOCK_INTERVAL, P::TargetChain::AVERAGE_BLOCK_INTERVAL, ); // free parachain header = header, available (proved) at free relay chain block. Let's // read interval of free source relay chain blocks from target client let free_source_relay_headers_interval = if only_free_headers { let free_source_relay_headers_interval = target_client.free_source_relay_headers_interval().await.map_err(|e| { log::warn!( target: "bridge", "Failed to read free {} headers interval at {}: {:?}", P::SourceRelayChain::NAME, P::TargetChain::NAME, e, ); FailedClient::Target })?; match free_source_relay_headers_interval { Some(free_source_relay_headers_interval) if free_source_relay_headers_interval != 0 => { log::trace!( target: "bridge", "Free {} headers interval at {}: {:?}", P::SourceRelayChain::NAME, P::TargetChain::NAME, free_source_relay_headers_interval, ); free_source_relay_headers_interval }, _ => { log::warn!( target: "bridge", "Invalid free {} headers interval at {}: {:?}", P::SourceRelayChain::NAME, P::TargetChain::NAME, free_source_relay_headers_interval, ); return Err(FailedClient::Target) }, } } else { // ignore - we don't need it 0 }; let mut submitted_heads_tracker: Option> = None; futures::pin_mut!(exit_signal); // Note that the internal loop breaks with `FailedClient` error even if error is non-connection. // It is Ok for now, but it may need to be fixed in the future to use exponential backoff for // regular errors. loop { // Either wait for new block, or exit signal. // Please note that we are prioritizing the exit signal since if both events happen at once // it doesn't make sense to perform one more loop iteration. select_biased! { _ = exit_signal => return Ok(()), _ = async_std::task::sleep(min_block_interval).fuse() => {}, } // if source client is not yet synced, we'll need to sleep. Otherwise we risk submitting too // much redundant transactions match source_client.ensure_synced().await { Ok(true) => (), Ok(false) => { log::warn!( target: "bridge", "{} client is syncing. Won't do anything until it is synced", P::SourceRelayChain::NAME, ); continue }, Err(e) => { log::warn!( target: "bridge", "{} client has failed to return its sync status: {:?}", P::SourceRelayChain::NAME, e, ); return Err(FailedClient::Source) }, } // if we have active transaction, we'll need to wait until it is mined or dropped let best_target_block = target_client.best_block().await.map_err(|e| { log::warn!(target: "bridge", "Failed to read best {} block: {:?}", P::SourceRelayChain::NAME, e); FailedClient::Target })?; let (relay_of_head_at_target, head_at_target) = read_head_at_target(&target_client, metrics.as_ref(), &best_target_block).await?; // check if our transaction has been mined if let Some(tracker) = submitted_heads_tracker.take() { match tracker.update(&best_target_block, &head_at_target).await { SubmittedHeadStatus::Waiting(tracker) => { // no news about our transaction and we shall keep waiting submitted_heads_tracker = Some(tracker); continue }, SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)) => { // all heads have been updated, we don't need this tracker anymore }, SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost) => { log::warn!( target: "bridge", "Parachains synchronization from {} to {} has stalled. Going to restart", P::SourceRelayChain::NAME, P::TargetChain::NAME, ); return Err(FailedClient::Both) }, } } // in all-headers strategy we'll be submitting para head, available at // `best_finalized_relay_block_at_target` let best_finalized_relay_block_at_target = target_client .best_finalized_source_relay_chain_block(&best_target_block) .await .map_err(|e| { log::warn!( target: "bridge", "Failed to read best finalized {} block from {}: {:?}", P::SourceRelayChain::NAME, P::TargetChain::NAME, e, ); FailedClient::Target })?; // ..but if we only need to submit free headers, we need to submit para // head, available at best free source relay chain header, known to the // target chain let prove_at_relay_block = if only_free_headers { match relay_of_head_at_target { Some(relay_of_head_at_target) => { // find last free relay chain header in the range that we are interested in let scan_range_begin = relay_of_head_at_target.number(); let scan_range_end = best_finalized_relay_block_at_target.number(); if scan_range_end.saturating_sub(scan_range_begin) < free_source_relay_headers_interval { // there are no new **free** relay chain headers in the range log::trace!( target: "bridge", "Waiting for new free {} headers at {}: scanned {:?}..={:?}", P::SourceRelayChain::NAME, P::TargetChain::NAME, scan_range_begin, scan_range_end, ); continue; } // we may submit new parachain head for free best_finalized_relay_block_at_target }, None => { // no parachain head at target => let's submit first one best_finalized_relay_block_at_target }, } } else { best_finalized_relay_block_at_target }; // now let's check if we need to update parachain head at all let head_at_source = read_head_at_source(&source_client, metrics.as_ref(), &prove_at_relay_block).await?; let is_update_required = is_update_required::

( head_at_source, head_at_target, prove_at_relay_block, best_target_block, ); if is_update_required { let (head_proof, head_hash) = source_client.prove_parachain_head(prove_at_relay_block).await.map_err(|e| { log::warn!( target: "bridge", "Failed to prove {} parachain ParaId({}) heads: {:?}", P::SourceRelayChain::NAME, P::SourceParachain::PARACHAIN_ID, e, ); FailedClient::Source })?; log::info!( target: "bridge", "Submitting {} parachain ParaId({}) head update transaction to {}. Para hash at source relay {:?}: {:?}", P::SourceRelayChain::NAME, P::SourceParachain::PARACHAIN_ID, P::TargetChain::NAME, prove_at_relay_block, head_hash, ); let transaction_tracker = target_client .submit_parachain_head_proof( prove_at_relay_block, head_hash, head_proof, only_free_headers, ) .await .map_err(|e| { log::warn!( target: "bridge", "Failed to submit {} parachain ParaId({}) heads proof to {}: {:?}", P::SourceRelayChain::NAME, P::SourceParachain::PARACHAIN_ID, P::TargetChain::NAME, e, ); FailedClient::Target })?; submitted_heads_tracker = Some(SubmittedHeadsTracker::

::new(head_at_source, transaction_tracker)); } } } /// Returns `true` if we need to submit parachain-head-update transaction. fn is_update_required( head_at_source: AvailableHeader>, head_at_target: Option>, prove_at_relay_block: HeaderIdOf, best_target_block: HeaderIdOf, ) -> bool where P::SourceRelayChain: Chain, { log::trace!( target: "bridge", "Checking if {} parachain ParaId({}) needs update at {}:\n\t\ At {} ({:?}): {:?}\n\t\ At {} ({:?}): {:?}", P::SourceRelayChain::NAME, P::SourceParachain::PARACHAIN_ID, P::TargetChain::NAME, P::SourceRelayChain::NAME, prove_at_relay_block, head_at_source, P::TargetChain::NAME, best_target_block, head_at_target, ); let needs_update = match (head_at_source, head_at_target) { (AvailableHeader::Unavailable, _) => { // source client has politely asked us not to update current parachain head // at the target chain false }, (AvailableHeader::Available(head_at_source), Some(head_at_target)) if head_at_source.number() > head_at_target.number() => { // source client knows head that is better than the head known to the target // client true }, (AvailableHeader::Available(_), Some(_)) => { // this is normal case when relay has recently updated heads, when parachain is // not progressing, or when our source client is still syncing false }, (AvailableHeader::Available(_), None) => { // parachain is not yet known to the target client. This is true when parachain // or bridge has been just onboarded/started true }, (AvailableHeader::Missing, Some(_)) => { // parachain/parathread has been offboarded removed from the system. It needs to // be propageted to the target client true }, (AvailableHeader::Missing, None) => { // all's good - parachain is unknown to both clients false }, }; if needs_update { log::trace!( target: "bridge", "{} parachain ParaId({}) needs update at {}: {:?} vs {:?}", P::SourceRelayChain::NAME, P::SourceParachain::PARACHAIN_ID, P::TargetChain::NAME, head_at_source, head_at_target, ); } needs_update } /// Reads parachain head from the source client. async fn read_head_at_source( source_client: &impl SourceClient

, metrics: Option<&ParachainsLoopMetrics>, at_relay_block: &HeaderIdOf, ) -> Result>, FailedClient> { let para_head = source_client.parachain_head(*at_relay_block).await; match para_head { Ok(AvailableHeader::Available(para_head)) => { if let Some(metrics) = metrics { metrics.update_best_parachain_block_at_source( ParaId(P::SourceParachain::PARACHAIN_ID), para_head.number(), ); } Ok(AvailableHeader::Available(para_head)) }, Ok(r) => Ok(r), Err(e) => { log::warn!( target: "bridge", "Failed to read head of {} parachain ParaId({:?}): {:?}", P::SourceRelayChain::NAME, P::SourceParachain::PARACHAIN_ID, e, ); Err(FailedClient::Source) }, } } /// Reads parachain head from the target client. Also returns source relay chain header /// that has been used to prove that head. async fn read_head_at_target( target_client: &impl TargetClient

, metrics: Option<&ParachainsLoopMetrics>, at_block: &HeaderIdOf, ) -> Result< (Option>, Option>), FailedClient, > { let para_head_id = target_client.parachain_head(*at_block).await; match para_head_id { Ok(Some((relay_header_id, para_head_id))) => { if let Some(metrics) = metrics { metrics.update_best_parachain_block_at_target( ParaId(P::SourceParachain::PARACHAIN_ID), para_head_id.number(), ); } Ok((Some(relay_header_id), Some(para_head_id))) }, Ok(None) => Ok((None, None)), Err(e) => { log::warn!( target: "bridge", "Failed to read head of {} parachain ParaId({}) at {}: {:?}", P::SourceRelayChain::NAME, P::SourceParachain::PARACHAIN_ID, P::TargetChain::NAME, e, ); Err(FailedClient::Target) }, } } /// Submitted heads status. enum SubmittedHeadStatus { /// Heads are not yet updated. Waiting(SubmittedHeadsTracker

), /// Heads transaction has either been finalized or lost (i.e. received its "final" status). Final(TrackedTransactionStatus>), } /// Type of the transaction tracker that the `SubmittedHeadsTracker` is using. /// /// It needs to be shared because of `poll` macro and our consuming `update` method. type SharedTransactionTracker

= Shared< Pin< Box< dyn Future< Output = TrackedTransactionStatus< HeaderIdOf<

::TargetChain>, >, > + Send, >, >, >; /// Submitted parachain heads transaction. struct SubmittedHeadsTracker { /// Parachain header id that we have submitted. submitted_head: AvailableHeader>, /// Future that waits for submitted transaction finality or loss. /// /// It needs to be shared because of `poll` macro and our consuming `update` method. transaction_tracker: SharedTransactionTracker

, } impl SubmittedHeadsTracker

{ /// Creates new parachain heads transaction tracker. pub fn new( submitted_head: AvailableHeader>, transaction_tracker: impl TransactionTracker> + 'static, ) -> Self { SubmittedHeadsTracker { submitted_head, transaction_tracker: transaction_tracker.wait().fuse().boxed().shared(), } } /// Returns `None` if all submitted parachain heads have been updated. pub async fn update( self, at_target_block: &HeaderIdOf, head_at_target: &Option>, ) -> SubmittedHeadStatus

{ // check if our head has been updated let is_head_updated = match (self.submitted_head, head_at_target) { (AvailableHeader::Available(submitted_head), Some(head_at_target)) if head_at_target.number() >= submitted_head.number() => true, (AvailableHeader::Missing, None) => true, _ => false, }; if is_head_updated { log::trace!( target: "bridge", "Head of parachain ParaId({}) has been updated at {}: {:?}", P::SourceParachain::PARACHAIN_ID, P::TargetChain::NAME, head_at_target, ); return SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(*at_target_block)) } // if underlying transaction tracker has reported that the transaction is lost, we may // then restart our sync let transaction_tracker = self.transaction_tracker.clone(); match poll!(transaction_tracker) { Poll::Ready(TrackedTransactionStatus::Lost) => return SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost), Poll::Ready(TrackedTransactionStatus::Finalized(_)) => { // so we are here and our transaction is mined+finalized, but some of heads were not // updated => we're considering our loop as stalled return SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost) }, _ => (), } SubmittedHeadStatus::Waiting(self) } } #[cfg(test)] mod tests { use super::*; use async_std::sync::{Arc, Mutex}; use codec::Encode; use futures::{SinkExt, StreamExt}; use relay_substrate_client::test_chain::{TestChain, TestParachain}; use relay_utils::{HeaderId, MaybeConnectionError}; use sp_core::H256; use std::collections::HashMap; const PARA_10_HASH: ParaHash = H256([10u8; 32]); const PARA_20_HASH: ParaHash = H256([20u8; 32]); #[derive(Clone, Debug)] enum TestError { Error, } impl MaybeConnectionError for TestError { fn is_connection_error(&self) -> bool { false } } #[derive(Clone, Debug, PartialEq, Eq)] struct TestParachainsPipeline; impl ParachainsPipeline for TestParachainsPipeline { type SourceRelayChain = TestChain; type SourceParachain = TestParachain; type TargetChain = TestChain; } #[derive(Clone, Debug)] struct TestClient { data: Arc>, } #[derive(Clone, Debug)] struct TestTransactionTracker(Option>>); #[async_trait] impl TransactionTracker for TestTransactionTracker { type HeaderId = HeaderIdOf; async fn wait(self) -> TrackedTransactionStatus> { match self.0 { Some(status) => status, None => futures::future::pending().await, } } } #[derive(Clone, Debug)] struct TestClientData { source_sync_status: Result, source_head: HashMap< BlockNumberOf, Result>, TestError>, >, source_proof: Result<(), TestError>, target_free_source_relay_headers_interval: Result>, TestError>, target_best_block: Result, TestError>, target_best_finalized_source_block: Result, TestError>, #[allow(clippy::type_complexity)] target_head: Result, HeaderIdOf)>, TestError>, target_submit_result: Result<(), TestError>, submitted_proof_at_source_relay_block: Option>, exit_signal_sender: Option>>, } impl TestClientData { pub fn minimal() -> Self { TestClientData { source_sync_status: Ok(true), source_head: vec![(0, Ok(AvailableHeader::Available(HeaderId(0, PARA_20_HASH))))] .into_iter() .collect(), source_proof: Ok(()), target_free_source_relay_headers_interval: Ok(None), target_best_block: Ok(HeaderId(0, Default::default())), target_best_finalized_source_block: Ok(HeaderId(0, Default::default())), target_head: Ok(None), target_submit_result: Ok(()), submitted_proof_at_source_relay_block: None, exit_signal_sender: None, } } pub fn with_exit_signal_sender( sender: futures::channel::mpsc::UnboundedSender<()>, ) -> Self { let mut client = Self::minimal(); client.exit_signal_sender = Some(Box::new(sender)); client } } impl From for TestClient { fn from(data: TestClientData) -> TestClient { TestClient { data: Arc::new(Mutex::new(data)) } } } #[async_trait] impl RelayClient for TestClient { type Error = TestError; async fn reconnect(&mut self) -> Result<(), TestError> { unimplemented!() } } #[async_trait] impl SourceClient for TestClient { async fn ensure_synced(&self) -> Result { self.data.lock().await.source_sync_status.clone() } async fn parachain_head( &self, at_block: HeaderIdOf, ) -> Result>, TestError> { self.data .lock() .await .source_head .get(&at_block.0) .expect(&format!("SourceClient::parachain_head({})", at_block.0)) .clone() } async fn prove_parachain_head( &self, at_block: HeaderIdOf, ) -> Result<(ParaHeadsProof, ParaHash), TestError> { let head_result = SourceClient::::parachain_head(self, at_block).await?; let head = head_result.as_available().unwrap(); let storage_proof = vec![head.hash().encode()]; let proof = (ParaHeadsProof { storage_proof }, head.hash()); self.data.lock().await.source_proof.clone().map(|_| proof) } } #[async_trait] impl TargetClient for TestClient { type TransactionTracker = TestTransactionTracker; async fn best_block(&self) -> Result, TestError> { self.data.lock().await.target_best_block.clone() } async fn best_finalized_source_relay_chain_block( &self, _at_block: &HeaderIdOf, ) -> Result, TestError> { self.data.lock().await.target_best_finalized_source_block.clone() } async fn free_source_relay_headers_interval( &self, ) -> Result>, TestError> { self.data.lock().await.target_free_source_relay_headers_interval.clone() } async fn parachain_head( &self, _at_block: HeaderIdOf, ) -> Result, HeaderIdOf)>, TestError> { self.data.lock().await.target_head.clone() } async fn submit_parachain_head_proof( &self, at_source_block: HeaderIdOf, _updated_parachain_head: ParaHash, _proof: ParaHeadsProof, _is_free_execution_expected: bool, ) -> Result { let mut data = self.data.lock().await; data.target_submit_result.clone()?; data.submitted_proof_at_source_relay_block = Some(at_source_block); if let Some(mut exit_signal_sender) = data.exit_signal_sender.take() { exit_signal_sender.send(()).await.unwrap(); } Ok(TestTransactionTracker(Some( TrackedTransactionStatus::Finalized(Default::default()), ))) } } #[test] fn when_source_client_fails_to_return_sync_state() { let mut test_source_client = TestClientData::minimal(); test_source_client.source_sync_status = Err(TestError::Error); assert_eq!( async_std::task::block_on(run_until_connection_lost( TestClient::from(test_source_client), TestClient::from(TestClientData::minimal()), None, false, futures::future::pending(), )), Err(FailedClient::Source), ); } #[test] fn when_target_client_fails_to_return_best_block() { let mut test_target_client = TestClientData::minimal(); test_target_client.target_best_block = Err(TestError::Error); assert_eq!( async_std::task::block_on(run_until_connection_lost( TestClient::from(TestClientData::minimal()), TestClient::from(test_target_client), None, false, futures::future::pending(), )), Err(FailedClient::Target), ); } #[test] fn when_target_client_fails_to_read_heads() { let mut test_target_client = TestClientData::minimal(); test_target_client.target_head = Err(TestError::Error); assert_eq!( async_std::task::block_on(run_until_connection_lost( TestClient::from(TestClientData::minimal()), TestClient::from(test_target_client), None, false, futures::future::pending(), )), Err(FailedClient::Target), ); } #[test] fn when_target_client_fails_to_read_best_finalized_source_block() { let mut test_target_client = TestClientData::minimal(); test_target_client.target_best_finalized_source_block = Err(TestError::Error); assert_eq!( async_std::task::block_on(run_until_connection_lost( TestClient::from(TestClientData::minimal()), TestClient::from(test_target_client), None, false, futures::future::pending(), )), Err(FailedClient::Target), ); } #[test] fn when_source_client_fails_to_read_heads() { let mut test_source_client = TestClientData::minimal(); test_source_client.source_head.insert(0, Err(TestError::Error)); assert_eq!( async_std::task::block_on(run_until_connection_lost( TestClient::from(test_source_client), TestClient::from(TestClientData::minimal()), None, false, futures::future::pending(), )), Err(FailedClient::Source), ); } #[test] fn when_source_client_fails_to_prove_heads() { let mut test_source_client = TestClientData::minimal(); test_source_client.source_proof = Err(TestError::Error); assert_eq!( async_std::task::block_on(run_until_connection_lost( TestClient::from(test_source_client), TestClient::from(TestClientData::minimal()), None, false, futures::future::pending(), )), Err(FailedClient::Source), ); } #[test] fn when_target_client_rejects_update_transaction() { let mut test_target_client = TestClientData::minimal(); test_target_client.target_submit_result = Err(TestError::Error); assert_eq!( async_std::task::block_on(run_until_connection_lost( TestClient::from(TestClientData::minimal()), TestClient::from(test_target_client), None, false, futures::future::pending(), )), Err(FailedClient::Target), ); } #[test] fn minimal_working_case() { let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded(); assert_eq!( async_std::task::block_on(run_until_connection_lost( TestClient::from(TestClientData::minimal()), TestClient::from(TestClientData::with_exit_signal_sender(exit_signal_sender)), None, false, exit_signal.into_future().map(|(_, _)| ()), )), Ok(()), ); } #[async_std::test] async fn free_headers_are_relayed() { // prepare following case: // 1) best source relay at target: 95 // 2) best source parachain at target: 5 at relay 50 // 3) free headers interval: 10 // 4) at source relay chain block 90 source parachain block is 9 // + // 5) best finalized source relay chain block is 95 // 6) at source relay chain block 95 source parachain block is 42 // => // parachain block 42 would have been relayed, because 95 - 50 > 10 let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded(); let clients_data = TestClientData { source_sync_status: Ok(true), source_head: vec![ (90, Ok(AvailableHeader::Available(HeaderId(9, [9u8; 32].into())))), (95, Ok(AvailableHeader::Available(HeaderId(42, [42u8; 32].into())))), ] .into_iter() .collect(), source_proof: Ok(()), target_free_source_relay_headers_interval: Ok(Some(10)), target_best_block: Ok(HeaderId(200, [200u8; 32].into())), target_best_finalized_source_block: Ok(HeaderId(95, [95u8; 32].into())), target_head: Ok(Some((HeaderId(50, [50u8; 32].into()), HeaderId(5, [5u8; 32].into())))), target_submit_result: Ok(()), submitted_proof_at_source_relay_block: None, exit_signal_sender: Some(Box::new(exit_signal_sender)), }; let source_client = TestClient::from(clients_data.clone()); let target_client = TestClient::from(clients_data); assert_eq!( run_until_connection_lost( source_client, target_client.clone(), None, true, exit_signal.into_future().map(|(_, _)| ()), ) .await, Ok(()), ); assert_eq!( target_client .data .lock() .await .submitted_proof_at_source_relay_block .map(|id| id.0), Some(95) ); // now source relay block chain 104 is mined with parachain head #84 // => since 104 - 95 < 10, there are no free headers // => nothing is submitted let mut clients_data: TestClientData = target_client.data.lock().await.clone(); clients_data .source_head .insert(104, Ok(AvailableHeader::Available(HeaderId(84, [84u8; 32].into())))); clients_data.target_best_finalized_source_block = Ok(HeaderId(104, [104u8; 32].into())); clients_data.target_head = Ok(Some((HeaderId(95, [95u8; 32].into()), HeaderId(42, [42u8; 32].into())))); clients_data.target_best_block = Ok(HeaderId(255, [255u8; 32].into())); clients_data.exit_signal_sender = None; let source_client = TestClient::from(clients_data.clone()); let target_client = TestClient::from(clients_data); assert_eq!( run_until_connection_lost( source_client, target_client.clone(), None, true, async_std::task::sleep(std::time::Duration::from_millis(100)), ) .await, Ok(()), ); assert_eq!( target_client .data .lock() .await .submitted_proof_at_source_relay_block .map(|id| id.0), Some(95) ); } fn test_tx_tracker() -> SubmittedHeadsTracker { SubmittedHeadsTracker::new( AvailableHeader::Available(HeaderId(20, PARA_20_HASH)), TestTransactionTracker(None), ) } impl From> for Option<()> { fn from(status: SubmittedHeadStatus) -> Option<()> { match status { SubmittedHeadStatus::Waiting(_) => Some(()), _ => None, } } } #[async_std::test] async fn tx_tracker_update_when_head_at_target_has_none_value() { assert_eq!( Some(()), test_tx_tracker() .update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH))) .await .into(), ); } #[async_std::test] async fn tx_tracker_update_when_head_at_target_has_old_value() { assert_eq!( Some(()), test_tx_tracker() .update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH))) .await .into(), ); } #[async_std::test] async fn tx_tracker_update_when_head_at_target_has_same_value() { assert!(matches!( test_tx_tracker() .update(&HeaderId(0, Default::default()), &Some(HeaderId(20, PARA_20_HASH))) .await, SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)), )); } #[async_std::test] async fn tx_tracker_update_when_head_at_target_has_better_value() { assert!(matches!( test_tx_tracker() .update(&HeaderId(0, Default::default()), &Some(HeaderId(30, PARA_20_HASH))) .await, SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)), )); } #[async_std::test] async fn tx_tracker_update_when_tx_is_lost() { let mut tx_tracker = test_tx_tracker(); tx_tracker.transaction_tracker = futures::future::ready(TrackedTransactionStatus::Lost).boxed().shared(); assert!(matches!( tx_tracker .update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH))) .await, SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost), )); } #[async_std::test] async fn tx_tracker_update_when_tx_is_finalized_but_heads_are_not_updated() { let mut tx_tracker = test_tx_tracker(); tx_tracker.transaction_tracker = futures::future::ready(TrackedTransactionStatus::Finalized(Default::default())) .boxed() .shared(); assert!(matches!( tx_tracker .update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH))) .await, SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost), )); } #[test] fn parachain_is_not_updated_if_it_is_unavailable() { assert!(!is_update_required::( AvailableHeader::Unavailable, None, Default::default(), Default::default(), )); assert!(!is_update_required::( AvailableHeader::Unavailable, Some(HeaderId(10, PARA_10_HASH)), Default::default(), Default::default(), )); } #[test] fn parachain_is_not_updated_if_it_is_unknown_to_both_clients() { assert!(!is_update_required::( AvailableHeader::Missing, None, Default::default(), Default::default(), ),); } #[test] fn parachain_is_not_updated_if_target_has_better_head() { assert!(!is_update_required::( AvailableHeader::Available(HeaderId(10, Default::default())), Some(HeaderId(20, Default::default())), Default::default(), Default::default(), ),); } #[test] fn parachain_is_updated_after_offboarding() { assert!(is_update_required::( AvailableHeader::Missing, Some(HeaderId(20, Default::default())), Default::default(), Default::default(), ),); } #[test] fn parachain_is_updated_after_onboarding() { assert!(is_update_required::( AvailableHeader::Available(HeaderId(30, Default::default())), None, Default::default(), Default::default(), ),); } #[test] fn parachain_is_updated_if_newer_head_is_known() { assert!(is_update_required::( AvailableHeader::Available(HeaderId(40, Default::default())), Some(HeaderId(30, Default::default())), Default::default(), Default::default(), ),); } }