Newer
Older
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::{parachains_loop_metrics::ParachainsLoopMetrics, ParachainsPipeline};
use async_trait::async_trait;
use bp_polkadot_core::{
parachains::{ParaHash, ParaHeadsProof, ParaId},
BlockNumber as RelayBlockNumber,
};
use futures::{
future::{FutureExt, Shared},
use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf, ParachainBase};
use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient,
TrackedTransactionStatus, TransactionTracker,
};
Svyatoslav Nikolsky
committed
use std::{future::Future, pin::Pin, task::Poll};
/// Parachain header availability at a certain chain.
Svyatoslav Nikolsky
committed
#[derive(Clone, Copy, Debug)]
Svyatoslav Nikolsky
committed
/// The client can not report actual parachain head at this moment.
Svyatoslav Nikolsky
committed
///
/// It is a "mild" error, which may appear when e.g. on-demand parachains relay is used.
/// This variant must be treated as "we don't want to update parachain head value at the
/// target chain at this moment".
Unavailable,
/// There's no parachain header at the relay chain.
///
/// Normally it means that the parachain is not registered there.
Missing,
/// Parachain head with given hash is available at the source chain.
Available(T),
Svyatoslav Nikolsky
committed
}
Svyatoslav Nikolsky
committed
/// Return available header.
pub fn as_available(&self) -> Option<&T> {
match *self {
AvailableHeader::Available(ref header) => Some(header),
_ => None,
}
}
}
impl<T> From<Option<T>> for AvailableHeader<T> {
fn from(maybe_header: Option<T>) -> AvailableHeader<T> {
match maybe_header {
Some(header) => AvailableHeader::Available(header),
None => AvailableHeader::Missing,
Svyatoslav Nikolsky
committed
}
}
}
/// Source client used in parachain heads synchronization loop.
#[async_trait]
pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
/// Returns `Ok(true)` if client is in synced state.
async fn ensure_synced(&self) -> Result<bool, Self::Error>;
Svyatoslav Nikolsky
committed
/// Get parachain head id at given block.
async fn parachain_head(
&self,
Svyatoslav Nikolsky
committed
at_block: HeaderIdOf<P::SourceRelayChain>,
) -> Result<AvailableHeader<HeaderIdOf<P::SourceParachain>>, Self::Error>;
Svyatoslav Nikolsky
committed
/// Get parachain head proof at given block.
async fn prove_parachain_head(
Svyatoslav Nikolsky
committed
at_block: HeaderIdOf<P::SourceRelayChain>,
) -> Result<(ParaHeadsProof, ParaHash), Self::Error>;
}
/// Target client used in parachain heads synchronization loop.
#[async_trait]
pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker<HeaderId = HeaderIdOf<P::TargetChain>>;
/// Get best block id.
async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error>;
/// Get best finalized source relay chain block id. If `free_source_relay_headers_interval`
/// is `Some(_)`, the returned
Svyatoslav Nikolsky
committed
async fn best_finalized_source_relay_chain_block(
&self,
at_block: &HeaderIdOf<P::TargetChain>,
Svyatoslav Nikolsky
committed
) -> Result<HeaderIdOf<P::SourceRelayChain>, Self::Error>;
/// Get free source **relay** headers submission interval, if it is configured in the
/// target runtime. We assume that the target chain will accept parachain header, proved
/// at such relay header for free.
async fn free_source_relay_headers_interval(
&self,
) -> Result<Option<BlockNumberOf<P::SourceRelayChain>>, Self::Error>;
Svyatoslav Nikolsky
committed
/// Get parachain head id at given block.
async fn parachain_head(
&self,
at_block: HeaderIdOf<P::TargetChain>,
) -> Result<
Option<(HeaderIdOf<P::SourceRelayChain>, HeaderIdOf<P::SourceParachain>)>,
Self::Error,
>;
/// Submit parachain heads proof.
Svyatoslav Nikolsky
committed
async fn submit_parachain_head_proof(
Svyatoslav Nikolsky
committed
at_source_block: HeaderIdOf<P::SourceRelayChain>,
para_head_hash: ParaHash,
is_free_execution_expected: bool,
) -> Result<Self::TransactionTracker, Self::Error>;
}
/// Return prefix that will be used by default to expose Prometheus metrics of the parachains
/// sync loop.
pub fn metrics_prefix<P: ParachainsPipeline>() -> String {
Svyatoslav Nikolsky
committed
format!(
"{}_to_{}_Parachains_{}",
P::SourceRelayChain::NAME,
P::TargetChain::NAME,
P::SourceParachain::PARACHAIN_ID
)
}
/// Run parachain heads synchronization.
pub async fn run<P: ParachainsPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_params: MetricsParams,
only_free_headers: bool,
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), relay_utils::Error>
where
Svyatoslav Nikolsky
committed
P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
{
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(metrics_params)
.loop_metric(ParachainsLoopMetrics::new(Some(&metrics_prefix::<P>()))?)?
.expose()
.await?
.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
run_until_connection_lost(
source_client,
target_client,
metrics,
only_free_headers,
exit_signal.clone(),
)
})
.await
}
/// Run parachain heads synchronization.
async fn run_until_connection_lost<P: ParachainsPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics: Option<ParachainsLoopMetrics>,
only_free_headers: bool,
exit_signal: impl Future<Output = ()> + Send,
) -> Result<(), FailedClient>
where
Svyatoslav Nikolsky
committed
P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
{
let exit_signal = exit_signal.fuse();
let min_block_interval = std::cmp::min(
Svyatoslav Nikolsky
committed
P::SourceRelayChain::AVERAGE_BLOCK_INTERVAL,
P::TargetChain::AVERAGE_BLOCK_INTERVAL,
);
// free parachain header = header, available (proved) at free relay chain block. Let's
// read interval of free source relay chain blocks from target client
let free_source_relay_headers_interval = if only_free_headers {
let free_source_relay_headers_interval =
target_client.free_source_relay_headers_interval().await.map_err(|e| {
log::warn!(
target: "bridge",
"Failed to read free {} headers interval at {}: {:?}",
P::SourceRelayChain::NAME,
P::TargetChain::NAME,
e,
);
FailedClient::Target
})?;
match free_source_relay_headers_interval {
Some(free_source_relay_headers_interval) if free_source_relay_headers_interval != 0 => {
log::trace!(
target: "bridge",
"Free {} headers interval at {}: {:?}",
P::SourceRelayChain::NAME,
P::TargetChain::NAME,
free_source_relay_headers_interval,
);
free_source_relay_headers_interval
},
_ => {
log::warn!(
target: "bridge",
"Invalid free {} headers interval at {}: {:?}",
P::SourceRelayChain::NAME,
P::TargetChain::NAME,
free_source_relay_headers_interval,
);
return Err(FailedClient::Target)
},
}
} else {
// ignore - we don't need it
0
};
let mut submitted_heads_tracker: Option<SubmittedHeadsTracker<P>> = None;
futures::pin_mut!(exit_signal);
// Note that the internal loop breaks with `FailedClient` error even if error is non-connection.
// It is Ok for now, but it may need to be fixed in the future to use exponential backoff for
// regular errors.
loop {
// Either wait for new block, or exit signal.
// Please note that we are prioritizing the exit signal since if both events happen at once
// it doesn't make sense to perform one more loop iteration.
select_biased! {
_ = exit_signal => return Ok(()),
_ = async_std::task::sleep(min_block_interval).fuse() => {},
}
// if source client is not yet synced, we'll need to sleep. Otherwise we risk submitting too
// much redundant transactions
match source_client.ensure_synced().await {
Ok(true) => (),
Ok(false) => {
log::warn!(
target: "bridge",
"{} client is syncing. Won't do anything until it is synced",
Svyatoslav Nikolsky
committed
P::SourceRelayChain::NAME,
);
continue
},
Err(e) => {
log::warn!(
target: "bridge",
"{} client has failed to return its sync status: {:?}",
Svyatoslav Nikolsky
committed
P::SourceRelayChain::NAME,
},
}
// if we have active transaction, we'll need to wait until it is mined or dropped
let best_target_block = target_client.best_block().await.map_err(|e| {
Svyatoslav Nikolsky
committed
log::warn!(target: "bridge", "Failed to read best {} block: {:?}", P::SourceRelayChain::NAME, e);
let (relay_of_head_at_target, head_at_target) =
Svyatoslav Nikolsky
committed
read_head_at_target(&target_client, metrics.as_ref(), &best_target_block).await?;
// check if our transaction has been mined
if let Some(tracker) = submitted_heads_tracker.take() {
Svyatoslav Nikolsky
committed
match tracker.update(&best_target_block, &head_at_target).await {
SubmittedHeadStatus::Waiting(tracker) => {
// no news about our transaction and we shall keep waiting
submitted_heads_tracker = Some(tracker);
continue
},
Svyatoslav Nikolsky
committed
SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)) => {
// all heads have been updated, we don't need this tracker anymore
},
Svyatoslav Nikolsky
committed
SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost) => {
log::warn!(
target: "bridge",
"Parachains synchronization from {} to {} has stalled. Going to restart",
Svyatoslav Nikolsky
committed
P::SourceRelayChain::NAME,
P::TargetChain::NAME,
);
return Err(FailedClient::Both)
},
}
// in all-headers strategy we'll be submitting para head, available at
// `best_finalized_relay_block_at_target`
let best_finalized_relay_block_at_target = target_client
Svyatoslav Nikolsky
committed
.best_finalized_source_relay_chain_block(&best_target_block)
.await
.map_err(|e| {
log::warn!(
target: "bridge",
"Failed to read best finalized {} block from {}: {:?}",
Svyatoslav Nikolsky
committed
P::SourceRelayChain::NAME,
P::TargetChain::NAME,
e,
);
FailedClient::Target
})?;
// ..but if we only need to submit free headers, we need to submit para
// head, available at best free source relay chain header, known to the
// target chain
let prove_at_relay_block = if only_free_headers {
match relay_of_head_at_target {
Some(relay_of_head_at_target) => {
// find last free relay chain header in the range that we are interested in
let scan_range_begin = relay_of_head_at_target.number();
let scan_range_end = best_finalized_relay_block_at_target.number();
if scan_range_end.saturating_sub(scan_range_begin) <
free_source_relay_headers_interval
{
// there are no new **free** relay chain headers in the range
log::trace!(
target: "bridge",
"Waiting for new free {} headers at {}: scanned {:?}..={:?}",
P::SourceRelayChain::NAME,
P::TargetChain::NAME,
scan_range_begin,
scan_range_end,
);
continue;
}
// we may submit new parachain head for free
best_finalized_relay_block_at_target
None => {
// no parachain head at target => let's submit first one
best_finalized_relay_block_at_target
},
}
} else {
best_finalized_relay_block_at_target
};
// now let's check if we need to update parachain head at all
Svyatoslav Nikolsky
committed
let head_at_source =
read_head_at_source(&source_client, metrics.as_ref(), &prove_at_relay_block).await?;
let is_update_required = is_update_required::<P>(
head_at_source,
head_at_target,
prove_at_relay_block,
best_target_block,
);
let (head_proof, head_hash) =
source_client.prove_parachain_head(prove_at_relay_block).await.map_err(|e| {
log::warn!(
target: "bridge",
Svyatoslav Nikolsky
committed
"Failed to prove {} parachain ParaId({}) heads: {:?}",
P::SourceRelayChain::NAME,
P::SourceParachain::PARACHAIN_ID,
e,
);
FailedClient::Source
})?;
log::info!(
target: "bridge",
"Submitting {} parachain ParaId({}) head update transaction to {}. Para hash at source relay {:?}: {:?}",
Svyatoslav Nikolsky
committed
P::SourceRelayChain::NAME,
P::SourceParachain::PARACHAIN_ID,
prove_at_relay_block,
head_hash,
Svyatoslav Nikolsky
committed
let transaction_tracker = target_client
.submit_parachain_head_proof(
prove_at_relay_block,
head_hash,
head_proof,
only_free_headers,
)
.await
.map_err(|e| {
log::warn!(
target: "bridge",
Svyatoslav Nikolsky
committed
"Failed to submit {} parachain ParaId({}) heads proof to {}: {:?}",
P::SourceRelayChain::NAME,
P::SourceParachain::PARACHAIN_ID,
P::TargetChain::NAME,
e,
);
FailedClient::Target
})?;
Svyatoslav Nikolsky
committed
submitted_heads_tracker =
Some(SubmittedHeadsTracker::<P>::new(head_at_source, transaction_tracker));
Svyatoslav Nikolsky
committed
/// Returns `true` if we need to submit parachain-head-update transaction.
fn is_update_required<P: ParachainsPipeline>(
head_at_source: AvailableHeader<HeaderIdOf<P::SourceParachain>>,
head_at_target: Option<HeaderIdOf<P::SourceParachain>>,
prove_at_relay_block: HeaderIdOf<P::SourceRelayChain>,
best_target_block: HeaderIdOf<P::TargetChain>,
Svyatoslav Nikolsky
committed
) -> bool
Svyatoslav Nikolsky
committed
P::SourceRelayChain: Chain<BlockNumber = RelayBlockNumber>,
log::trace!(
target: "bridge",
Svyatoslav Nikolsky
committed
"Checking if {} parachain ParaId({}) needs update at {}:\n\t\
At {} ({:?}): {:?}\n\t\
At {} ({:?}): {:?}",
Svyatoslav Nikolsky
committed
P::SourceRelayChain::NAME,
P::SourceParachain::PARACHAIN_ID,
P::TargetChain::NAME,
Svyatoslav Nikolsky
committed
P::SourceRelayChain::NAME,
prove_at_relay_block,
Svyatoslav Nikolsky
committed
head_at_source,
P::TargetChain::NAME,
best_target_block,
Svyatoslav Nikolsky
committed
head_at_target,
Svyatoslav Nikolsky
committed
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
let needs_update = match (head_at_source, head_at_target) {
(AvailableHeader::Unavailable, _) => {
// source client has politely asked us not to update current parachain head
// at the target chain
false
},
(AvailableHeader::Available(head_at_source), Some(head_at_target))
if head_at_source.number() > head_at_target.number() =>
{
// source client knows head that is better than the head known to the target
// client
true
},
(AvailableHeader::Available(_), Some(_)) => {
// this is normal case when relay has recently updated heads, when parachain is
// not progressing, or when our source client is still syncing
false
},
(AvailableHeader::Available(_), None) => {
// parachain is not yet known to the target client. This is true when parachain
// or bridge has been just onboarded/started
true
},
(AvailableHeader::Missing, Some(_)) => {
// parachain/parathread has been offboarded removed from the system. It needs to
// be propageted to the target client
true
},
(AvailableHeader::Missing, None) => {
// all's good - parachain is unknown to both clients
false
},
};
Svyatoslav Nikolsky
committed
if needs_update {
log::trace!(
target: "bridge",
"{} parachain ParaId({}) needs update at {}: {:?} vs {:?}",
P::SourceRelayChain::NAME,
P::SourceParachain::PARACHAIN_ID,
P::TargetChain::NAME,
head_at_source,
head_at_target,
);
Svyatoslav Nikolsky
committed
needs_update
Svyatoslav Nikolsky
committed
/// Reads parachain head from the source client.
async fn read_head_at_source<P: ParachainsPipeline>(
source_client: &impl SourceClient<P>,
metrics: Option<&ParachainsLoopMetrics>,
Svyatoslav Nikolsky
committed
at_relay_block: &HeaderIdOf<P::SourceRelayChain>,
) -> Result<AvailableHeader<HeaderIdOf<P::SourceParachain>>, FailedClient> {
let para_head = source_client.parachain_head(*at_relay_block).await;
match para_head {
Ok(AvailableHeader::Available(para_head)) => {
if let Some(metrics) = metrics {
metrics.update_best_parachain_block_at_source(
ParaId(P::SourceParachain::PARACHAIN_ID),
para_head.number(),
Svyatoslav Nikolsky
committed
}
Ok(AvailableHeader::Available(para_head))
},
Ok(r) => Ok(r),
Err(e) => {
log::warn!(
target: "bridge",
"Failed to read head of {} parachain ParaId({:?}): {:?}",
P::SourceRelayChain::NAME,
P::SourceParachain::PARACHAIN_ID,
e,
);
Err(FailedClient::Source)
},
/// Reads parachain head from the target client. Also returns source relay chain header
/// that has been used to prove that head.
Svyatoslav Nikolsky
committed
async fn read_head_at_target<P: ParachainsPipeline>(
target_client: &impl TargetClient<P>,
metrics: Option<&ParachainsLoopMetrics>,
at_block: &HeaderIdOf<P::TargetChain>,
) -> Result<
(Option<HeaderIdOf<P::SourceRelayChain>>, Option<HeaderIdOf<P::SourceParachain>>),
FailedClient,
> {
Svyatoslav Nikolsky
committed
let para_head_id = target_client.parachain_head(*at_block).await;
match para_head_id {
Ok(Some((relay_header_id, para_head_id))) => {
Svyatoslav Nikolsky
committed
if let Some(metrics) = metrics {
metrics.update_best_parachain_block_at_target(
ParaId(P::SourceParachain::PARACHAIN_ID),
para_head_id.number(),
Svyatoslav Nikolsky
committed
}
Ok((Some(relay_header_id), Some(para_head_id)))
Svyatoslav Nikolsky
committed
},
Ok(None) => Ok((None, None)),
Svyatoslav Nikolsky
committed
Err(e) => {
log::warn!(
target: "bridge",
"Failed to read head of {} parachain ParaId({}) at {}: {:?}",
P::SourceRelayChain::NAME,
P::SourceParachain::PARACHAIN_ID,
P::TargetChain::NAME,
e,
);
Err(FailedClient::Target)
},
/// Submitted heads status.
Svyatoslav Nikolsky
committed
enum SubmittedHeadStatus<P: ParachainsPipeline> {
/// Heads are not yet updated.
Waiting(SubmittedHeadsTracker<P>),
/// Heads transaction has either been finalized or lost (i.e. received its "final" status).
Final(TrackedTransactionStatus<HeaderIdOf<P::TargetChain>>),
/// Type of the transaction tracker that the `SubmittedHeadsTracker` is using.
///
/// It needs to be shared because of `poll` macro and our consuming `update` method.
type SharedTransactionTracker<P> = Shared<
Pin<
Box<
dyn Future<
Output = TrackedTransactionStatus<
HeaderIdOf<<P as ParachainsPipeline>::TargetChain>,
>,
> + Send,
>,
>,
>;
/// Submitted parachain heads transaction.
struct SubmittedHeadsTracker<P: ParachainsPipeline> {
Svyatoslav Nikolsky
committed
/// Parachain header id that we have submitted.
submitted_head: AvailableHeader<HeaderIdOf<P::SourceParachain>>,
/// Future that waits for submitted transaction finality or loss.
///
/// It needs to be shared because of `poll` macro and our consuming `update` method.
transaction_tracker: SharedTransactionTracker<P>,
Svyatoslav Nikolsky
committed
impl<P: ParachainsPipeline> SubmittedHeadsTracker<P> {
/// Creates new parachain heads transaction tracker.
pub fn new(
Svyatoslav Nikolsky
committed
submitted_head: AvailableHeader<HeaderIdOf<P::SourceParachain>>,
transaction_tracker: impl TransactionTracker<HeaderId = HeaderIdOf<P::TargetChain>> + 'static,
SubmittedHeadsTracker {
Svyatoslav Nikolsky
committed
submitted_head,
transaction_tracker: transaction_tracker.wait().fuse().boxed().shared(),
/// Returns `None` if all submitted parachain heads have been updated.
pub async fn update(
Svyatoslav Nikolsky
committed
self,
at_target_block: &HeaderIdOf<P::TargetChain>,
Svyatoslav Nikolsky
committed
head_at_target: &Option<HeaderIdOf<P::SourceParachain>>,
) -> SubmittedHeadStatus<P> {
// check if our head has been updated
let is_head_updated = match (self.submitted_head, head_at_target) {
(AvailableHeader::Available(submitted_head), Some(head_at_target))
if head_at_target.number() >= submitted_head.number() =>
true,
(AvailableHeader::Missing, None) => true,
_ => false,
};
if is_head_updated {
log::trace!(
target: "bridge",
"Head of parachain ParaId({}) has been updated at {}: {:?}",
P::SourceParachain::PARACHAIN_ID,
P::TargetChain::NAME,
head_at_target,
);
Svyatoslav Nikolsky
committed
return SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(*at_target_block))
// if underlying transaction tracker has reported that the transaction is lost, we may
// then restart our sync
let transaction_tracker = self.transaction_tracker.clone();
match poll!(transaction_tracker) {
Poll::Ready(TrackedTransactionStatus::Lost) =>
Svyatoslav Nikolsky
committed
return SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
Poll::Ready(TrackedTransactionStatus::Finalized(_)) => {
// so we are here and our transaction is mined+finalized, but some of heads were not
// updated => we're considering our loop as stalled
Svyatoslav Nikolsky
committed
return SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost)
},
_ => (),
Svyatoslav Nikolsky
committed
SubmittedHeadStatus::Waiting(self)
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_std::sync::{Arc, Mutex};
use codec::Encode;
use futures::{SinkExt, StreamExt};
Svyatoslav Nikolsky
committed
use relay_substrate_client::test_chain::{TestChain, TestParachain};
use relay_utils::{HeaderId, MaybeConnectionError};
use sp_core::H256;
use std::collections::HashMap;
Svyatoslav Nikolsky
committed
const PARA_10_HASH: ParaHash = H256([10u8; 32]);
const PARA_20_HASH: ParaHash = H256([20u8; 32]);
#[derive(Clone, Debug)]
enum TestError {
Error,
}
impl MaybeConnectionError for TestError {
fn is_connection_error(&self) -> bool {
false
}
}
Svyatoslav Nikolsky
committed
#[derive(Clone, Debug, PartialEq, Eq)]
struct TestParachainsPipeline;
impl ParachainsPipeline for TestParachainsPipeline {
Svyatoslav Nikolsky
committed
type SourceRelayChain = TestChain;
type SourceParachain = TestParachain;
type TargetChain = TestChain;
}
#[derive(Clone, Debug)]
struct TestClient {
data: Arc<Mutex<TestClientData>>,
}
#[derive(Clone, Debug)]
struct TestTransactionTracker(Option<TrackedTransactionStatus<HeaderIdOf<TestChain>>>);
#[async_trait]
impl TransactionTracker for TestTransactionTracker {
type HeaderId = HeaderIdOf<TestChain>;
async fn wait(self) -> TrackedTransactionStatus<HeaderIdOf<TestChain>> {
match self.0 {
Some(status) => status,
None => futures::future::pending().await,
}
#[derive(Clone, Debug)]
struct TestClientData {
source_sync_status: Result<bool, TestError>,
source_head: HashMap<
BlockNumberOf<TestChain>,
Result<AvailableHeader<HeaderIdOf<TestParachain>>, TestError>,
>,
Svyatoslav Nikolsky
committed
source_proof: Result<(), TestError>,
target_free_source_relay_headers_interval:
Result<Option<BlockNumberOf<TestChain>>, TestError>,
target_best_block: Result<HeaderIdOf<TestChain>, TestError>,
target_best_finalized_source_block: Result<HeaderIdOf<TestChain>, TestError>,
target_head: Result<Option<(HeaderIdOf<TestChain>, HeaderIdOf<TestParachain>)>, TestError>,
target_submit_result: Result<(), TestError>,
submitted_proof_at_source_relay_block: Option<HeaderIdOf<TestChain>>,
exit_signal_sender: Option<Box<futures::channel::mpsc::UnboundedSender<()>>>,
}
impl TestClientData {
pub fn minimal() -> Self {
TestClientData {
source_sync_status: Ok(true),
source_head: vec![(0, Ok(AvailableHeader::Available(HeaderId(0, PARA_20_HASH))))]
.into_iter()
.collect(),
Svyatoslav Nikolsky
committed
source_proof: Ok(()),
target_free_source_relay_headers_interval: Ok(None),
target_best_block: Ok(HeaderId(0, Default::default())),
target_best_finalized_source_block: Ok(HeaderId(0, Default::default())),
Svyatoslav Nikolsky
committed
target_head: Ok(None),
target_submit_result: Ok(()),
submitted_proof_at_source_relay_block: None,
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
exit_signal_sender: None,
}
}
pub fn with_exit_signal_sender(
sender: futures::channel::mpsc::UnboundedSender<()>,
) -> Self {
let mut client = Self::minimal();
client.exit_signal_sender = Some(Box::new(sender));
client
}
}
impl From<TestClientData> for TestClient {
fn from(data: TestClientData) -> TestClient {
TestClient { data: Arc::new(Mutex::new(data)) }
}
}
#[async_trait]
impl RelayClient for TestClient {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
unimplemented!()
}
}
#[async_trait]
impl SourceClient<TestParachainsPipeline> for TestClient {
async fn ensure_synced(&self) -> Result<bool, TestError> {
self.data.lock().await.source_sync_status.clone()
}
async fn parachain_head(
&self,
at_block: HeaderIdOf<TestChain>,
Svyatoslav Nikolsky
committed
) -> Result<AvailableHeader<HeaderIdOf<TestParachain>>, TestError> {
self.data
.lock()
.await
.source_head
.get(&at_block.0)
.expect(&format!("SourceClient::parachain_head({})", at_block.0))
.clone()
Svyatoslav Nikolsky
committed
async fn prove_parachain_head(
at_block: HeaderIdOf<TestChain>,
Svyatoslav Nikolsky
committed
) -> Result<(ParaHeadsProof, ParaHash), TestError> {
let head_result =
SourceClient::<TestParachainsPipeline>::parachain_head(self, at_block).await?;
let head = head_result.as_available().unwrap();
let storage_proof = vec![head.hash().encode()];
let proof = (ParaHeadsProof { storage_proof }, head.hash());
Svyatoslav Nikolsky
committed
self.data.lock().await.source_proof.clone().map(|_| proof)
}
}
#[async_trait]
impl TargetClient<TestParachainsPipeline> for TestClient {
type TransactionTracker = TestTransactionTracker;
async fn best_block(&self) -> Result<HeaderIdOf<TestChain>, TestError> {
self.data.lock().await.target_best_block.clone()
}
Svyatoslav Nikolsky
committed
async fn best_finalized_source_relay_chain_block(
&self,
_at_block: &HeaderIdOf<TestChain>,
) -> Result<HeaderIdOf<TestChain>, TestError> {
self.data.lock().await.target_best_finalized_source_block.clone()
}
async fn free_source_relay_headers_interval(
&self,
) -> Result<Option<BlockNumberOf<TestParachain>>, TestError> {
self.data.lock().await.target_free_source_relay_headers_interval.clone()
}
async fn parachain_head(
&self,
_at_block: HeaderIdOf<TestChain>,
) -> Result<Option<(HeaderIdOf<TestChain>, HeaderIdOf<TestParachain>)>, TestError> {
Svyatoslav Nikolsky
committed
self.data.lock().await.target_head.clone()
Svyatoslav Nikolsky
committed
async fn submit_parachain_head_proof(
at_source_block: HeaderIdOf<TestChain>,
Svyatoslav Nikolsky
committed
_updated_parachain_head: ParaHash,
_is_free_execution_expected: bool,
) -> Result<TestTransactionTracker, Self::Error> {
let mut data = self.data.lock().await;
data.target_submit_result.clone()?;
data.submitted_proof_at_source_relay_block = Some(at_source_block);
if let Some(mut exit_signal_sender) = data.exit_signal_sender.take() {
exit_signal_sender.send(()).await.unwrap();
}
Ok(TestTransactionTracker(Some(
TrackedTransactionStatus::Finalized(Default::default()),
)))
}
}
#[test]
fn when_source_client_fails_to_return_sync_state() {
let mut test_source_client = TestClientData::minimal();
test_source_client.source_sync_status = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(test_source_client),
TestClient::from(TestClientData::minimal()),
None,
false,
futures::future::pending(),
)),
);
}
#[test]
fn when_target_client_fails_to_return_best_block() {
let mut test_target_client = TestClientData::minimal();
test_target_client.target_best_block = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(test_target_client),
None,
false,
futures::future::pending(),
)),
Err(FailedClient::Target),
);
}
#[test]
fn when_target_client_fails_to_read_heads() {
let mut test_target_client = TestClientData::minimal();
Svyatoslav Nikolsky
committed
test_target_client.target_head = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(test_target_client),
None,
false,
futures::future::pending(),
)),
Err(FailedClient::Target),
);
}
#[test]
fn when_target_client_fails_to_read_best_finalized_source_block() {
let mut test_target_client = TestClientData::minimal();
test_target_client.target_best_finalized_source_block = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(test_target_client),
None,
false,
futures::future::pending(),
)),
Err(FailedClient::Target),
);
}
#[test]
fn when_source_client_fails_to_read_heads() {
let mut test_source_client = TestClientData::minimal();
test_source_client.source_head.insert(0, Err(TestError::Error));
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(test_source_client),
TestClient::from(TestClientData::minimal()),
None,
false,
futures::future::pending(),
)),
Err(FailedClient::Source),
);
}
#[test]
fn when_source_client_fails_to_prove_heads() {
let mut test_source_client = TestClientData::minimal();
Svyatoslav Nikolsky
committed
test_source_client.source_proof = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(test_source_client),
TestClient::from(TestClientData::minimal()),
None,
false,
futures::future::pending(),
)),
Err(FailedClient::Source),
);
}
#[test]
fn when_target_client_rejects_update_transaction() {
let mut test_target_client = TestClientData::minimal();
test_target_client.target_submit_result = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(test_target_client),
None,
false,
futures::future::pending(),
)),
Err(FailedClient::Target),
);
}
#[test]
fn minimal_working_case() {
let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded();
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(TestClientData::with_exit_signal_sender(exit_signal_sender)),
None,
false,
exit_signal.into_future().map(|(_, _)| ()),
)),
Ok(()),
);
}
#[async_std::test]
async fn free_headers_are_relayed() {
// prepare following case:
// 1) best source relay at target: 95
// 2) best source parachain at target: 5 at relay 50
// 3) free headers interval: 10
// 4) at source relay chain block 90 source parachain block is 9
// +
// 5) best finalized source relay chain block is 95
// 6) at source relay chain block 95 source parachain block is 42
// =>
// parachain block 42 would have been relayed, because 95 - 50 > 10
let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded();
let clients_data = TestClientData {
source_sync_status: Ok(true),
source_head: vec![
(90, Ok(AvailableHeader::Available(HeaderId(9, [9u8; 32].into())))),
(95, Ok(AvailableHeader::Available(HeaderId(42, [42u8; 32].into())))),
]
.into_iter()
.collect(),
source_proof: Ok(()),
target_free_source_relay_headers_interval: Ok(Some(10)),
target_best_block: Ok(HeaderId(200, [200u8; 32].into())),
target_best_finalized_source_block: Ok(HeaderId(95, [95u8; 32].into())),
target_head: Ok(Some((HeaderId(50, [50u8; 32].into()), HeaderId(5, [5u8; 32].into())))),