(
source_client: impl SourceClient,
target_client: impl TargetClient
,
metrics_params: MetricsParams,
exit_signal: impl Future + 'static + Send,
) -> Result<(), relay_utils::Error>
where
P::SourceRelayChain: Chain,
{
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(metrics_params)
.loop_metric(ParachainsLoopMetrics::new(Some(&metrics_prefix::()))?)?
.expose()
.await?
.run(metrics_prefix::
(), move |source_client, target_client, metrics| {
run_until_connection_lost(source_client, target_client, metrics, exit_signal.clone())
})
.await
}
/// Run parachain heads synchronization.
async fn run_until_connection_lost(
source_client: impl SourceClient,
target_client: impl TargetClient
,
metrics: Option,
exit_signal: impl Future + Send,
) -> Result<(), FailedClient>
where
P::SourceRelayChain: Chain,
{
let exit_signal = exit_signal.fuse();
let min_block_interval = std::cmp::min(
P::SourceRelayChain::AVERAGE_BLOCK_INTERVAL,
P::TargetChain::AVERAGE_BLOCK_INTERVAL,
);
let mut submitted_heads_tracker: Option> = None;
futures::pin_mut!(exit_signal);
// Note that the internal loop breaks with `FailedClient` error even if error is non-connection.
// It is Ok for now, but it may need to be fixed in the future to use exponential backoff for
// regular errors.
loop {
// Either wait for new block, or exit signal.
// Please note that we are prioritizing the exit signal since if both events happen at once
// it doesn't make sense to perform one more loop iteration.
select_biased! {
_ = exit_signal => return Ok(()),
_ = async_std::task::sleep(min_block_interval).fuse() => {},
}
// if source client is not yet synced, we'll need to sleep. Otherwise we risk submitting too
// much redundant transactions
match source_client.ensure_synced().await {
Ok(true) => (),
Ok(false) => {
log::warn!(
target: "bridge",
"{} client is syncing. Won't do anything until it is synced",
P::SourceRelayChain::NAME,
);
continue
},
Err(e) => {
log::warn!(
target: "bridge",
"{} client has failed to return its sync status: {:?}",
P::SourceRelayChain::NAME,
e,
);
return Err(FailedClient::Source)
},
}
// if we have active transaction, we'll need to wait until it is mined or dropped
let best_target_block = target_client.best_block().await.map_err(|e| {
log::warn!(target: "bridge", "Failed to read best {} block: {:?}", P::SourceRelayChain::NAME, e);
FailedClient::Target
})?;
let head_at_target =
read_head_at_target(&target_client, metrics.as_ref(), &best_target_block).await?;
// check if our transaction has been mined
if let Some(tracker) = submitted_heads_tracker.take() {
match tracker.update(&best_target_block, &head_at_target).await {
SubmittedHeadStatus::Waiting(tracker) => {
// no news about our transaction and we shall keep waiting
submitted_heads_tracker = Some(tracker);
continue
},
SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)) => {
// all heads have been updated, we don't need this tracker anymore
},
SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost) => {
log::warn!(
target: "bridge",
"Parachains synchronization from {} to {} has stalled. Going to restart",
P::SourceRelayChain::NAME,
P::TargetChain::NAME,
);
return Err(FailedClient::Both)
},
}
}
// we have no active transaction and may need to update heads, but do we have something for
// update?
let best_finalized_relay_block = target_client
.best_finalized_source_relay_chain_block(&best_target_block)
.await
.map_err(|e| {
log::warn!(
target: "bridge",
"Failed to read best finalized {} block from {}: {:?}",
P::SourceRelayChain::NAME,
P::TargetChain::NAME,
e,
);
FailedClient::Target
})?;
let head_at_source =
read_head_at_source(&source_client, metrics.as_ref(), &best_finalized_relay_block)
.await?;
let is_update_required = is_update_required::(
head_at_source,
head_at_target,
best_finalized_relay_block,
best_target_block,
);
if is_update_required {
let (head_proof, head_hash) = source_client
.prove_parachain_head(best_finalized_relay_block)
.await
.map_err(|e| {
log::warn!(
target: "bridge",
"Failed to prove {} parachain ParaId({}) heads: {:?}",
P::SourceRelayChain::NAME,
P::SourceParachain::PARACHAIN_ID,
e,
);
FailedClient::Source
})?;
log::info!(
target: "bridge",
"Submitting {} parachain ParaId({}) head update transaction to {}. Para hash at source relay {:?}: {:?}",
P::SourceRelayChain::NAME,
P::SourceParachain::PARACHAIN_ID,
P::TargetChain::NAME,
best_finalized_relay_block,
head_hash,
);
let transaction_tracker = target_client
.submit_parachain_head_proof(best_finalized_relay_block, head_hash, head_proof)
.await
.map_err(|e| {
log::warn!(
target: "bridge",
"Failed to submit {} parachain ParaId({}) heads proof to {}: {:?}",
P::SourceRelayChain::NAME,
P::SourceParachain::PARACHAIN_ID,
P::TargetChain::NAME,
e,
);
FailedClient::Target
})?;
submitted_heads_tracker =
Some(SubmittedHeadsTracker::
::new(head_at_source, transaction_tracker));
}
}
}
/// Returns `true` if we need to submit parachain-head-update transaction.
fn is_update_required(
head_at_source: AvailableHeader>,
head_at_target: Option>,
best_finalized_relay_block_at_source: HeaderIdOf,
best_target_block: HeaderIdOf,
) -> bool
where
P::SourceRelayChain: Chain,
{
log::trace!(
target: "bridge",
"Checking if {} parachain ParaId({}) needs update at {}:\n\t\
At {} ({:?}): {:?}\n\t\
At {} ({:?}): {:?}",
P::SourceRelayChain::NAME,
P::SourceParachain::PARACHAIN_ID,
P::TargetChain::NAME,
P::SourceRelayChain::NAME,
best_finalized_relay_block_at_source,
head_at_source,
P::TargetChain::NAME,
best_target_block,
head_at_target,
);
let needs_update = match (head_at_source, head_at_target) {
(AvailableHeader::Unavailable, _) => {
// source client has politely asked us not to update current parachain head
// at the target chain
false
},
(AvailableHeader::Available(head_at_source), Some(head_at_target))
if head_at_source.number() > head_at_target.number() =>
{
// source client knows head that is better than the head known to the target
// client
true
},
(AvailableHeader::Available(_), Some(_)) => {
// this is normal case when relay has recently updated heads, when parachain is
// not progressing, or when our source client is still syncing
false
},
(AvailableHeader::Available(_), None) => {
// parachain is not yet known to the target client. This is true when parachain
// or bridge has been just onboarded/started
true
},
(AvailableHeader::Missing, Some(_)) => {
// parachain/parathread has been offboarded removed from the system. It needs to
// be propageted to the target client
true
},
(AvailableHeader::Missing, None) => {
// all's good - parachain is unknown to both clients
false
},
};
if needs_update {
log::trace!(
target: "bridge",
"{} parachain ParaId({}) needs update at {}: {:?} vs {:?}",
P::SourceRelayChain::NAME,
P::SourceParachain::PARACHAIN_ID,
P::TargetChain::NAME,
head_at_source,
head_at_target,
);
}
needs_update
}
/// Reads parachain head from the source client.
async fn read_head_at_source(
source_client: &impl SourceClient,
metrics: Option<&ParachainsLoopMetrics>,
at_relay_block: &HeaderIdOf,
) -> Result>, FailedClient> {
let para_head = source_client.parachain_head(*at_relay_block).await;
match para_head {
Ok(AvailableHeader::Available(para_head)) => {
if let Some(metrics) = metrics {
metrics.update_best_parachain_block_at_source(
ParaId(P::SourceParachain::PARACHAIN_ID),
para_head.number(),
);
}
Ok(AvailableHeader::Available(para_head))
},
Ok(r) => Ok(r),
Err(e) => {
log::warn!(
target: "bridge",
"Failed to read head of {} parachain ParaId({:?}): {:?}",
P::SourceRelayChain::NAME,
P::SourceParachain::PARACHAIN_ID,
e,
);
Err(FailedClient::Source)
},
}
}
/// Reads parachain head from the target client.
async fn read_head_at_target(
target_client: &impl TargetClient,
metrics: Option<&ParachainsLoopMetrics>,
at_block: &HeaderIdOf,
) -> Result>, FailedClient> {
let para_head_id = target_client.parachain_head(*at_block).await;
match para_head_id {
Ok(Some(para_head_id)) => {
if let Some(metrics) = metrics {
metrics.update_best_parachain_block_at_target(
ParaId(P::SourceParachain::PARACHAIN_ID),
para_head_id.number(),
);
}
Ok(Some(para_head_id))
},
Ok(None) => Ok(None),
Err(e) => {
log::warn!(
target: "bridge",
"Failed to read head of {} parachain ParaId({}) at {}: {:?}",
P::SourceRelayChain::NAME,
P::SourceParachain::PARACHAIN_ID,
P::TargetChain::NAME,
e,
);
Err(FailedClient::Target)
},
}
}
/// Submitted heads status.
enum SubmittedHeadStatus {
/// Heads are not yet updated.
Waiting(SubmittedHeadsTracker),
/// Heads transaction has either been finalized or lost (i.e. received its "final" status).
Final(TrackedTransactionStatus>),
}
/// Type of the transaction tracker that the `SubmittedHeadsTracker` is using.
///
/// It needs to be shared because of `poll` macro and our consuming `update` method.
type SharedTransactionTracker = Shared<
Pin<
Box<
dyn Future<
Output = TrackedTransactionStatus<
HeaderIdOf<
::TargetChain>,
>,
> + Send,
>,
>,
>;
/// Submitted parachain heads transaction.
struct SubmittedHeadsTracker {
/// Parachain header id that we have submitted.
submitted_head: AvailableHeader>,
/// Future that waits for submitted transaction finality or loss.
///
/// It needs to be shared because of `poll` macro and our consuming `update` method.
transaction_tracker: SharedTransactionTracker,
}
impl SubmittedHeadsTracker {
/// Creates new parachain heads transaction tracker.
pub fn new(
submitted_head: AvailableHeader>,
transaction_tracker: impl TransactionTracker> + 'static,
) -> Self {
SubmittedHeadsTracker {
submitted_head,
transaction_tracker: transaction_tracker.wait().fuse().boxed().shared(),
}
}
/// Returns `None` if all submitted parachain heads have been updated.
pub async fn update(
self,
at_target_block: &HeaderIdOf,
head_at_target: &Option>,
) -> SubmittedHeadStatus {
// check if our head has been updated
let is_head_updated = match (self.submitted_head, head_at_target) {
(AvailableHeader::Available(submitted_head), Some(head_at_target))
if head_at_target.number() >= submitted_head.number() =>
true,
(AvailableHeader::Missing, None) => true,
_ => false,
};
if is_head_updated {
log::trace!(
target: "bridge",
"Head of parachain ParaId({}) has been updated at {}: {:?}",
P::SourceParachain::PARACHAIN_ID,
P::TargetChain::NAME,
head_at_target,
);
return SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(*at_target_block))
}
// if underlying transaction tracker has reported that the transaction is lost, we may
// then restart our sync
let transaction_tracker = self.transaction_tracker.clone();
match poll!(transaction_tracker) {
Poll::Ready(TrackedTransactionStatus::Lost) =>
return SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
Poll::Ready(TrackedTransactionStatus::Finalized(_)) => {
// so we are here and our transaction is mined+finalized, but some of heads were not
// updated => we're considering our loop as stalled
return SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost)
},
_ => (),
}
SubmittedHeadStatus::Waiting(self)
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_std::sync::{Arc, Mutex};
use codec::Encode;
use futures::{SinkExt, StreamExt};
use relay_substrate_client::test_chain::{TestChain, TestParachain};
use relay_utils::{HeaderId, MaybeConnectionError};
use sp_core::H256;
const PARA_10_HASH: ParaHash = H256([10u8; 32]);
const PARA_20_HASH: ParaHash = H256([20u8; 32]);
#[derive(Clone, Debug)]
enum TestError {
Error,
}
impl MaybeConnectionError for TestError {
fn is_connection_error(&self) -> bool {
false
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct TestParachainsPipeline;
impl ParachainsPipeline for TestParachainsPipeline {
type SourceRelayChain = TestChain;
type SourceParachain = TestParachain;
type TargetChain = TestChain;
}
#[derive(Clone, Debug)]
struct TestClient {
data: Arc>,
}
#[derive(Clone, Debug)]
struct TestTransactionTracker(Option>>);
#[async_trait]
impl TransactionTracker for TestTransactionTracker {
type HeaderId = HeaderIdOf;
async fn wait(self) -> TrackedTransactionStatus> {
match self.0 {
Some(status) => status,
None => futures::future::pending().await,
}
}
}
#[derive(Clone, Debug)]
struct TestClientData {
source_sync_status: Result,
source_head: Result>, TestError>,
source_proof: Result<(), TestError>,
target_best_block: Result, TestError>,
target_best_finalized_source_block: Result, TestError>,
target_head: Result>, TestError>,
target_submit_result: Result<(), TestError>,
exit_signal_sender: Option>>,
}
impl TestClientData {
pub fn minimal() -> Self {
TestClientData {
source_sync_status: Ok(true),
source_head: Ok(AvailableHeader::Available(HeaderId(0, PARA_20_HASH))),
source_proof: Ok(()),
target_best_block: Ok(HeaderId(0, Default::default())),
target_best_finalized_source_block: Ok(HeaderId(0, Default::default())),
target_head: Ok(None),
target_submit_result: Ok(()),
exit_signal_sender: None,
}
}
pub fn with_exit_signal_sender(
sender: futures::channel::mpsc::UnboundedSender<()>,
) -> Self {
let mut client = Self::minimal();
client.exit_signal_sender = Some(Box::new(sender));
client
}
}
impl From for TestClient {
fn from(data: TestClientData) -> TestClient {
TestClient { data: Arc::new(Mutex::new(data)) }
}
}
#[async_trait]
impl RelayClient for TestClient {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
unimplemented!()
}
}
#[async_trait]
impl SourceClient for TestClient {
async fn ensure_synced(&self) -> Result {
self.data.lock().await.source_sync_status.clone()
}
async fn parachain_head(
&self,
_at_block: HeaderIdOf,
) -> Result>, TestError> {
self.data.lock().await.source_head.clone()
}
async fn prove_parachain_head(
&self,
_at_block: HeaderIdOf,
) -> Result<(ParaHeadsProof, ParaHash), TestError> {
let head = *self.data.lock().await.source_head.clone()?.as_available().unwrap();
let storage_proof = vec![head.hash().encode()];
let proof = (ParaHeadsProof { storage_proof }, head.hash());
self.data.lock().await.source_proof.clone().map(|_| proof)
}
}
#[async_trait]
impl TargetClient for TestClient {
type TransactionTracker = TestTransactionTracker;
async fn best_block(&self) -> Result, TestError> {
self.data.lock().await.target_best_block.clone()
}
async fn best_finalized_source_relay_chain_block(
&self,
_at_block: &HeaderIdOf,
) -> Result, TestError> {
self.data.lock().await.target_best_finalized_source_block.clone()
}
async fn parachain_head(
&self,
_at_block: HeaderIdOf,
) -> Result>, TestError> {
self.data.lock().await.target_head.clone()
}
async fn submit_parachain_head_proof(
&self,
_at_source_block: HeaderIdOf,
_updated_parachain_head: ParaHash,
_proof: ParaHeadsProof,
) -> Result {
let mut data = self.data.lock().await;
data.target_submit_result.clone()?;
if let Some(mut exit_signal_sender) = data.exit_signal_sender.take() {
exit_signal_sender.send(()).await.unwrap();
}
Ok(TestTransactionTracker(Some(
TrackedTransactionStatus::Finalized(Default::default()),
)))
}
}
#[test]
fn when_source_client_fails_to_return_sync_state() {
let mut test_source_client = TestClientData::minimal();
test_source_client.source_sync_status = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(test_source_client),
TestClient::from(TestClientData::minimal()),
None,
futures::future::pending(),
)),
Err(FailedClient::Source),
);
}
#[test]
fn when_target_client_fails_to_return_best_block() {
let mut test_target_client = TestClientData::minimal();
test_target_client.target_best_block = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(test_target_client),
None,
futures::future::pending(),
)),
Err(FailedClient::Target),
);
}
#[test]
fn when_target_client_fails_to_read_heads() {
let mut test_target_client = TestClientData::minimal();
test_target_client.target_head = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(test_target_client),
None,
futures::future::pending(),
)),
Err(FailedClient::Target),
);
}
#[test]
fn when_target_client_fails_to_read_best_finalized_source_block() {
let mut test_target_client = TestClientData::minimal();
test_target_client.target_best_finalized_source_block = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(test_target_client),
None,
futures::future::pending(),
)),
Err(FailedClient::Target),
);
}
#[test]
fn when_source_client_fails_to_read_heads() {
let mut test_source_client = TestClientData::minimal();
test_source_client.source_head = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(test_source_client),
TestClient::from(TestClientData::minimal()),
None,
futures::future::pending(),
)),
Err(FailedClient::Source),
);
}
#[test]
fn when_source_client_fails_to_prove_heads() {
let mut test_source_client = TestClientData::minimal();
test_source_client.source_proof = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(test_source_client),
TestClient::from(TestClientData::minimal()),
None,
futures::future::pending(),
)),
Err(FailedClient::Source),
);
}
#[test]
fn when_target_client_rejects_update_transaction() {
let mut test_target_client = TestClientData::minimal();
test_target_client.target_submit_result = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(test_target_client),
None,
futures::future::pending(),
)),
Err(FailedClient::Target),
);
}
#[test]
fn minimal_working_case() {
let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded();
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(TestClientData::with_exit_signal_sender(exit_signal_sender)),
None,
exit_signal.into_future().map(|(_, _)| ()),
)),
Ok(()),
);
}
fn test_tx_tracker() -> SubmittedHeadsTracker {
SubmittedHeadsTracker::new(
AvailableHeader::Available(HeaderId(20, PARA_20_HASH)),
TestTransactionTracker(None),
)
}
impl From> for Option<()> {
fn from(status: SubmittedHeadStatus) -> Option<()> {
match status {
SubmittedHeadStatus::Waiting(_) => Some(()),
_ => None,
}
}
}
#[async_std::test]
async fn tx_tracker_update_when_head_at_target_has_none_value() {
assert_eq!(
Some(()),
test_tx_tracker()
.update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH)))
.await
.into(),
);
}
#[async_std::test]
async fn tx_tracker_update_when_head_at_target_has_old_value() {
assert_eq!(
Some(()),
test_tx_tracker()
.update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH)))
.await
.into(),
);
}
#[async_std::test]
async fn tx_tracker_update_when_head_at_target_has_same_value() {
assert!(matches!(
test_tx_tracker()
.update(&HeaderId(0, Default::default()), &Some(HeaderId(20, PARA_20_HASH)))
.await,
SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)),
));
}
#[async_std::test]
async fn tx_tracker_update_when_head_at_target_has_better_value() {
assert!(matches!(
test_tx_tracker()
.update(&HeaderId(0, Default::default()), &Some(HeaderId(30, PARA_20_HASH)))
.await,
SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)),
));
}
#[async_std::test]
async fn tx_tracker_update_when_tx_is_lost() {
let mut tx_tracker = test_tx_tracker();
tx_tracker.transaction_tracker =
futures::future::ready(TrackedTransactionStatus::Lost).boxed().shared();
assert!(matches!(
tx_tracker
.update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH)))
.await,
SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
));
}
#[async_std::test]
async fn tx_tracker_update_when_tx_is_finalized_but_heads_are_not_updated() {
let mut tx_tracker = test_tx_tracker();
tx_tracker.transaction_tracker =
futures::future::ready(TrackedTransactionStatus::Finalized(Default::default()))
.boxed()
.shared();
assert!(matches!(
tx_tracker
.update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH)))
.await,
SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost),
));
}
#[test]
fn parachain_is_not_updated_if_it_is_unavailable() {
assert!(!is_update_required::(
AvailableHeader::Unavailable,
None,
Default::default(),
Default::default(),
));
assert!(!is_update_required::(
AvailableHeader::Unavailable,
Some(HeaderId(10, PARA_10_HASH)),
Default::default(),
Default::default(),
));
}
#[test]
fn parachain_is_not_updated_if_it_is_unknown_to_both_clients() {
assert!(!is_update_required::(
AvailableHeader::Missing,
None,
Default::default(),
Default::default(),
),);
}
#[test]
fn parachain_is_not_updated_if_target_has_better_head() {
assert!(!is_update_required::(
AvailableHeader::Available(HeaderId(10, Default::default())),
Some(HeaderId(20, Default::default())),
Default::default(),
Default::default(),
),);
}
#[test]
fn parachain_is_updated_after_offboarding() {
assert!(is_update_required::(
AvailableHeader::Missing,
Some(HeaderId(20, Default::default())),
Default::default(),
Default::default(),
),);
}
#[test]
fn parachain_is_updated_after_onboarding() {
assert!(is_update_required::(
AvailableHeader::Available(HeaderId(30, Default::default())),
None,
Default::default(),
Default::default(),
),);
}
#[test]
fn parachain_is_updated_if_newer_head_is_known() {
assert!(is_update_required::(
AvailableHeader::Available(HeaderId(40, Default::default())),
Some(HeaderId(30, Default::default())),
Default::default(),
Default::default(),
),);
}
}