: FinalitySourceClient,
P: FinalitySyncPipeline,
{
finality_source
.best_finalized_block_number()
.await
.map(Some)
.unwrap_or_else(|error| {
log::error!(
target: "bridge",
"Failed to read best finalized source header from source in {} relay: {:?}",
relay_task_name,
error,
);
None
})
}
/// Read best finalized source block number from target client.
///
/// Returns `None` if we have failed to read the number.
async fn best_finalized_source_header_at_target(
finality_target: &SubstrateFinalityTarget,
relay_task_name: &str,
) -> Option
where
SubstrateFinalityTarget: FinalityTargetClient,
P: FinalitySyncPipeline,
{
	match finality_target.best_finalized_source_block_number().await {
		Ok(block_number) => Some(block_number),
		Err(error) => {
			// a failed read is logged and reported as "unknown" instead of aborting the relay
			log::error!(
				target: "bridge",
				"Failed to read best finalized source header from target in {} relay: {:?}",
				relay_task_name,
				error,
			);
			None
		}
	}
}
/// What to do with the on-demand relay task?
// A unit-only enum is trivially copyable and structurally comparable, so derive
// `Clone`/`Copy`/`Eq` alongside the original `Debug`/`PartialEq` (backward-compatible
// for all existing callers, and avoids `derive_partial_eq_without_eq`-style lint noise).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OnDemandRelayAction {
	/// Spawn a new background relay task.
	Start,
	/// Cancel the currently running background relay task.
	Stop,
	/// Leave the task in its current state.
	None,
}
/// Select what to do with the on-demand relay task, given the best finalized source
/// header numbers known to the source and target nodes.
fn select_on_demand_relay_action(
best_finalized_source_header_at_source: Option,
best_finalized_source_header_at_target: Option,
mut required_source_header_at_target: C::BlockNumber,
maximal_headers_difference: C::BlockNumber,
relay_task_name: &str,
is_active: bool,
) -> OnDemandRelayAction {
	// if we could not read the header number from the target, assume it already equals the
	// required header number - otherwise we risk submitting unneeded transactions
	let at_target = best_finalized_source_header_at_target.unwrap_or(required_source_header_at_target);
	// if we could not read the header number from the source, assume the source is no
	// further than the target
	let at_source = best_finalized_source_header_at_source.unwrap_or(at_target);

	// when too many source headers are missing from the target node, demand fresh headers.
	//
	// A complex headers+messages relay normally only relays headers while there are
	// undelivered messages/confirmations, but the security model of `pallet-bridge-grandpa`
	// requires near real-time syncing: an authorities-change header must reach the target
	// before the unbonding period of the previous authorities set ends.
	let headers_difference = at_source.checked_sub(&at_target).unwrap_or_else(Zero::zero);
	if headers_difference > maximal_headers_difference {
		required_source_header_at_target = at_source;
		// avoid spamming the log while the relay is already running
		if !is_active {
			log::trace!(
				target: "bridge",
				"Too many {} headers missing at target in {} relay ({} vs {}). Going to sync up to the {}",
				C::NAME,
				relay_task_name,
				at_source,
				at_target,
				at_source,
			);
		}
	}

	// decide whether the relay needs to run, and reconcile that with its current state
	let should_run = required_source_header_at_target > at_target;
	if should_run && !is_active {
		OnDemandRelayAction::Start
	} else if !should_run && is_active {
		OnDemandRelayAction::Stop
	} else {
		OnDemandRelayAction::None
	}
}
/// On-demand headers relay task name.
///
/// Built from the source and target chain names, e.g. `on-demand-Millau-to-Rialto`.
fn on_demand_headers_relay_name() -> String {
format!("on-demand-{}-to-{}", SourceChain::NAME, TargetChain::NAME)
}
/// Start on-demand headers relay task.
///
/// Spawns the finality relay as a named background task. Returns a handle to the spawned
/// task, or `None` (after logging an error) if spawning failed. When the relay future
/// completes - for any reason - `relay_exited_tx` is signalled so the caller can react.
fn start_on_demand_headers_relay(
task_name: String,
relay_exited_tx: oneshot::Sender<()>,
source_client: Client,
target_client: Client,
pipeline: SubstrateFinalityToSubstrate,
) -> Option>
where
SourceChain::BlockNumber: BlockNumberBase,
SubstrateFinalityToSubstrate: SubstrateFinalitySyncPipeline<
Hash = HashOf,
Number = BlockNumberOf,
Header = SyncHeader,
FinalityProof = GrandpaJustification,
TargetChain = TargetChain,
>,
TargetSign: 'static,
{
// the future is lazy: nothing runs until it is awaited inside the spawned task below.
// NOTE(review): the bare `true` argument presumably selects an only-mandatory-headers
// mode of the finality pipeline - confirm against `finality_pipeline::run`'s signature
let headers_relay_future =
crate::finality_pipeline::run(pipeline, source_client, target_client, true, MetricsParams::disabled());
// a clone is moved into the async block; the original `task_name` is still needed for
// the error log in `map_err` below
let closure_task_name = task_name.clone();
async_std::task::Builder::new()
.name(task_name.clone())
.spawn(async move {
log::info!(target: "bridge", "Starting {} headers relay", closure_task_name);
let result = headers_relay_future.await;
log::trace!(target: "bridge", "{} headers relay has exited. Result: {:?}", closure_task_name, result);
// send error is ignored: the receiver may already be gone if nobody watches for exit
let _ = relay_exited_tx.send(());
})
.map_err(|error| {
log::error!(
target: "bridge",
"Failed to start {} relay: {:?}",
task_name,
error,
);
})
.ok()
}
/// Stop on-demand headers relay task.
async fn stop_on_demand_headers_relay(task: Option>) {
	// nothing to do when no relay task is running
	let running_task = match task {
		Some(running_task) => running_task,
		None => return,
	};

	let task_name = running_task
		.task()
		.name()
		.expect("on-demand tasks are always started with name; qed")
		.to_string();
	log::trace!(target: "bridge", "Cancelling {} headers relay", task_name);
	running_task.cancel().await;
	log::info!(target: "bridge", "Cancelled {} headers relay", task_name);
}
#[cfg(test)]
mod tests {
use super::*;
type TestChain = relay_millau_client::Millau;
// the source chain is at block 10, while the target only knows about source block 1,
// i.e. 9 source headers are missing at the target
const AT_SOURCE: Option = Some(10);
const AT_TARGET: Option = Some(1);
#[test]
fn starts_relay_when_headers_are_required() {
// required header (5) is above best at target (1) => start if inactive, else no change
assert_eq!(
select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, 5, 100, "test", false),
OnDemandRelayAction::Start,
);
assert_eq!(
select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, 5, 100, "test", true),
OnDemandRelayAction::None,
);
}
#[test]
fn starts_relay_when_too_many_headers_missing() {
// nothing is explicitly required (0), but the 10 - 1 = 9 headers difference exceeds the
// maximal allowed difference (5), so syncing is forced => start if inactive
assert_eq!(
select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, 0, 5, "test", false),
OnDemandRelayAction::Start,
);
assert_eq!(
select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, 0, 5, "test", true),
OnDemandRelayAction::None,
);
}
#[test]
fn stops_relay_if_required_header_is_synced() {
// required header equals best at target => active relay stops, inactive one stays off
assert_eq!(
select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, AT_TARGET.unwrap(), 100, "test", true),
OnDemandRelayAction::Stop,
);
assert_eq!(
select_on_demand_relay_action::(AT_SOURCE, AT_TARGET, AT_TARGET.unwrap(), 100, "test", false),
OnDemandRelayAction::None,
);
}
}