(
source_client: impl SourceClient,
target_client: impl TargetClient
,
sync_params: ParachainSyncParams,
metrics_params: MetricsParams,
exit_signal: impl Future + 'static + Send,
) -> Result<(), relay_utils::Error>
where
P::SourceChain: Chain,
{
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(metrics_params)
.loop_metric(ParachainsLoopMetrics::new(Some(&metrics_prefix::()))?)?
.expose()
.await?
.run(metrics_prefix::
(), move |source_client, target_client, metrics| {
run_until_connection_lost(
source_client,
target_client,
sync_params.clone(),
metrics,
exit_signal.clone(),
)
})
.await
}
/// Run parachain heads synchronization.
async fn run_until_connection_lost(
source_client: impl SourceClient,
target_client: impl TargetClient
,
sync_params: ParachainSyncParams,
_metrics: Option,
exit_signal: impl Future + Send,
) -> Result<(), FailedClient>
where
P::SourceChain: Chain,
{
let exit_signal = exit_signal.fuse();
let min_block_interval = std::cmp::min(
P::SourceChain::AVERAGE_BLOCK_INTERVAL,
P::TargetChain::AVERAGE_BLOCK_INTERVAL,
);
let mut tx_tracker: Option> = None;
futures::pin_mut!(exit_signal);
// Note that the internal loop breaks with `FailedClient` error even if error is non-connection.
// It is Ok for now, but it may need to be fixed in the future to use exponential backoff for
// regular errors.
loop {
// either wait for new block, or exit signal
select! {
_ = async_std::task::sleep(min_block_interval).fuse() => {},
_ = exit_signal => return Ok(()),
}
// if source client is not yet synced, we'll need to sleep. Otherwise we risk submitting too
// much redundant transactions
match source_client.ensure_synced().await {
Ok(true) => (),
Ok(false) => {
log::warn!(
target: "bridge",
"{} client is syncing. Won't do anything until it is synced",
P::SourceChain::NAME,
);
continue
},
Err(e) => {
log::warn!(
target: "bridge",
"{} client has failed to return its sync status: {:?}",
P::SourceChain::NAME,
e,
);
return Err(FailedClient::Target)
},
}
// if we have active transaction, we'll need to wait until it is mined or dropped
let best_target_block = target_client.best_block().await.map_err(|e| {
log::warn!(target: "bridge", "Failed to read best {} block: {:?}", P::SourceChain::NAME, e);
FailedClient::Target
})?;
let heads_at_target =
read_heads_at_target(&target_client, &best_target_block, &sync_params.parachains)
.await?;
tx_tracker = tx_tracker.take().and_then(|tx_tracker| tx_tracker.update(&heads_at_target));
if tx_tracker.is_some() {
continue
}
// we have no active transaction and may need to update heads, but do we have something for
// update?
let best_finalized_relay_block = target_client
.best_finalized_source_block(&best_target_block)
.await
.map_err(|e| {
log::warn!(
target: "bridge",
"Failed to read best finalized {} block from {}: {:?}",
P::SourceChain::NAME,
P::TargetChain::NAME,
e,
);
FailedClient::Target
})?;
let heads_at_source = read_heads_at_source(
&source_client,
&best_finalized_relay_block,
&sync_params.parachains,
)
.await?;
let updated_ids = select_parachains_to_update::(
heads_at_source,
heads_at_target,
best_finalized_relay_block,
);
let is_update_required = is_update_required(&sync_params, &updated_ids);
log::info!(
target: "bridge",
"Total {} parachains: {}. Up-to-date at {}: {}. Needs update at {}: {}.",
P::SourceChain::NAME,
sync_params.parachains.len(),
P::TargetChain::NAME,
sync_params.parachains.len() - updated_ids.len(),
P::TargetChain::NAME,
updated_ids.len(),
);
if is_update_required {
let heads_proofs = source_client
.prove_parachain_heads(best_finalized_relay_block, &updated_ids)
.await
.map_err(|e| {
log::warn!(
target: "bridge",
"Failed to prove {} parachain heads: {:?}",
P::SourceChain::NAME,
e,
);
FailedClient::Source
})?;
log::info!(
target: "bridge",
"Submitting {} parachain heads update transaction to {}",
P::SourceChain::NAME,
P::TargetChain::NAME,
);
target_client
.submit_parachain_heads_proof(
best_finalized_relay_block,
updated_ids.clone(),
heads_proofs,
)
.await
.map_err(|e| {
log::warn!(
target: "bridge",
"Failed to submit {} parachain heads proof to {}: {:?}",
P::SourceChain::NAME,
P::TargetChain::NAME,
e,
);
FailedClient::Target
})?;
tx_tracker = Some(TransactionTracker::
::new(
updated_ids,
best_finalized_relay_block.0,
sync_params.stall_timeout,
));
}
}
}
/// Given heads at source and target clients, returns set of heads that are out of sync.
fn select_parachains_to_update(
heads_at_source: BTreeMap>,
heads_at_target: BTreeMap>,
best_finalized_relay_block: HeaderIdOf,
) -> Vec
where
P::SourceChain: Chain,
{
log::trace!(
target: "bridge",
"Selecting {} parachains to update at {} (relay block: {:?}):\n\t\
At {}: {:?}\n\t\
At {}: {:?}",
P::SourceChain::NAME,
P::TargetChain::NAME,
best_finalized_relay_block,
P::SourceChain::NAME,
heads_at_source,
P::TargetChain::NAME,
heads_at_target,
);
heads_at_source
.into_iter()
.zip(heads_at_target.into_iter())
.filter(|((para, head_at_source), (_, head_at_target))| {
let needs_update = match (head_at_source, head_at_target) {
(Some(head_at_source), Some(head_at_target))
if head_at_target.at_relay_block_number < best_finalized_relay_block.0 &&
head_at_target.head_hash != *head_at_source =>
{
// source client knows head that is better than the head known to the target
// client
true
},
(Some(_), Some(_)) => {
// this is normal case when relay has recently updated heads, when parachain is
// not progressing or when our source client is
false
},
(Some(_), None) => {
// parachain is not yet known to the target client. This is true when parachain
// or bridge has been just onboarded/started
true
},
(None, Some(_)) => {
// parachain/parathread has been offboarded removed from the system. It needs to
// be propageted to the target client
true
},
(None, None) => {
// all's good - parachain is unknown to both clients
false
},
};
if needs_update {
log::trace!(
target: "bridge",
"{} parachain {:?} needs update at {}: {:?} vs {:?}",
P::SourceChain::NAME,
para,
P::TargetChain::NAME,
head_at_source,
head_at_target,
);
}
needs_update
})
.map(|((para_id, _), _)| para_id)
.collect()
}
/// Returns true if we need to submit update transactions to the target node.
fn is_update_required(sync_params: &ParachainSyncParams, updated_ids: &[ParaId]) -> bool {
match sync_params.strategy {
ParachainSyncStrategy::All => updated_ids.len() == sync_params.parachains.len(),
ParachainSyncStrategy::Any => !updated_ids.is_empty(),
}
}
/// Reads given parachains heads from the source client.
///
/// Guarantees that the returning map will have an entry for every parachain from `parachains`.
async fn read_heads_at_source(
source_client: &impl SourceClient,
at_relay_block: &HeaderIdOf,
parachains: &[ParaId],
) -> Result>, FailedClient> {
let mut para_head_hashes = BTreeMap::new();
for para in parachains {
let para_head = source_client.parachain_head(*at_relay_block, *para).await;
match para_head {
Ok(para_head) => {
para_head_hashes.insert(*para, para_head);
},
Err(e) => {
log::warn!(
target: "bridge",
"Failed to read head of {} parachain {:?}: {:?}",
P::SourceChain::NAME,
para,
e,
);
return Err(FailedClient::Source)
},
}
}
Ok(para_head_hashes)
}
/// Reads given parachains heads from the source client.
///
/// Guarantees that the returning map will have an entry for every parachain from `parachains`.
async fn read_heads_at_target(
target_client: &impl TargetClient,
at_block: &HeaderIdOf,
parachains: &[ParaId],
) -> Result>, FailedClient> {
let mut para_best_head_hashes = BTreeMap::new();
for para in parachains {
let para_best_head = target_client.parachain_head(*at_block, *para).await;
match para_best_head {
Ok(para_best_head) => {
para_best_head_hashes.insert(*para, para_best_head);
},
Err(e) => {
log::warn!(
target: "bridge",
"Failed to read head of {} parachain {:?} at {}: {:?}",
P::SourceChain::NAME,
para,
P::TargetChain::NAME,
e,
);
return Err(FailedClient::Target)
},
}
}
Ok(para_best_head_hashes)
}
/// Parachain heads transaction tracker.
struct TransactionTracker {
/// Ids of parachains which heads were updated in the tracked transaction.
awaiting_update: BTreeSet,
/// Number of relay chain block that has been used to craft parachain heads proof.
relay_block_number: BlockNumberOf,
/// Transaction submit time.
submitted_at: Instant,
/// Transaction death time.
death_time: Instant,
}
impl TransactionTracker
where
P::SourceChain: Chain,
{
/// Creates new parachain heads transaction tracker.
pub fn new(
awaiting_update: impl IntoIterator- ,
relay_block_number: BlockNumberOf
,
stall_timeout: Duration,
) -> Self {
let now = Instant::now();
TransactionTracker {
awaiting_update: awaiting_update.into_iter().collect(),
relay_block_number,
submitted_at: now,
death_time: now + stall_timeout,
}
}
/// Returns `None` if all parachain heads have been updated or we consider our transaction dead.
pub fn update(
mut self,
heads_at_target: &BTreeMap>,
) -> Option {
// remove all pending heads that were synced
for (para, best_para_head) in heads_at_target {
if best_para_head
.as_ref()
.map(|best_para_head| {
best_para_head.at_relay_block_number >= self.relay_block_number
})
.unwrap_or(false)
{
self.awaiting_update.remove(para);
log::trace!(
target: "bridge",
"Head of parachain {:?} has been updated at {}: {:?}. Outdated parachains remaining: {}",
para,
P::TargetChain::NAME,
best_para_head,
self.awaiting_update.len(),
);
}
}
// if we have synced all required heads, we are done
if self.awaiting_update.is_empty() {
return None
}
// if our transaction is dead now, we may start over again
let now = Instant::now();
if now >= self.death_time {
log::warn!(
target: "bridge",
"Parachain heads update transaction {} has been lost: no updates for {}s",
P::TargetChain::NAME,
(now - self.submitted_at).as_secs(),
);
return None
}
Some(self)
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_std::sync::{Arc, Mutex};
use codec::Encode;
use futures::{SinkExt, StreamExt};
use relay_substrate_client::test_chain::TestChain;
use relay_utils::{HeaderId, MaybeConnectionError};
use sp_core::H256;
const PARA_ID: u32 = 0;
const PARA_0_HASH: ParaHash = H256([1u8; 32]);
const PARA_1_HASH: ParaHash = H256([2u8; 32]);
#[derive(Clone, Debug)]
enum TestError {
Error,
MissingParachainHeadProof,
}
impl MaybeConnectionError for TestError {
fn is_connection_error(&self) -> bool {
false
}
}
#[derive(Clone, Debug, PartialEq)]
struct TestParachainsPipeline;
impl ParachainsPipeline for TestParachainsPipeline {
type SourceChain = TestChain;
type TargetChain = TestChain;
}
#[derive(Clone, Debug)]
struct TestClient {
data: Arc>,
}
#[derive(Clone, Debug)]
struct TestClientData {
source_sync_status: Result,
source_heads: BTreeMap>,
source_proofs: BTreeMap, TestError>>,
target_best_block: Result, TestError>,
target_best_finalized_source_block: Result, TestError>,
target_heads: BTreeMap>,
target_submit_result: Result<(), TestError>,
exit_signal_sender: Option>>,
}
impl TestClientData {
pub fn minimal() -> Self {
TestClientData {
source_sync_status: Ok(true),
source_heads: vec![(PARA_ID, Ok(PARA_0_HASH))].into_iter().collect(),
source_proofs: vec![(PARA_ID, Ok(PARA_0_HASH.encode()))].into_iter().collect(),
target_best_block: Ok(HeaderId(0, Default::default())),
target_best_finalized_source_block: Ok(HeaderId(0, Default::default())),
target_heads: BTreeMap::new(),
target_submit_result: Ok(()),
exit_signal_sender: None,
}
}
pub fn with_exit_signal_sender(
sender: futures::channel::mpsc::UnboundedSender<()>,
) -> Self {
let mut client = Self::minimal();
client.exit_signal_sender = Some(Box::new(sender));
client
}
}
impl From for TestClient {
fn from(data: TestClientData) -> TestClient {
TestClient { data: Arc::new(Mutex::new(data)) }
}
}
#[async_trait]
impl RelayClient for TestClient {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
unimplemented!()
}
}
#[async_trait]
impl SourceClient for TestClient {
async fn ensure_synced(&self) -> Result {
self.data.lock().await.source_sync_status.clone()
}
async fn parachain_head(
&self,
_at_block: HeaderIdOf,
para_id: ParaId,
) -> Result, TestError> {
self.data.lock().await.source_heads.get(¶_id.0).cloned().transpose()
}
async fn prove_parachain_heads(
&self,
_at_block: HeaderIdOf,
parachains: &[ParaId],
) -> Result {
let mut proofs = Vec::new();
for para_id in parachains {
proofs.push(
self.data
.lock()
.await
.source_proofs
.get(¶_id.0)
.cloned()
.transpose()?
.ok_or(TestError::MissingParachainHeadProof)?,
);
}
Ok(proofs)
}
}
#[async_trait]
impl TargetClient for TestClient {
async fn best_block(&self) -> Result, TestError> {
self.data.lock().await.target_best_block.clone()
}
async fn best_finalized_source_block(
&self,
_at_block: &HeaderIdOf,
) -> Result, TestError> {
self.data.lock().await.target_best_finalized_source_block.clone()
}
async fn parachain_head(
&self,
_at_block: HeaderIdOf,
para_id: ParaId,
) -> Result, TestError> {
self.data.lock().await.target_heads.get(¶_id.0).cloned().transpose()
}
async fn submit_parachain_heads_proof(
&self,
_at_source_block: HeaderIdOf,
_updated_parachains: Vec,
_proof: ParaHeadsProof,
) -> Result<(), Self::Error> {
self.data.lock().await.target_submit_result.clone()?;
if let Some(mut exit_signal_sender) = self.data.lock().await.exit_signal_sender.take() {
exit_signal_sender.send(()).await.unwrap();
}
Ok(())
}
}
fn default_sync_params() -> ParachainSyncParams {
ParachainSyncParams {
parachains: vec![ParaId(PARA_ID)],
strategy: ParachainSyncStrategy::Any,
stall_timeout: Duration::from_secs(60),
}
}
#[test]
fn when_source_client_fails_to_return_sync_state() {
let mut test_source_client = TestClientData::minimal();
test_source_client.source_sync_status = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(test_source_client),
TestClient::from(TestClientData::minimal()),
default_sync_params(),
None,
futures::future::pending(),
)),
Err(FailedClient::Target),
);
}
#[test]
fn when_target_client_fails_to_return_best_block() {
let mut test_target_client = TestClientData::minimal();
test_target_client.target_best_block = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(test_target_client),
default_sync_params(),
None,
futures::future::pending(),
)),
Err(FailedClient::Target),
);
}
#[test]
fn when_target_client_fails_to_read_heads() {
let mut test_target_client = TestClientData::minimal();
test_target_client.target_heads.insert(PARA_ID, Err(TestError::Error));
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(test_target_client),
default_sync_params(),
None,
futures::future::pending(),
)),
Err(FailedClient::Target),
);
}
#[test]
fn when_target_client_fails_to_read_best_finalized_source_block() {
let mut test_target_client = TestClientData::minimal();
test_target_client.target_best_finalized_source_block = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(test_target_client),
default_sync_params(),
None,
futures::future::pending(),
)),
Err(FailedClient::Target),
);
}
#[test]
fn when_source_client_fails_to_read_heads() {
let mut test_source_client = TestClientData::minimal();
test_source_client.source_heads.insert(PARA_ID, Err(TestError::Error));
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(test_source_client),
TestClient::from(TestClientData::minimal()),
default_sync_params(),
None,
futures::future::pending(),
)),
Err(FailedClient::Source),
);
}
#[test]
fn when_source_client_fails_to_prove_heads() {
let mut test_source_client = TestClientData::minimal();
test_source_client.source_proofs.insert(PARA_ID, Err(TestError::Error));
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(test_source_client),
TestClient::from(TestClientData::minimal()),
default_sync_params(),
None,
futures::future::pending(),
)),
Err(FailedClient::Source),
);
}
#[test]
fn when_target_client_rejects_update_transaction() {
let mut test_target_client = TestClientData::minimal();
test_target_client.target_submit_result = Err(TestError::Error);
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(test_target_client),
default_sync_params(),
None,
futures::future::pending(),
)),
Err(FailedClient::Target),
);
}
#[test]
fn minimal_working_case() {
let (exit_signal_sender, exit_signal) = futures::channel::mpsc::unbounded();
assert_eq!(
async_std::task::block_on(run_until_connection_lost(
TestClient::from(TestClientData::minimal()),
TestClient::from(TestClientData::with_exit_signal_sender(exit_signal_sender)),
default_sync_params(),
None,
exit_signal.into_future().map(|(_, _)| ()),
)),
Ok(()),
);
}
const PARA_1_ID: u32 = PARA_ID + 1;
const SOURCE_BLOCK_NUMBER: u32 = 100;
fn test_tx_tracker() -> TransactionTracker {
TransactionTracker::new(
vec![ParaId(PARA_ID), ParaId(PARA_1_ID)],
SOURCE_BLOCK_NUMBER,
Duration::from_secs(1),
)
}
#[test]
fn tx_tracker_update_when_nothing_is_updated() {
assert_eq!(
test_tx_tracker()
.update(&vec![].into_iter().collect())
.map(|t| t.awaiting_update),
Some(test_tx_tracker().awaiting_update),
);
}
#[test]
fn tx_tracker_update_when_one_of_heads_is_updated_to_previous_value() {
assert_eq!(
test_tx_tracker()
.update(
&vec![(
ParaId(PARA_ID),
Some(BestParaHeadHash {
at_relay_block_number: SOURCE_BLOCK_NUMBER - 1,
head_hash: PARA_0_HASH,
})
)]
.into_iter()
.collect()
)
.map(|t| t.awaiting_update),
Some(test_tx_tracker().awaiting_update),
);
}
#[test]
fn tx_tracker_update_when_one_of_heads_is_updated() {
assert_eq!(
test_tx_tracker()
.update(
&vec![(
ParaId(PARA_ID),
Some(BestParaHeadHash {
at_relay_block_number: SOURCE_BLOCK_NUMBER,
head_hash: PARA_0_HASH,
})
)]
.into_iter()
.collect()
)
.map(|t| t.awaiting_update),
Some(vec![ParaId(PARA_1_ID)].into_iter().collect()),
);
}
#[test]
fn tx_tracker_update_when_all_heads_are_updated() {
assert_eq!(
test_tx_tracker()
.update(
&vec![
(
ParaId(PARA_ID),
Some(BestParaHeadHash {
at_relay_block_number: SOURCE_BLOCK_NUMBER,
head_hash: PARA_0_HASH,
})
),
(
ParaId(PARA_1_ID),
Some(BestParaHeadHash {
at_relay_block_number: SOURCE_BLOCK_NUMBER,
head_hash: PARA_0_HASH,
})
),
]
.into_iter()
.collect()
)
.map(|t| t.awaiting_update),
None,
);
}
#[test]
fn tx_tracker_update_when_tx_is_stalled() {
let mut tx_tracker = test_tx_tracker();
tx_tracker.death_time = Instant::now();
assert_eq!(
tx_tracker.update(&vec![].into_iter().collect()).map(|t| t.awaiting_update),
None,
);
}
#[test]
fn parachain_is_not_updated_if_it_is_unknown_to_both_clients() {
assert_eq!(
select_parachains_to_update::(
vec![(ParaId(PARA_ID), None)].into_iter().collect(),
vec![(ParaId(PARA_ID), None)].into_iter().collect(),
HeaderId(10, Default::default()),
),
Vec::::new(),
);
}
#[test]
fn parachain_is_not_updated_if_it_has_been_updated_at_better_relay_block() {
assert_eq!(
select_parachains_to_update::(
vec![(ParaId(PARA_ID), Some(PARA_0_HASH))].into_iter().collect(),
vec![(
ParaId(PARA_ID),
Some(BestParaHeadHash { at_relay_block_number: 20, head_hash: PARA_1_HASH })
)]
.into_iter()
.collect(),
HeaderId(10, Default::default()),
),
Vec::::new(),
);
}
#[test]
fn parachain_is_not_updated_if_hash_is_the_same_at_next_relay_block() {
assert_eq!(
select_parachains_to_update::(
vec![(ParaId(PARA_ID), Some(PARA_0_HASH))].into_iter().collect(),
vec![(
ParaId(PARA_ID),
Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH })
)]
.into_iter()
.collect(),
HeaderId(10, Default::default()),
),
Vec::::new(),
);
}
#[test]
fn parachain_is_updated_after_offboarding() {
assert_eq!(
select_parachains_to_update::(
vec![(ParaId(PARA_ID), None)].into_iter().collect(),
vec![(
ParaId(PARA_ID),
Some(BestParaHeadHash {
at_relay_block_number: 0,
head_hash: Default::default(),
})
)]
.into_iter()
.collect(),
HeaderId(10, Default::default()),
),
vec![ParaId(PARA_ID)],
);
}
#[test]
fn parachain_is_updated_after_onboarding() {
assert_eq!(
select_parachains_to_update::(
vec![(ParaId(PARA_ID), Some(PARA_0_HASH))].into_iter().collect(),
vec![(ParaId(PARA_ID), None)].into_iter().collect(),
HeaderId(10, Default::default()),
),
vec![ParaId(PARA_ID)],
);
}
#[test]
fn parachain_is_updated_if_newer_head_is_known() {
assert_eq!(
select_parachains_to_update::(
vec![(ParaId(PARA_ID), Some(PARA_1_HASH))].into_iter().collect(),
vec![(
ParaId(PARA_ID),
Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH })
)]
.into_iter()
.collect(),
HeaderId(10, Default::default()),
),
vec![ParaId(PARA_ID)],
);
}
#[test]
fn is_update_required_works() {
let mut sync_params = ParachainSyncParams {
parachains: vec![ParaId(PARA_ID), ParaId(PARA_1_ID)],
strategy: ParachainSyncStrategy::Any,
stall_timeout: Duration::from_secs(60),
};
assert_eq!(is_update_required(&sync_params, &[]), false);
assert_eq!(is_update_required(&sync_params, &[ParaId(PARA_ID)]), true);
assert_eq!(is_update_required(&sync_params, &[ParaId(PARA_ID), ParaId(PARA_1_ID)]), true);
sync_params.strategy = ParachainSyncStrategy::All;
assert_eq!(is_update_required(&sync_params, &[]), false);
assert_eq!(is_update_required(&sync_params, &[ParaId(PARA_ID)]), false);
assert_eq!(is_update_required(&sync_params, &[ParaId(PARA_ID), ParaId(PARA_1_ID)]), true);
}
}