pub async fn run<P: FinalitySyncPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
sync_params: FinalitySyncParams,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), relay_utils::Error> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(metrics_params)
.loop_metric(SyncLoopMetrics::new(
Some(&metrics_prefix::<P>()),
"source",
"source_at_target",
)?)?
.expose()
.await?
.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
run_until_connection_lost(
source_client,
target_client,
sync_params.clone(),
metrics,
exit_signal.clone(),
)
})
.await
}
/// Unjustified headers container. Ordered by header number.
pub(crate) type UnjustifiedHeaders<H> = Vec<H>;
/// Finality proofs container. Ordered by target header number.
pub(crate) type FinalityProofs<P> = Vec<(
<P as FinalitySyncPipeline>::Number,
<P as FinalitySyncPipeline>::FinalityProof,
)>;
/// Reference to finality proofs container.
pub(crate) type FinalityProofsRef<'a, P> = &'a [(
<P as FinalitySyncPipeline>::Number,
<P as FinalitySyncPipeline>::FinalityProof,
)];
/// Error that may happen inside finality synchronization loop.
#[derive(Debug)]
pub(crate) enum Error<P: FinalitySyncPipeline, SourceError, TargetError> {
/// Source client request has failed with given error.
Source(SourceError),
/// Target client request has failed with given error.
Target(TargetError),
/// Finality proof for mandatory header is missing from the source node.
MissingMandatoryFinalityProof(P::Number),
}
impl<P, SourceError, TargetError> Error<P, SourceError, TargetError>
where
P: FinalitySyncPipeline,
SourceError: MaybeConnectionError,
TargetError: MaybeConnectionError,
{
fn fail_if_connection_error(&self) -> Result<(), FailedClient> {
match *self {
Error::Source(ref error) if error.is_connection_error() => Err(FailedClient::Source),
Error::Target(ref error) if error.is_connection_error() => Err(FailedClient::Target),
_ => Ok(()),
}
}
}
/// Information about the transaction that we have submitted.
#[derive(Debug, Clone)]
pub(crate) struct Transaction<Tracker, Number> {
/// Submitted transaction tracker.
pub tracker: Tracker,
/// The number of the header we have submitted.
pub submitted_header_number: Number,
}
impl<Tracker: TransactionTracker, Number: Debug + PartialOrd> Transaction<Tracker, Number> {
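/// Submit the given header and its finality proof to the target node.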
pub async fn submit<
C: TargetClient<P, TransactionTracker = Tracker>,
P: FinalitySyncPipeline<Number = Number>,
>(
target_client: &C,
header: P::Header,
justification: P::FinalityProof,
) -> Result<Self, C::Error> {
let submitted_header_number = header.number();
log::debug!(
target: "bridge",
"Going to submit finality proof of {} header #{:?} to {}",
P::SOURCE_NAME,
submitted_header_number,
P::TARGET_NAME,
);
let tracker = target_client.submit_finality_proof(header, justification).await?;
Ok(Transaction { tracker, submitted_header_number })
}
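/// Wait until the submitted transaction is either finalized or lost and check that it has
/// actually advanced the best finalized source block at the target node.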
pub async fn track<C: TargetClient<P>, P: FinalitySyncPipeline<Number = Number>>(
self,
target_client: &C,
) -> Result<(), String> {
match self.tracker.wait().await {
TrackedTransactionStatus::Finalized(_) => {
// The transaction has been finalized, but it may have been finalized in the
// "failed" state. So let's check if the block number was actually updated.
// If it wasn't then we are stalled.
//
// Please also note that we're returning an error if we fail to read required data
// from the target client - that's the best we can do here to avoid actual stall.
target_client
.best_finalized_source_block_id()
.await
.map_err(|e| format!("failed to read best block from target node: {:?}", e))
.and_then(|best_id_at_target| {
if self.submitted_header_number > best_id_at_target.0 {
return Err(format!(
"best block at target after tx is {:?} and we've submitted {:?}",
best_id_at_target.0, self.submitted_header_number,
))
}
Ok(())
})
},
TrackedTransactionStatus::Lost => Err("transaction failed".to_string()),
}
}
}
/// Finality proofs stream that may be restarted.
pub(crate) struct RestartableFinalityProofsStream<S> {
/// Flag that the stream needs to be restarted.
pub(crate) needs_restart: bool,
/// The stream itself.
stream: Pin<Box<S>>,
}
impl<S: Stream> RestartableFinalityProofsStream<S> {
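/// Subscribe to finality proofs at the source client.
///
/// A subscription error is logged and converted into `FailedClient::Source`, so that the caller
/// can reconnect the source client.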
pub async fn create_raw_stream<
C: SourceClient<P, FinalityProofsStream = S>,
P: FinalitySyncPipeline,
>(
source_client: &C,
) -> Result<S, FailedClient> {
source_client.finality_proofs().await.map_err(|error| {
log::error!(
target: "bridge",
"Failed to subscribe to {} justifications: {:?}. Going to reconnect",
P::SOURCE_NAME,
error,
);
FailedClient::Source
})
}
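/// Recreate the underlying stream if a restart has been scheduled by a previous `next()` call.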
pub async fn restart_if_scheduled<
C: SourceClient<P, FinalityProofsStream = S>,
P: FinalitySyncPipeline,
>(
&mut self,
source_client: &C,
) -> Result<(), FailedClient> {
if self.needs_restart {
log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME);
self.needs_restart = false;
self.stream = Box::pin(Self::create_raw_stream(source_client).await?);
}
Ok(())
}
pub fn next(&mut self) -> Option<S::Item> {
match self.stream.next().now_or_never() {
Some(Some(finality_proof)) => Some(finality_proof),
Some(None) => {
self.needs_restart = true;
None
},
None => None,
}
}
}
impl<S: Stream> From<S> for RestartableFinalityProofsStream<S> {
fn from(stream: S) -> Self {
RestartableFinalityProofsStream { needs_restart: false, stream: Box::pin(stream) }
}
}
/// Finality synchronization loop state.
pub(crate) struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> {
/// Synchronization loop progress.
pub(crate) progress: &'a mut (Instant, Option<P::Number>),
/// Finality proofs stream.
pub(crate) finality_proofs_stream: &'a mut RestartableFinalityProofsStream<FinalityProofsStream>,
/// Recent finality proofs that we have read from the stream.
pub(crate) recent_finality_proofs: &'a mut FinalityProofs<P>,
/// Number of the last header submitted to the target node.
pub(crate) submitted_header_number: Option<P::Number>,
}
/// Run finality relay loop until the connection to one of the nodes is lost.
pub(crate) async fn run_until_connection_lost<P: FinalitySyncPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
sync_params: FinalitySyncParams,
metrics_sync: Option<SyncLoopMetrics>,
exit_signal: impl Future<Output = ()>,
) -> Result<(), FailedClient> {
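// the transaction tracker starts in the terminated state, so it is skipped by the `select!`
// below until we actually submit a transaction and `set()` it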
let last_transaction_tracker = futures::future::Fuse::terminated();
let exit_signal = exit_signal.fuse();
futures::pin_mut!(last_transaction_tracker, exit_signal);
let mut finality_proofs_stream =
RestartableFinalityProofsStream::create_raw_stream(&source_client).await?.into();
let mut recent_finality_proofs = Vec::new();
let mut progress = (Instant::now(), None);
let mut retry_backoff = retry_backoff();
let mut last_submitted_header_number = None;
loop {
// run loop iteration
let iteration_result = run_loop_iteration(
&source_client,
&target_client,
FinalityLoopState {
progress: &mut progress,
finality_proofs_stream: &mut finality_proofs_stream,
recent_finality_proofs: &mut recent_finality_proofs,
submitted_header_number: last_submitted_header_number,
},
&sync_params,
&metrics_sync,
)
.await;
// deal with errors
let next_tick = match iteration_result {
Ok(Some(updated_transaction)) => {
last_submitted_header_number = Some(updated_transaction.submitted_header_number);
last_transaction_tracker.set(updated_transaction.track(&target_client).fuse());
retry_backoff.reset();
sync_params.tick
},
Ok(None) => {
retry_backoff.reset();
sync_params.tick
},
Err(error) => {
log::error!(target: "bridge", "Finality sync loop iteration has failed with error: {:?}", error);
error.fail_if_connection_error()?;
retry_backoff.next_backoff().unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY)
},
};
finality_proofs_stream.restart_if_scheduled(&source_client).await?;
// wait till exit signal, or new source block
select! {
transaction_result = last_transaction_tracker => {
transaction_result.map_err(|e| {
log::error!(
target: "bridge",
"Finality synchronization from {} to {} has stalled with error: {}. Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
e,
);
// Restart the loop if we're stalled.
FailedClient::Both
})?
},
_ = async_std::task::sleep(next_tick).fuse() => {},
_ = exit_signal => return Ok(()),
}
}
}
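/// Run a single iteration of the finality synchronization loop.
///
/// Returns the transaction that has been submitted to the target node during this iteration
/// (if any), so that the caller can start tracking it.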
pub(crate) async fn run_loop_iteration<P, SC, TC>(
source_client: &SC,
target_client: &TC,
state: FinalityLoopState<'_, P, SC::FinalityProofsStream>,
sync_params: &FinalitySyncParams,
metrics_sync: &Option<SyncLoopMetrics>,
) -> Result<Option<Transaction<TC::TransactionTracker, P::Number>>, Error<P, SC::Error, TC::Error>>
where
P: FinalitySyncPipeline,
SC: SourceClient<P>,
TC: TargetClient<P>,
{
// read best source headers ids from source and target nodes
let best_number_at_source =
source_client.best_finalized_block_number().await.map_err(Error::Source)?;
let best_id_at_target =
target_client.best_finalized_source_block_id().await.map_err(Error::Target)?;
let best_number_at_target = best_id_at_target.0;
let different_hash_at_source = ensure_same_fork::<P, _>(&best_id_at_target, source_client)
.await
.map_err(Error::Source)?;
let using_same_fork = different_hash_at_source.is_none();
if let Some(ref different_hash_at_source) = different_hash_at_source {
log::error!(
target: "bridge",
"Source node ({}) and pallet at target node ({}) have different headers at the same height {:?}: \
at-source {:?} vs at-target {:?}",
P::SOURCE_NAME,
P::TARGET_NAME,
best_number_at_target,
different_hash_at_source,
best_id_at_target.1,
);
}
if let Some(ref metrics_sync) = *metrics_sync {
metrics_sync.update_best_block_at_source(best_number_at_source);
metrics_sync.update_best_block_at_target(best_number_at_target);
metrics_sync.update_using_same_fork(using_same_fork);
}
*state.progress =
print_sync_progress::<P>(*state.progress, best_number_at_source, best_number_at_target);
// if we have already submitted header, then we just need to wait for it
// if we've been waiting for too long, we assume the transaction has been lost and restart the sync
if let Some(submitted_header_number) = state.submitted_header_number {
if best_number_at_target >= submitted_header_number {
// transaction has been mined && we can continue
} else {
return Ok(None)
}
}
// submit new header if we have something new
match select_header_to_submit(
source_client,
target_client,
state.finality_proofs_stream,
state.recent_finality_proofs,
best_number_at_source,
best_number_at_target,
sync_params,
)
.await?
{
Some((header, justification)) => {
let transaction = Transaction::submit(target_client, header, justification)
.await
.map_err(Error::Target)?;
Ok(Some(transaction))
},
None => Ok(None),
}
}
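/// Select a header from the range `(best_number_at_target; best_number_at_source]` that we can
/// submit to the target node, together with its finality proof.
///
/// A mandatory header is always selected if one is found; otherwise we try to improve the
/// selection using proofs recently read from the finality proofs stream.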
pub(crate) async fn select_header_to_submit<P, SC, TC>(
source_client: &SC,
target_client: &TC,
finality_proofs_stream: &mut RestartableFinalityProofsStream<SC::FinalityProofsStream>,
recent_finality_proofs: &mut FinalityProofs<P>,
best_number_at_source: P::Number,
best_number_at_target: P::Number,
sync_params: &FinalitySyncParams,
) -> Result<Option<(P::Header, P::FinalityProof)>, Error<P, SC::Error, TC::Error>>
where
P: FinalitySyncPipeline,
SC: SourceClient<P>,
TC: TargetClient<P>,
{
// to see that the loop is progressing
log::trace!(
target: "bridge",
"Considering range of headers ({:?}; {:?}]",
best_number_at_target,
best_number_at_source,
);
// read missing headers. if we see that the header schedules GRANDPA change, we need to
// submit this header
let selected_finality_proof = read_missing_headers::<P, SC, TC>(
source_client,
target_client,
best_number_at_source,
best_number_at_target,
)
.await?;
let (mut unjustified_headers, mut selected_finality_proof) = match selected_finality_proof {
SelectedFinalityProof::Mandatory(header, finality_proof) =>
return Ok(Some((header, finality_proof))),
_ if sync_params.only_mandatory_headers => {
// we are not reading finality proofs from the stream, so eventually it'll break
// but we don't care about transient proofs at all, so it is acceptable
return Ok(None)
},
SelectedFinalityProof::Regular(unjustified_headers, header, finality_proof) =>
(unjustified_headers, Some((header, finality_proof))),
SelectedFinalityProof::None(unjustified_headers) => (unjustified_headers, None),
};
// all headers that are missing from the target client are non-mandatory
// => even if we have already selected some header and its persistent finality proof,
// we may try to select a better header by reading non-persistent proofs from the stream
read_finality_proofs_from_stream::<P, _>(finality_proofs_stream, recent_finality_proofs);
selected_finality_proof = select_better_recent_finality_proof::<P>(
recent_finality_proofs,
&mut unjustified_headers,
selected_finality_proof,
);
// remove obsolete 'recent' finality proofs and keep their number under a certain limit
let oldest_finality_proof_to_keep = selected_finality_proof
.as_ref()
.map(|(header, _)| header.number())
.unwrap_or(best_number_at_target);
prune_recent_finality_proofs::<P>(
oldest_finality_proof_to_keep,
recent_finality_proofs,
sync_params.recent_finality_proofs_limit,
);
Ok(selected_finality_proof)
}
/// Ensures that both clients are on the same fork.
///
/// Returns `Some(_)` with the header hash at the source client if the headers are different.
async fn ensure_same_fork<P: FinalitySyncPipeline, SC: SourceClient<P>>(
best_id_at_target: &HeaderId<P::Hash, P::Number>,
source_client: &SC,
) -> Result<Option<P::Hash>, SC::Error> {
let header_at_source = source_client.header_and_finality_proof(best_id_at_target.0).await?.0;
let header_hash_at_source = header_at_source.hash();
Ok(if best_id_at_target.1 == header_hash_at_source {
None
} else {
Some(header_hash_at_source)
})
}
/// Finality proof that has been selected by the `read_missing_headers` function.
pub(crate) enum SelectedFinalityProof<Header, FinalityProof> {
/// Mandatory header and its proof have been selected. We shall submit the proof for this header.
Mandatory(Header, FinalityProof),
/// Regular header and its proof have been selected. We may submit this proof, or a proof for
/// some better header.
Regular(UnjustifiedHeaders<Header>, Header, FinalityProof),
/// We haven't found any missing header with a persistent proof at the target client.
None(UnjustifiedHeaders<Header>),
}
/// Read missing headers and their persistent finality proofs from the target client.
///
/// If we have found some header with known proof, it is returned.
/// Otherwise, `SelectedFinalityProof::None` is returned.
///
/// Unless we have found a mandatory header, all missing headers are collected and returned.
pub(crate) async fn read_missing_headers<
P: FinalitySyncPipeline,
SC: SourceClient<P>,
TC: TargetClient<P>,
>(
source_client: &SC,
_target_client: &TC,
best_number_at_source: P::Number,
best_number_at_target: P::Number,
) -> Result<SelectedFinalityProof<P::Header, P::FinalityProof>, Error<P, SC::Error, TC::Error>> {
let mut unjustified_headers = Vec::new();
let mut selected_finality_proof = None;
let mut header_number = best_number_at_target + One::one();
while header_number <= best_number_at_source {
let (header, finality_proof) = source_client
.header_and_finality_proof(header_number)
.await
.map_err(Error::Source)?;
let is_mandatory = header.is_mandatory();
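// A mandatory header must be submitted with its proof, so we either return it immediately or
// fail if the proof is missing. For other headers we remember the latest one that has a
// persistent proof and keep collecting the unjustified headers that follow it.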
match (is_mandatory, finality_proof) {
(true, Some(finality_proof)) => {
log::trace!(target: "bridge", "Header {:?} is mandatory", header_number);
return Ok(SelectedFinalityProof::Mandatory(header, finality_proof))
},
(true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())),
(false, Some(finality_proof)) => {
log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number);
unjustified_headers.clear();
selected_finality_proof = Some((header, finality_proof));
},
(false, None) => {
unjustified_headers.push(header);
},
}
header_number = header_number + One::one();
}
log::trace!(
target: "bridge",
"Read {} {} headers. Selected finality proof for header: {:?}",
best_number_at_source.saturating_sub(best_number_at_target),
P::SOURCE_NAME,
selected_finality_proof.as_ref().map(|(header, _)| header),
);
Ok(match selected_finality_proof {
Some((header, proof)) => SelectedFinalityProof::Regular(unjustified_headers, header, proof),
None => SelectedFinalityProof::None(unjustified_headers),
})
}
/// Read finality proofs from the stream.
pub(crate) fn read_finality_proofs_from_stream<
P: FinalitySyncPipeline,
FPS: Stream<Item = P::FinalityProof>,
>(
finality_proofs_stream: &mut RestartableFinalityProofsStream<FPS>,
recent_finality_proofs: &mut FinalityProofs<P>,
) {
let mut proofs_count = 0;
let mut first_header_number = None;
let mut last_header_number = None;
while let Some(finality_proof) = finality_proofs_stream.next() {
let target_header_number = finality_proof.target_header_number();
if first_header_number.is_none() {
first_header_number = Some(target_header_number);
}
last_header_number = Some(target_header_number);
proofs_count += 1;
recent_finality_proofs.push((target_header_number, finality_proof));
}
if proofs_count != 0 {
log::trace!(
target: "bridge",
"Read {} finality proofs from {} finality stream for headers in range [{:?}; {:?}]",
proofs_count,
P::SOURCE_NAME,
first_header_number,
last_header_number,
);
}
}
/// Try to select better header and its proof, given finality proofs that we
/// have recently read from the stream.
pub(crate) fn select_better_recent_finality_proof<P: FinalitySyncPipeline>(
recent_finality_proofs: FinalityProofsRef<P>,
unjustified_headers: &mut UnjustifiedHeaders<P::Header>,
selected_finality_proof: Option<(P::Header, P::FinalityProof)>,
) -> Option<(P::Header, P::FinalityProof)> {
if unjustified_headers.is_empty() || recent_finality_proofs.is_empty() {
log::trace!(
target: "bridge",
"Can not improve selected {} finality proof {:?}. No unjustified headers and recent proofs",
P::SOURCE_NAME,
selected_finality_proof.as_ref().map(|(h, _)| h.number()),
);
return selected_finality_proof
}
const NOT_EMPTY_PROOF: &str = "we have checked that the vec is not empty; qed";
// we need proofs for headers in range unjustified_range_begin..=unjustified_range_end
let unjustified_range_begin = unjustified_headers.first().expect(NOT_EMPTY_PROOF).number();
let unjustified_range_end = unjustified_headers.last().expect(NOT_EMPTY_PROOF).number();
// we have proofs for headers in range buffered_range_begin..=buffered_range_end
let buffered_range_begin = recent_finality_proofs.first().expect(NOT_EMPTY_PROOF).0;
let buffered_range_end = recent_finality_proofs.last().expect(NOT_EMPTY_PROOF).0;
// we have two ranges => find intersection
let intersection_begin = std::cmp::max(unjustified_range_begin, buffered_range_begin);
let intersection_end = std::cmp::min(unjustified_range_end, buffered_range_end);
let intersection = intersection_begin..=intersection_end;
// find last proof from intersection
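// (`binary_search_by_key` either finds the exact entry, or returns the index where it could be
// inserted, i.e. the index of the first larger entry; in that case we step back to the closest
// smaller entry)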
let selected_finality_proof_index = recent_finality_proofs
.binary_search_by_key(intersection.end(), |(number, _)| *number)
.unwrap_or_else(|index| index.saturating_sub(1));
let (selected_header_number, finality_proof) =
&recent_finality_proofs[selected_finality_proof_index];
let has_selected_finality_proof = intersection.contains(selected_header_number);
log::trace!(
target: "bridge",
"Trying to improve selected {} finality proof {:?}. Headers range: [{:?}; {:?}]. Proofs range: [{:?}; {:?}].\
Trying to improve to: {:?}. Result: {}",
P::SOURCE_NAME,
selected_finality_proof.as_ref().map(|(h, _)| h.number()),
unjustified_range_begin,
unjustified_range_end,
buffered_range_begin,
buffered_range_end,
selected_header_number,
if has_selected_finality_proof { "improved" } else { "not improved" },
);
if !has_selected_finality_proof {
return selected_finality_proof
}
// now remove all obsolete headers and extract selected header
let selected_header_position = unjustified_headers
.binary_search_by_key(selected_header_number, |header| header.number())
.expect("unjustified_headers contain all headers from intersection; qed");
let selected_header = unjustified_headers.swap_remove(selected_header_position);
Some((selected_header, finality_proof.clone()))
}
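/// Remove finality proofs for headers that are not newer than `justified_header_number` and keep
/// at most `recent_finality_proofs_limit` of the newest remaining entries.
///
/// For example, given proofs for headers `[10, 12, 14]`, `justified_header_number = 12` and a
/// large enough limit, the proofs for headers 10 and 12 are dropped and only the proof for
/// header 14 is kept.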
pub(crate) fn prune_recent_finality_proofs<P: FinalitySyncPipeline>(
justified_header_number: P::Number,
recent_finality_proofs: &mut FinalityProofs<P>,
recent_finality_proofs_limit: usize,
) {
let justified_header_idx = recent_finality_proofs
.binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number)
.map(|idx| idx + 1)
.unwrap_or_else(|idx| idx);
let proofs_limit_idx =
recent_finality_proofs.len().saturating_sub(recent_finality_proofs_limit);
*recent_finality_proofs =
recent_finality_proofs.split_off(std::cmp::max(justified_header_idx, proofs_limit_idx));
}
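/// Print synchronization progress, but only if more than 10 seconds have passed or more than 10
/// headers have been synced since the previous report.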
fn print_sync_progress<P: FinalitySyncPipeline>(
progress_context: (Instant, Option<P::Number>),
best_number_at_source: P::Number,
best_number_at_target: P::Number,
) -> (Instant, Option<P::Number>) {
let (prev_time, prev_best_number_at_target) = progress_context;
let now = Instant::now();
let need_update = now - prev_time > Duration::from_secs(10) ||
prev_best_number_at_target
.map(|prev_best_number_at_target| {
best_number_at_target.saturating_sub(prev_best_number_at_target) > 10.into()
})
.unwrap_or(true);
if !need_update {
return (prev_time, prev_best_number_at_target)
}
log::info!(
target: "bridge",
"Synced {:?} of {:?} headers",
best_number_at_target,
best_number_at_source,
);
(now, Some(best_number_at_target))
}