<P>, Self::Error>;
/// Get the nonce of the latest generated message.
async fn latest_generated_nonce(
&self,
id: SourceHeaderIdOf<P>,
) -> Result<(SourceHeaderIdOf<P>, MessageNonce), Self::Error>;
/// Get the nonce of the latest message whose receipt has been confirmed by the target chain.
async fn latest_confirmed_received_nonce(
&self,
id: SourceHeaderIdOf<P>,
) -> Result<(SourceHeaderIdOf<P>, MessageNonce), Self::Error>;
/// Returns a mapping of message nonces, generated on this client, to their details.
///
/// Some messages may be missing from the returned map if they have already been pruned at
/// the source chain.
async fn generated_message_details(
&self,
id: SourceHeaderIdOf<P>,
nonces: RangeInclusive<MessageNonce>,
) -> Result<MessageDetailsMap<P::SourceChainBalance>, Self::Error>;
/// Prove messages in the inclusive range `[begin; end]`.
async fn prove_messages(
&self,
id: SourceHeaderIdOf<P>,
nonces: RangeInclusive<MessageNonce>,
proof_parameters: MessageProofParameters,
) -> Result<(SourceHeaderIdOf<P>, RangeInclusive<MessageNonce>, P::MessagesProof), Self::Error>;
/// Submit messages receiving proof.
async fn submit_messages_receiving_proof(
&self,
maybe_batch_tx: Option<Self::BatchTransaction>,
generated_at_block: TargetHeaderIdOf<P>,
proof: P::MessagesReceivingProof,
) -> Result<Self::TransactionTracker, Self::Error>;
/// The given finalized target header is required at the source node to continue
/// synchronization (see the sketch below this trait for the expected caller logic).
///
/// We assume that the absence of header `id` has already been checked by the caller.
///
/// The client may return `Some(_)`, which means that nothing has happened yet and
/// the caller must generate and append a message receiving proof to the batch transaction
/// to actually send it (along with the required header) to the node.
///
/// If the function returns `None`, the caller must wait for the target header `id`
/// to appear at the source client.
async fn require_target_header_on_source(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<Option<Self::BatchTransaction>, Self::Error>;
}
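// A minimal sketch (not part of the crate API) of how a caller is expected to react to
// `require_target_header_on_source`; the symmetric `require_source_header_on_target` below
// follows the same contract. `source_client` and `required_id` are hypothetical names.
//
// match source_client.require_target_header_on_source(required_id).await? {
//     Some(batch_tx) => {
//         // Batching is supported: generate the message receiving proof and append it to
//         // `batch_tx`, so the required header and the proof are submitted together.
//     },
//     None => {
//         // No batching: wait until header `required_id` appears at the source client,
//         // then submit the receiving proof as a standalone transaction.
//     },
// }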
/// Target client trait.
#[async_trait]
pub trait TargetClient<P: MessageLane>: RelayClient {
/// Type of batch transaction that submits finality and messages proof.
type BatchTransaction: BatchTransaction<SourceHeaderIdOf<P>>;
/// Transaction tracker to track submitted transactions.
type TransactionTracker: TransactionTracker<HeaderId = TargetHeaderIdOf<P>>;
/// Returns state of the client.
async fn state(&self) -> Result<TargetClientState<P>, Self::Error>;
/// Get the nonce of the latest received message.
async fn latest_received_nonce(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, MessageNonce), Self::Error>;
/// Get the nonce of the latest confirmed message.
async fn latest_confirmed_received_nonce(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, MessageNonce), Self::Error>;
/// Get the state of the unrewarded relayers set at the inbound lane.
async fn unrewarded_relayers_state(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, UnrewardedRelayersState), Self::Error>;
/// Prove messages receiving at the given block.
async fn prove_messages_receiving(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, P::MessagesReceivingProof), Self::Error>;
/// Submit messages proof.
async fn submit_messages_proof(
&self,
maybe_batch_tx: Option<Self::BatchTransaction>,
generated_at_header: SourceHeaderIdOf<P>,
nonces: RangeInclusive<MessageNonce>,
proof: P::MessagesProof,
) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error>;
/// The given finalized source header is required at the target node to continue
/// synchronization (see the sketch below this trait).
///
/// The client may return `Some(_)`, which means that nothing has happened yet and
/// the caller must generate and append a messages proof to the batch transaction
/// to actually send it (along with the required header) to the node.
///
/// If the function returns `None`, the caller must wait for the source header `id`
/// to appear at the target client.
async fn require_source_header_on_target(
&self,
id: SourceHeaderIdOf<P>,
) -> Result<Option<Self::BatchTransaction>, Self::Error>;
}
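// A minimal sketch (not part of the crate API) of how the delivery side may combine
// `require_source_header_on_target` with `submit_messages_proof`, bundling the messages
// proof with the required header. `target_client`, `header_id`, `nonces` and `proof` are
// hypothetical names.
//
// let maybe_batch_tx = target_client.require_source_header_on_target(header_id).await?;
// // If `maybe_batch_tx` is `Some(_)`, the proof below rides in the same transaction that
// // also submits the required header; otherwise it is submitted on its own.
// let artifacts = target_client
//     .submit_messages_proof(maybe_batch_tx, header_id, nonces, proof)
//     .await?;
// let _status = artifacts.tx_tracker.wait().await;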
/// State of the client.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ClientState<SelfHeaderId, PeerHeaderId> {
/// The best header id of this chain.
pub best_self: SelfHeaderId,
/// Best finalized header id of this chain.
pub best_finalized_self: SelfHeaderId,
/// Best finalized header id of the peer chain read at the best block of this chain (at
/// `best_finalized_self`).
pub best_finalized_peer_at_best_self: PeerHeaderId,
/// Header id of the peer chain whose number matches the number of
/// `best_finalized_peer_at_best_self`.
pub actual_best_finalized_peer_at_best_self: PeerHeaderId,
}
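// A minimal sketch (not part of the crate API) of a plausible `ClientState`, using the
// `HeaderId(number, hash)` convention from `relay_utils` with made-up u64 values: the
// chain is at block 100, block 98 is finalized, and the best finalized peer header known
// at block 98 is #50 (which here matches the actual peer header at that number).
//
// let state: ClientState<HeaderId<u64, u64>, HeaderId<u64, u64>> = ClientState {
//     best_self: HeaderId(100, 100),
//     best_finalized_self: HeaderId(98, 98),
//     best_finalized_peer_at_best_self: HeaderId(50, 50),
//     actual_best_finalized_peer_at_best_self: HeaderId(50, 50),
// };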
/// State of source client in one-way message lane.
pub type SourceClientState<P> = ClientState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>;
/// State of target client in one-way message lane.
pub type TargetClientState<P> = ClientState<TargetHeaderIdOf<P>, SourceHeaderIdOf<P>>;
/// Both clients state.
#[derive(Debug, Default)]
pub struct ClientsState<P: MessageLane> {
/// Source client state.
pub source: Option<SourceClientState<P>>,
/// Target client state.
pub target: Option<TargetClientState<P>>,
}
/// Return prefix that will be used by default to expose Prometheus metrics of the messages
/// sync loop.
pub fn metrics_prefix<P: MessageLane>(lane: &LaneId) -> String {
format!("{}_to_{}_MessageLane_{}", P::SOURCE_NAME, P::TARGET_NAME, hex::encode(lane))
}
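// For example, assuming a `P` with `SOURCE_NAME = "Rialto"` and `TARGET_NAME = "Millau"`
// (hypothetical chain names here) and the all-zeros lane id used in the tests below, this
// returns "Rialto_to_Millau_MessageLane_00000000".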
/// Run message lane service loop.
pub async fn run<P: MessageLane>(
params: Params,
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()> + Send + 'static,
) -> Result<(), relay_utils::Error> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.reconnect_delay(params.reconnect_delay)
.with_metrics(metrics_params)
.loop_metric(MessageLaneLoopMetrics::new(Some(&metrics_prefix::<P>(&params.lane)))?)?
.expose()
.await?
.run(metrics_prefix::<P>(&params.lane), move |source_client, target_client, metrics| {
run_until_connection_lost(
params.clone(),
source_client,
target_client,
metrics,
exit_signal.clone(),
)
})
.await
}
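// A minimal sketch (not part of the crate API) of wiring the loop together; `params`,
// `source_client` and `target_client` are hypothetical values of the types above. See the
// tests module below for a complete, runnable configuration.
//
// let exit_signal = futures::future::pending::<()>();
// run(params, source_client, target_client, MetricsParams::disabled(), exit_signal).await?;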
/// Run one-way message delivery loop until connection with target or source node is lost, or exit
/// signal is received.
async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: TargetClient<P>>(
params: Params,
source_client: SC,
target_client: TC,
metrics_msg: Option<MessageLaneLoopMetrics>,
exit_signal: impl Future<Output = ()>,
) -> Result<(), FailedClient> {
let mut source_retry_backoff = retry_backoff();
let mut source_client_is_online = false;
let mut source_state_required = true;
let source_state = source_client.state().fuse();
let source_go_offline_future = futures::future::Fuse::terminated();
let source_tick_stream = interval(params.source_tick).fuse();
let mut target_retry_backoff = retry_backoff();
let mut target_client_is_online = false;
let mut target_state_required = true;
let target_state = target_client.state().fuse();
let target_go_offline_future = futures::future::Fuse::terminated();
let target_tick_stream = interval(params.target_tick).fuse();
let (
(delivery_source_state_sender, delivery_source_state_receiver),
(delivery_target_state_sender, delivery_target_state_receiver),
) = (unbounded(), unbounded());
let delivery_race_loop = run_message_delivery_race(
source_client.clone(),
delivery_source_state_receiver,
target_client.clone(),
delivery_target_state_receiver,
metrics_msg.clone(),
params.delivery_params,
)
.fuse();
let (
(receiving_source_state_sender, receiving_source_state_receiver),
(receiving_target_state_sender, receiving_target_state_receiver),
) = (unbounded(), unbounded());
let receiving_race_loop = run_message_receiving_race(
source_client.clone(),
receiving_source_state_receiver,
target_client.clone(),
receiving_target_state_receiver,
metrics_msg.clone(),
)
.fuse();
let exit_signal = exit_signal.fuse();
futures::pin_mut!(
source_state,
source_go_offline_future,
source_tick_stream,
target_state,
target_go_offline_future,
target_tick_stream,
delivery_race_loop,
receiving_race_loop,
exit_signal
);
loop {
futures::select! {
new_source_state = source_state => {
source_state_required = false;
source_client_is_online = process_future_result(
new_source_state,
&mut source_retry_backoff,
|new_source_state| {
log::debug!(
target: "bridge",
"Received state from {} node: {:?}",
P::SOURCE_NAME,
new_source_state,
);
let _ = delivery_source_state_sender.unbounded_send(new_source_state.clone());
let _ = receiving_source_state_sender.unbounded_send(new_source_state.clone());
if let Some(metrics_msg) = metrics_msg.as_ref() {
metrics_msg.update_source_state::<P>(new_source_state);
}
},
&mut source_go_offline_future,
async_std::task::sleep,
|| format!("Error retrieving state from {} node", P::SOURCE_NAME),
).fail_if_connection_error(FailedClient::Source)?;
},
_ = source_go_offline_future => {
source_client_is_online = true;
},
_ = source_tick_stream.next() => {
source_state_required = true;
},
new_target_state = target_state => {
target_state_required = false;
target_client_is_online = process_future_result(
new_target_state,
&mut target_retry_backoff,
|new_target_state| {
log::debug!(
target: "bridge",
"Received state from {} node: {:?}",
P::TARGET_NAME,
new_target_state,
);
let _ = delivery_target_state_sender.unbounded_send(new_target_state.clone());
let _ = receiving_target_state_sender.unbounded_send(new_target_state.clone());
if let Some(metrics_msg) = metrics_msg.as_ref() {
metrics_msg.update_target_state::<P>(new_target_state);
}
},
&mut target_go_offline_future,
async_std::task::sleep,
|| format!("Error retrieving state from {} node", P::TARGET_NAME),
).fail_if_connection_error(FailedClient::Target)?;
},
_ = target_go_offline_future => {
target_client_is_online = true;
},
_ = target_tick_stream.next() => {
target_state_required = true;
},
delivery_error = delivery_race_loop => {
match delivery_error {
Ok(_) => unreachable!("only ends with error; qed"),
Err(err) => return Err(err),
}
},
receiving_error = receiving_race_loop => {
match receiving_error {
Ok(_) => unreachable!("only ends with error; qed"),
Err(err) => return Err(err),
}
},
() = exit_signal => {
return Ok(());
}
}
if source_client_is_online && source_state_required {
log::debug!(target: "bridge", "Asking {} node about its state", P::SOURCE_NAME);
source_state.set(source_client.state().fuse());
source_client_is_online = false;
}
if target_client_is_online && target_state_required {
log::debug!(target: "bridge", "Asking {} node about its state", P::TARGET_NAME);
target_state.set(target_client.state().fuse());
target_client_is_online = false;
}
}
}
#[cfg(test)]
pub(crate) mod tests {
use std::sync::Arc;
use futures::stream::StreamExt;
use parking_lot::Mutex;
use relay_utils::{HeaderId, MaybeConnectionError, TrackedTransactionStatus};
use super::*;
pub fn header_id(number: TestSourceHeaderNumber) -> TestSourceHeaderId {
HeaderId(number, number)
}
pub type TestSourceChainBalance = u64;
pub type TestSourceHeaderId = HeaderId<TestSourceHeaderHash, TestSourceHeaderNumber>;
pub type TestTargetHeaderId = HeaderId<TestTargetHeaderHash, TestTargetHeaderNumber>;
pub type TestMessagesProof = (RangeInclusive<MessageNonce>, Option<MessageNonce>);
pub type TestMessagesReceivingProof = MessageNonce;
pub type TestSourceHeaderNumber = u64;
pub type TestSourceHeaderHash = u64;
pub type TestTargetHeaderNumber = u64;
pub type TestTargetHeaderHash = u64;
#[derive(Debug)]
pub struct TestError;
impl MaybeConnectionError for TestError {
fn is_connection_error(&self) -> bool {
true
}
}
#[derive(Clone)]
pub struct TestMessageLane;
impl MessageLane for TestMessageLane {
const SOURCE_NAME: &'static str = "TestSource";
const TARGET_NAME: &'static str = "TestTarget";
type MessagesProof = TestMessagesProof;
type MessagesReceivingProof = TestMessagesReceivingProof;
type SourceChainBalance = TestSourceChainBalance;
type SourceHeaderNumber = TestSourceHeaderNumber;
type SourceHeaderHash = TestSourceHeaderHash;
type TargetHeaderNumber = TestTargetHeaderNumber;
type TargetHeaderHash = TestTargetHeaderHash;
}
#[derive(Clone, Debug)]
pub struct TestMessagesBatchTransaction {
required_header_id: TestSourceHeaderId,
}
#[async_trait]
impl BatchTransaction<TestSourceHeaderId> for TestMessagesBatchTransaction {
fn required_header_id(&self) -> TestSourceHeaderId {
self.required_header_id
}
}
#[derive(Clone, Debug)]
pub struct TestConfirmationBatchTransaction {
required_header_id: TestTargetHeaderId,
}
#[async_trait]
impl BatchTransaction<TestTargetHeaderId> for TestConfirmationBatchTransaction {
fn required_header_id(&self) -> TestTargetHeaderId {
self.required_header_id
}
}
#[derive(Clone, Debug)]
pub struct TestTransactionTracker(TrackedTransactionStatus<TestTargetHeaderId>);
impl Default for TestTransactionTracker {
fn default() -> TestTransactionTracker {
TestTransactionTracker(TrackedTransactionStatus::Finalized(Default::default()))
}
}
#[async_trait]
impl TransactionTracker for TestTransactionTracker {
type HeaderId = TestTargetHeaderId;
async fn wait(self) -> TrackedTransactionStatus<TestTargetHeaderId> {
self.0
}
}
#[derive(Debug, Clone)]
pub struct TestClientData {
is_source_fails: bool,
is_source_reconnected: bool,
source_state: SourceClientState<TestMessageLane>,
source_latest_generated_nonce: MessageNonce,
source_latest_confirmed_received_nonce: MessageNonce,
source_tracked_transaction_status: TrackedTransactionStatus<TestTargetHeaderId>,
submitted_messages_receiving_proofs: Vec<TestMessagesReceivingProof>,
is_target_fails: bool,
is_target_reconnected: bool,
target_state: TargetClientState<TestMessageLane>,
target_latest_received_nonce: MessageNonce,
target_latest_confirmed_received_nonce: MessageNonce,
target_tracked_transaction_status: TrackedTransactionStatus<TestTargetHeaderId>,
submitted_messages_proofs: Vec<TestMessagesProof>,
target_to_source_batch_transaction: Option<TestConfirmationBatchTransaction>,
target_to_source_header_required: Option<TestTargetHeaderId>,
target_to_source_header_requirements: Vec<TestTargetHeaderId>,
source_to_target_batch_transaction: Option<TestMessagesBatchTransaction>,
source_to_target_header_required: Option<TestSourceHeaderId>,
source_to_target_header_requirements: Vec<TestSourceHeaderId>,
}
impl Default for TestClientData {
fn default() -> TestClientData {
TestClientData {
is_source_fails: false,
is_source_reconnected: false,
source_state: Default::default(),
source_latest_generated_nonce: 0,
source_latest_confirmed_received_nonce: 0,
source_tracked_transaction_status: TrackedTransactionStatus::Finalized(HeaderId(
0,
Default::default(),
)),
submitted_messages_receiving_proofs: Vec::new(),
is_target_fails: false,
is_target_reconnected: false,
target_state: Default::default(),
target_latest_received_nonce: 0,
target_latest_confirmed_received_nonce: 0,
target_tracked_transaction_status: TrackedTransactionStatus::Finalized(HeaderId(
0,
Default::default(),
)),
submitted_messages_proofs: Vec::new(),
target_to_source_batch_transaction: None,
target_to_source_header_required: None,
target_to_source_header_requirements: Vec::new(),
source_to_target_batch_transaction: None,
source_to_target_header_required: None,
source_to_target_header_requirements: Vec::new(),
}
}
}
impl TestClientData {
fn receive_messages(&mut self, proof: TestMessagesProof) {
self.target_state.best_self =
HeaderId(self.target_state.best_self.0 + 1, self.target_state.best_self.1 + 1);
self.target_state.best_finalized_self = self.target_state.best_self;
self.target_latest_received_nonce = *proof.0.end();
if let Some(target_latest_confirmed_received_nonce) = proof.1 {
self.target_latest_confirmed_received_nonce =
target_latest_confirmed_received_nonce;
}
self.submitted_messages_proofs.push(proof);
}
fn receive_messages_delivery_proof(&mut self, proof: TestMessagesReceivingProof) {
self.source_state.best_self =
HeaderId(self.source_state.best_self.0 + 1, self.source_state.best_self.1 + 1);
self.source_state.best_finalized_self = self.source_state.best_self;
self.submitted_messages_receiving_proofs.push(proof);
self.source_latest_confirmed_received_nonce = proof;
}
}
#[derive(Clone)]
pub struct TestSourceClient {
data: Arc<Mutex<TestClientData>>,
tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
}
impl Default for TestSourceClient {
fn default() -> Self {
TestSourceClient {
data: Arc::new(Mutex::new(TestClientData::default())),
tick: Arc::new(|_| {}),
post_tick: Arc::new(|_| {}),
}
}
}
#[async_trait]
impl RelayClient for TestSourceClient {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
{
let mut data = self.data.lock();
(self.tick)(&mut data);
data.is_source_reconnected = true;
(self.post_tick)(&mut data);
}
Ok(())
}
}
#[async_trait]
impl SourceClient<TestMessageLane> for TestSourceClient {
type BatchTransaction = TestConfirmationBatchTransaction;
type TransactionTracker = TestTransactionTracker;
async fn state(&self) -> Result<SourceClientState<TestMessageLane>, TestError> {
let mut data = self.data.lock();
(self.tick)(&mut data);
if data.is_source_fails {
return Err(TestError)
}
(self.post_tick)(&mut data);
Ok(data.source_state.clone())
}
async fn latest_generated_nonce(
&self,
id: SourceHeaderIdOf<TestMessageLane>,
) -> Result<(SourceHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
let mut data = self.data.lock();
(self.tick)(&mut data);
if data.is_source_fails {
return Err(TestError)
}
(self.post_tick)(&mut data);
Ok((id, data.source_latest_generated_nonce))
}
async fn latest_confirmed_received_nonce(
&self,
id: SourceHeaderIdOf<TestMessageLane>,
) -> Result<(SourceHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
let mut data = self.data.lock();
(self.tick)(&mut data);
(self.post_tick)(&mut data);
Ok((id, data.source_latest_confirmed_received_nonce))
}
async fn generated_message_details(
&self,
_id: SourceHeaderIdOf<TestMessageLane>,
nonces: RangeInclusive<MessageNonce>,
) -> Result<MessageDetailsMap<TestSourceChainBalance>, TestError> {
Ok(nonces
.map(|nonce| {
(
nonce,
MessageDetails {
dispatch_weight: Weight::from_ref_time(1),
size: 1,
reward: 1,
},
)
})
.collect())
}
async fn prove_messages(
&self,
id: SourceHeaderIdOf<TestMessageLane>,
nonces: RangeInclusive<MessageNonce>,
proof_parameters: MessageProofParameters,
) -> Result<
(SourceHeaderIdOf<TestMessageLane>, RangeInclusive<MessageNonce>, TestMessagesProof),
TestError,
> {
let mut data = self.data.lock();
(self.tick)(&mut data);
(self.post_tick)(&mut data);
Ok((
id,
nonces.clone(),
(
nonces,
if proof_parameters.outbound_state_proof_required {
Some(data.source_latest_confirmed_received_nonce)
} else {
None
},
),
))
}
async fn submit_messages_receiving_proof(
&self,
_maybe_batch_tx: Option<Self::BatchTransaction>,
_generated_at_block: TargetHeaderIdOf<TestMessageLane>,
proof: TestMessagesReceivingProof,
) -> Result<Self::TransactionTracker, TestError> {
let mut data = self.data.lock();
(self.tick)(&mut data);
data.receive_messages_delivery_proof(proof);
(self.post_tick)(&mut data);
Ok(TestTransactionTracker(data.source_tracked_transaction_status))
}
async fn require_target_header_on_source(
&self,
id: TargetHeaderIdOf<TestMessageLane>,
) -> Result<Option<Self::BatchTransaction>, Self::Error> {
let mut data = self.data.lock();
data.target_to_source_header_required = Some(id);
data.target_to_source_header_requirements.push(id);
(self.tick)(&mut data);
(self.post_tick)(&mut data);
Ok(data.target_to_source_batch_transaction.take().map(|mut tx| {
tx.required_header_id = id;
tx
}))
}
}
#[derive(Clone)]
pub struct TestTargetClient {
data: Arc<Mutex<TestClientData>>,
tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
}
impl Default for TestTargetClient {
fn default() -> Self {
TestTargetClient {
data: Arc::new(Mutex::new(TestClientData::default())),
tick: Arc::new(|_| {}),
post_tick: Arc::new(|_| {}),
}
}
}
#[async_trait]
impl RelayClient for TestTargetClient {
type Error = TestError;
async fn reconnect(&mut self) -> Result<(), TestError> {
{
let mut data = self.data.lock();
(self.tick)(&mut data);
data.is_target_reconnected = true;
(self.post_tick)(&mut data);
}
Ok(())
}
}
#[async_trait]
impl TargetClient<TestMessageLane> for TestTargetClient {
type BatchTransaction = TestMessagesBatchTransaction;
type TransactionTracker = TestTransactionTracker;
async fn state(&self) -> Result<TargetClientState<TestMessageLane>, TestError> {
let mut data = self.data.lock();
(self.tick)(&mut data);
if data.is_target_fails {
return Err(TestError)
}
(self.post_tick)(&mut data);
Ok(data.target_state.clone())
}
async fn latest_received_nonce(
&self,
id: TargetHeaderIdOf<TestMessageLane>,
) -> Result<(TargetHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
let mut data = self.data.lock();
(self.tick)(&mut data);
if data.is_target_fails {
return Err(TestError)
}
(self.post_tick)(&mut data);
Ok((id, data.target_latest_received_nonce))
}
async fn unrewarded_relayers_state(
&self,
id: TargetHeaderIdOf<TestMessageLane>,
) -> Result<(TargetHeaderIdOf<TestMessageLane>, UnrewardedRelayersState), TestError> {
Ok((
id,
UnrewardedRelayersState {
unrewarded_relayer_entries: 0,
messages_in_oldest_entry: 0,
total_messages: 0,
last_delivered_nonce: 0,
},
))
}
async fn latest_confirmed_received_nonce(
&self,
id: TargetHeaderIdOf<TestMessageLane>,
) -> Result<(TargetHeaderIdOf<TestMessageLane>, MessageNonce), TestError> {
let mut data = self.data.lock();
(self.tick)(&mut data);
if data.is_target_fails {
return Err(TestError)
}
(self.post_tick)(&mut data);
Ok((id, data.target_latest_confirmed_received_nonce))
}
async fn prove_messages_receiving(
&self,
id: TargetHeaderIdOf<TestMessageLane>,
) -> Result<(TargetHeaderIdOf<TestMessageLane>, TestMessagesReceivingProof), TestError> {
Ok((id, self.data.lock().target_latest_received_nonce))
}
async fn submit_messages_proof(
&self,
_maybe_batch_tx: Option<Self::BatchTransaction>,
_generated_at_header: SourceHeaderIdOf<TestMessageLane>,
nonces: RangeInclusive<MessageNonce>,
proof: TestMessagesProof,
) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, TestError> {
let mut data = self.data.lock();
(self.tick)(&mut data);
if data.is_target_fails {
return Err(TestError)
}
data.receive_messages(proof);
(self.post_tick)(&mut data);
Ok(NoncesSubmitArtifacts {
nonces,
tx_tracker: TestTransactionTracker(data.target_tracked_transaction_status),
})
}
async fn require_source_header_on_target(
&self,
id: SourceHeaderIdOf<TestMessageLane>,
) -> Result<Option<Self::BatchTransaction>, Self::Error> {
let mut data = self.data.lock();
data.source_to_target_header_required = Some(id);
data.source_to_target_header_requirements.push(id);
(self.tick)(&mut data);
(self.post_tick)(&mut data);
Ok(data.source_to_target_batch_transaction.take().map(|mut tx| {
tx.required_header_id = id;
tx
}))
}
}
fn run_loop_test(
data: Arc<Mutex<TestClientData>>,
source_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
source_post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
target_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
target_post_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> TestClientData {
async_std::task::block_on(async {
let source_client = TestSourceClient {
data: data.clone(),
tick: source_tick,
post_tick: source_post_tick,
};
let target_client = TestTargetClient {
data: data.clone(),
tick: target_tick,
post_tick: target_post_tick,
};
let _ = run(
Params {
lane: LaneId([0, 0, 0, 0]),
source_tick: Duration::from_millis(100),
target_tick: Duration::from_millis(100),
reconnect_delay: Duration::from_millis(0),
delivery_params: MessageDeliveryParams {
max_unrewarded_relayer_entries_at_target: 4,
max_unconfirmed_nonces_at_target: 4,
max_messages_in_single_batch: 4,
max_messages_weight_in_single_batch: Weight::from_ref_time(4),
max_messages_size_in_single_batch: 4,
},
},
source_client,
target_client,
MetricsParams::disabled(),
exit_signal,
)
.await;
let result = data.lock().clone();
result
})
}
#[test]
fn message_lane_loop_is_able_to_recover_from_connection_errors() {
// with this configuration, the source client will return an error, making the source
// client reconnect. Then the target client will fail with an error and reconnect too.
// Only then are we finally able to deliver messages.
let (exit_sender, exit_receiver) = unbounded();
let result = run_loop_test(
Arc::new(Mutex::new(TestClientData {
is_source_fails: true,
source_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
source_latest_generated_nonce: 1,
target_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
target_latest_received_nonce: 0,
..Default::default()
})),
Arc::new(|data: &mut TestClientData| {
if data.is_source_reconnected {
data.is_source_fails = false;
data.is_target_fails = true;
}
}),
Arc::new(|_| {}),
Arc::new(move |data: &mut TestClientData| {
if data.is_target_reconnected {
data.is_target_fails = false;
}
if data.target_state.best_finalized_peer_at_best_self.0 < 10 {
data.target_state.best_finalized_peer_at_best_self = HeaderId(
data.target_state.best_finalized_peer_at_best_self.0 + 1,
data.target_state.best_finalized_peer_at_best_self.0 + 1,
);
}
if !data.submitted_messages_proofs.is_empty() {
exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(|_| {}),
exit_receiver.into_future().map(|(_, _)| ()),
);
assert_eq!(result.submitted_messages_proofs, vec![(1..=1, None)],);
}
#[test]
fn message_lane_loop_is_able_to_recover_from_race_stall() {
// with this configuration, both source and target clients will lose their transactions =>
// reconnect will happen
let (source_exit_sender, exit_receiver) = unbounded();
let target_exit_sender = source_exit_sender.clone();
let result = run_loop_test(
Arc::new(Mutex::new(TestClientData {
source_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
source_latest_generated_nonce: 1,
source_tracked_transaction_status: TrackedTransactionStatus::Lost,
target_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
target_latest_received_nonce: 0,
target_tracked_transaction_status: TrackedTransactionStatus::Lost,
..Default::default()
})),
Arc::new(move |data: &mut TestClientData| {
if data.is_source_reconnected {
data.source_tracked_transaction_status =
TrackedTransactionStatus::Finalized(Default::default());
}
if data.is_source_reconnected && data.is_target_reconnected {
source_exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(|_| {}),
Arc::new(move |data: &mut TestClientData| {
if data.is_target_reconnected {
data.target_tracked_transaction_status =
TrackedTransactionStatus::Finalized(Default::default());
}
if data.is_source_reconnected && data.is_target_reconnected {
target_exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(|_| {}),
exit_receiver.into_future().map(|(_, _)| ()),
);
assert!(result.is_source_reconnected);
}
#[test]
fn message_lane_loop_is_able_to_recover_from_unsuccessful_transaction() {
// with this configuration, both source and target clients will mine their transactions, but
// the corresponding nonces won't be updated => reconnect will happen
let (exit_sender, exit_receiver) = unbounded();
let result = run_loop_test(
Arc::new(Mutex::new(TestClientData {
source_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
source_latest_generated_nonce: 1,
target_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
target_latest_received_nonce: 0,
..Default::default()
})),
Arc::new(move |data: &mut TestClientData| {
// blocks are produced on every tick
data.source_state.best_self =
HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
data.source_state.best_finalized_self = data.source_state.best_self;
// syncing target headers -> source chain
if let Some(last_requirement) = data.target_to_source_header_requirements.last() {
if *last_requirement != data.source_state.best_finalized_peer_at_best_self {
data.source_state.best_finalized_peer_at_best_self = *last_requirement;
}
}
}),
Arc::new(move |data: &mut TestClientData| {
// if it is the first time we're submitting delivery proof, let's revert changes
// to source status => then the delivery confirmation transaction is "finalized",
// but the state is not altered
if data.submitted_messages_receiving_proofs.len() == 1 {
data.source_latest_confirmed_received_nonce = 0;
}
}),
Arc::new(move |data: &mut TestClientData| {
// blocks are produced on every tick
data.target_state.best_self =
HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
data.target_state.best_finalized_self = data.target_state.best_self;
// syncing source headers -> target chain
if let Some(last_requirement) = data.source_to_target_header_requirements.last() {
if *last_requirement != data.target_state.best_finalized_peer_at_best_self {
data.target_state.best_finalized_peer_at_best_self = *last_requirement;
}
}
// if source has received all messages receiving confirmations => stop
if data.source_latest_confirmed_received_nonce == 1 {
exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(move |data: &mut TestClientData| {
// if it is the first time we're submitting messages proof, let's revert changes
// to target status => then the messages delivery transaction is "finalized", but
// the state is not altered
if data.submitted_messages_proofs.len() == 1 {
data.target_latest_received_nonce = 0;
data.target_latest_confirmed_received_nonce = 0;
}
}),
exit_receiver.into_future().map(|(_, _)| ()),
);
assert!(result.is_source_reconnected);
assert_eq!(result.submitted_messages_proofs.len(), 2);
assert_eq!(result.submitted_messages_receiving_proofs.len(), 2);
}
#[test]
fn message_lane_loop_works() {
let (exit_sender, exit_receiver) = unbounded();
let result = run_loop_test(
Arc::new(Mutex::new(TestClientData {
source_state: ClientState {
best_self: HeaderId(10, 10),
best_finalized_self: HeaderId(10, 10),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
source_latest_generated_nonce: 10,
target_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
target_latest_received_nonce: 0,
..Default::default()
})),
Arc::new(|data: &mut TestClientData| {
// blocks are produced on every tick
data.source_state.best_self =
HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1);
data.source_state.best_finalized_self = data.source_state.best_self;
// headers relay must only be started when we need new target headers at source node
if data.target_to_source_header_required.is_some() {
assert!(
data.source_state.best_finalized_peer_at_best_self.0 <
data.target_state.best_self.0
);
data.target_to_source_header_required = None;
}
// syncing target headers -> source chain
if let Some(last_requirement) = data.target_to_source_header_requirements.last() {
if *last_requirement != data.source_state.best_finalized_peer_at_best_self {
data.source_state.best_finalized_peer_at_best_self = *last_requirement;
}
}
}),
Arc::new(|_| {}),
Arc::new(move |data: &mut TestClientData| {
// blocks are produced on every tick
data.target_state.best_self =
HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
data.target_state.best_finalized_self = data.target_state.best_self;
// headers relay must only be started when we need new source headers at target node
if data.source_to_target_header_required.is_some() {
assert!(
data.target_state.best_finalized_peer_at_best_self.0 <
data.source_state.best_self.0
);
data.source_to_target_header_required = None;
}
// syncing source headers -> target chain
if let Some(last_requirement) = data.source_to_target_header_requirements.last() {
if *last_requirement != data.target_state.best_finalized_peer_at_best_self {
data.target_state.best_finalized_peer_at_best_self = *last_requirement;
}
}
// if source has received all messages receiving confirmations => stop
if data.source_latest_confirmed_received_nonce == 10 {
exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(|_| {}),
exit_receiver.into_future().map(|(_, _)| ()),
);
// there are no strict restrictions on when the reward confirmation should come
// (because `max_unconfirmed_nonces_at_target` is `4` in tests and this confirmation
// depends on the state of both clients)
// => we do not check it here
assert_eq!(result.submitted_messages_proofs[0].0, 1..=4);
assert_eq!(result.submitted_messages_proofs[1].0, 5..=8);
assert_eq!(result.submitted_messages_proofs[2].0, 9..=10);
assert!(!result.submitted_messages_receiving_proofs.is_empty());
// check that we have at least once required new source->target or target->source headers
assert!(!result.target_to_source_header_requirements.is_empty());
assert!(!result.source_to_target_header_requirements.is_empty());
}
#[test]
fn message_lane_loop_works_with_batch_transactions() {
let (exit_sender, exit_receiver) = unbounded();
let original_data = Arc::new(Mutex::new(TestClientData {
source_state: ClientState {
best_self: HeaderId(10, 10),
best_finalized_self: HeaderId(10, 10),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
source_latest_generated_nonce: 10,
target_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
actual_best_finalized_peer_at_best_self: HeaderId(0, 0),
},
target_latest_received_nonce: 0,
..Default::default()
}));
let result = run_loop_test(
original_data,
Arc::new(|_| {}),
Arc::new(move |data: &mut TestClientData| {
if let Some(target_to_source_header_required) =
data.target_to_source_header_required.take()
{
data.target_to_source_batch_transaction =
Some(TestConfirmationBatchTransaction {
required_header_id: target_to_source_header_required,
})
}
}),
Arc::new(|_| {}),
Arc::new(move |data: &mut TestClientData| {
if let Some(source_to_target_header_required) =
data.source_to_target_header_required.take()
{
data.source_to_target_batch_transaction = Some(TestMessagesBatchTransaction {
required_header_id: source_to_target_header_required,
})
}
if data.source_latest_confirmed_received_nonce == 10 {
exit_sender.unbounded_send(()).unwrap();
}
}),
exit_receiver.into_future().map(|(_, _)| ()),
);
// there are no strict restrictions on when the reward confirmation should come
// (because `max_unconfirmed_nonces_at_target` is `4` in tests and this confirmation
// depends on the state of both clients)
// => we do not check it here
assert_eq!(result.submitted_messages_proofs[0].0, 1..=4);
assert_eq!(result.submitted_messages_proofs[1].0, 5..=8);
assert_eq!(result.submitted_messages_proofs[2].0, 9..=10);
assert!(!result.submitted_messages_receiving_proofs.is_empty());
// check that we have at least once required new source->target or target->source headers
assert!(!result.target_to_source_header_requirements.is_empty());
assert!(!result.source_to_target_header_requirements.is_empty());
}
}