diff --git a/bridges/relays/lib-substrate-relay/src/lib.rs b/bridges/relays/lib-substrate-relay/src/lib.rs index 28c538b309a0be6d649ac52a8088d9c57916c3b4..37a4d602e598dcb9ceeeffe906d2b2ce46776851 100644 --- a/bridges/relays/lib-substrate-relay/src/lib.rs +++ b/bridges/relays/lib-substrate-relay/src/lib.rs @@ -91,18 +91,21 @@ impl<AccountId> TaggedAccount<AccountId> { } /// Batch call builder. -pub trait BatchCallBuilder<Call>: Send { +pub trait BatchCallBuilder<Call>: Clone + Send { /// Create batch call from given calls vector. fn build_batch_call(&self, _calls: Vec<Call>) -> Call; } /// Batch call builder constructor. -pub trait BatchCallBuilderConstructor<Call> { +pub trait BatchCallBuilderConstructor<Call>: Clone { + /// Call builder, used by this constructor. + type CallBuilder: BatchCallBuilder<Call>; /// Create a new instance of a batch call builder. - fn new_builder() -> Option<Box<dyn BatchCallBuilder<Call>>>; + fn new_builder() -> Option<Self::CallBuilder>; } /// Batch call builder based on `pallet-utility`. +#[derive(Clone)] pub struct UtilityPalletBatchCallBuilder<C: Chain>(PhantomData<C>); impl<C: Chain> BatchCallBuilder<C::Call> for UtilityPalletBatchCallBuilder<C> @@ -118,14 +121,25 @@ impl<C: Chain> BatchCallBuilderConstructor<C::Call> for UtilityPalletBatchCallBu where C: ChainWithUtilityPallet, { - fn new_builder() -> Option<Box<dyn BatchCallBuilder<C::Call>>> { - Some(Box::new(Self(Default::default()))) + type CallBuilder = Self; + + fn new_builder() -> Option<Self::CallBuilder> { + Some(Self(Default::default())) } } -/// A `BatchCallBuilderConstructor` that always returns `None`. +// A `BatchCallBuilderConstructor` that always returns `None`. impl<Call> BatchCallBuilderConstructor<Call> for () { - fn new_builder() -> Option<Box<dyn BatchCallBuilder<Call>>> { + type CallBuilder = (); + fn new_builder() -> Option<Self::CallBuilder> { None } } + +// Dummy `BatchCallBuilder` implementation that must never be used outside +// of the `impl BatchCallBuilderConstructor for ()` code. +impl<Call> BatchCallBuilder<Call> for () { + fn build_batch_call(&self, _calls: Vec<Call>) -> Call { + unreachable!("never called, because ()::new_builder() returns None; qed") + } +} diff --git a/bridges/relays/lib-substrate-relay/src/messages_lane.rs b/bridges/relays/lib-substrate-relay/src/messages_lane.rs index 0a7a3566d20f1671b7ac438c89d6a94198ab6635..b86a2629b07515e7e47a93303ff7c48cd6f4aecf 100644 --- a/bridges/relays/lib-substrate-relay/src/messages_lane.rs +++ b/bridges/relays/lib-substrate-relay/src/messages_lane.rs @@ -111,8 +111,9 @@ pub struct MessagesRelayParams<P: SubstrateMessageLane> { /// Batch transaction that brings headers + and messages delivery/receiving confirmations to the /// source node. +#[derive(Clone)] pub struct BatchProofTransaction<SC: Chain, TC: Chain, B: BatchCallBuilderConstructor<CallOf<SC>>> { - builder: Box<dyn BatchCallBuilder<CallOf<SC>>>, + builder: B::CallBuilder, proved_header: HeaderIdOf<TC>, prove_calls: Vec<CallOf<SC>>, @@ -120,6 +121,16 @@ pub struct BatchProofTransaction<SC: Chain, TC: Chain, B: BatchCallBuilderConstr _phantom: PhantomData<fn() -> B>, } +impl<SC: Chain, TC: Chain, B: BatchCallBuilderConstructor<CallOf<SC>>> std::fmt::Debug + for BatchProofTransaction<SC, TC, B> +{ + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + fmt.debug_struct("BatchProofTransaction") + .field("proved_header", &self.proved_header) + .finish() + } +} + impl<SC: Chain, TC: Chain, B: BatchCallBuilderConstructor<CallOf<SC>>> BatchProofTransaction<SC, TC, B> { diff --git a/bridges/relays/messages/src/message_lane_loop.rs b/bridges/relays/messages/src/message_lane_loop.rs index 0533e51d5db47bacfb9fed3ded48203d25cbab10..ba86f05ffd32a251a0b1f187a35026f8ad4d712a 100644 --- a/bridges/relays/messages/src/message_lane_loop.rs +++ b/bridges/relays/messages/src/message_lane_loop.rs @@ -111,7 +111,7 @@ pub struct NoncesSubmitArtifacts<T> { /// Batch transaction that already submit some headers and needs to be extended with /// messages/delivery proof before sending. -pub trait BatchTransaction<HeaderId>: Send { +pub trait BatchTransaction<HeaderId>: Debug + Send { /// Header that was required in the original call and which is bundled within this /// batch transaction. fn required_header_id(&self) -> HeaderId; @@ -121,7 +121,7 @@ pub trait BatchTransaction<HeaderId>: Send { #[async_trait] pub trait SourceClient<P: MessageLane>: RelayClient { /// Type of batch transaction that submits finality and message receiving proof. - type BatchTransaction: BatchTransaction<TargetHeaderIdOf<P>>; + type BatchTransaction: BatchTransaction<TargetHeaderIdOf<P>> + Clone; /// Transaction tracker to track submitted transactions. type TransactionTracker: TransactionTracker<HeaderId = SourceHeaderIdOf<P>>; @@ -186,7 +186,7 @@ pub trait SourceClient<P: MessageLane>: RelayClient { #[async_trait] pub trait TargetClient<P: MessageLane>: RelayClient { /// Type of batch transaction that submits finality and messages proof. - type BatchTransaction: BatchTransaction<SourceHeaderIdOf<P>>; + type BatchTransaction: BatchTransaction<SourceHeaderIdOf<P>> + Clone; /// Transaction tracker to track submitted transactions. type TransactionTracker: TransactionTracker<HeaderId = TargetHeaderIdOf<P>>; @@ -1212,6 +1212,9 @@ pub(crate) mod tests { original_data, Arc::new(|_| {}), Arc::new(move |data: &mut TestClientData| { + data.source_state.best_self = + HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.1 + 1); + data.source_state.best_finalized_self = data.source_state.best_self; if let Some(target_to_source_header_required) = data.target_to_source_header_required.take() { @@ -1223,6 +1226,10 @@ pub(crate) mod tests { }), Arc::new(|_| {}), Arc::new(move |data: &mut TestClientData| { + data.target_state.best_self = + HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1); + data.target_state.best_finalized_self = data.target_state.best_self; + if let Some(source_to_target_header_required) = data.source_to_target_header_required.take() { diff --git a/bridges/relays/messages/src/message_race_delivery.rs b/bridges/relays/messages/src/message_race_delivery.rs index b50e6c0841ead9a212cd5de37df0b7ce4f369855..7a245858b32d8fdb15110f2d5c78c2bb25520667 100644 --- a/bridges/relays/messages/src/message_race_delivery.rs +++ b/bridges/relays/messages/src/message_race_delivery.rs @@ -322,13 +322,19 @@ where self.strategy.is_empty() } - fn required_source_header_at_target( + fn required_source_header_at_target<RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>>( &self, current_best: &SourceHeaderIdOf<P>, + race_state: RS, ) -> Option<SourceHeaderIdOf<P>> { + // we have already submitted something - let's wait until it is mined + if race_state.nonces_submitted().is_some() { + return None + } + let has_nonces_to_deliver = !self.strategy.is_empty(); let header_required_for_messages_delivery = - self.strategy.required_source_header_at_target(current_best); + self.strategy.required_source_header_at_target(current_best, race_state); let header_required_for_reward_confirmations_delivery = self .latest_confirmed_nonces_at_source .back() @@ -381,10 +387,10 @@ where self.strategy.source_nonces_updated(at_block, nonces) } - fn best_target_nonces_updated( + fn best_target_nonces_updated<RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>>( &mut self, nonces: TargetClientNonces<DeliveryRaceTargetNoncesData>, - race_state: &mut RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>, + race_state: &mut RS, ) { // best target nonces must always be ge than finalized target nonces let latest_nonce = nonces.latest_nonce; @@ -396,13 +402,13 @@ where ) } - fn finalized_target_nonces_updated( + fn finalized_target_nonces_updated<RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>>( &mut self, nonces: TargetClientNonces<DeliveryRaceTargetNoncesData>, - race_state: &mut RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>, + race_state: &mut RS, ) { if let Some(ref best_finalized_source_header_id_at_best_target) = - race_state.best_finalized_source_header_id_at_best_target + race_state.best_finalized_source_header_id_at_best_target() { let oldest_header_number_to_keep = best_finalized_source_header_id_at_best_target.0; while self @@ -426,13 +432,13 @@ where ) } - async fn select_nonces_to_deliver( + async fn select_nonces_to_deliver<RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>>( &self, - race_state: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>, + race_state: RS, ) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> { let best_target_nonce = self.strategy.best_at_target()?; let best_finalized_source_header_id_at_best_target = - race_state.best_finalized_source_header_id_at_best_target.clone()?; + race_state.best_finalized_source_header_id_at_best_target()?; let latest_confirmed_nonce_at_source = self .latest_confirmed_nonces_at_source .iter() @@ -576,12 +582,16 @@ impl<SourceChainBalance: std::fmt::Debug> NoncesRange for MessageDetailsMap<Sour #[cfg(test)] mod tests { - use crate::message_lane_loop::{ - tests::{ - header_id, TestMessageLane, TestMessagesProof, TestSourceChainBalance, - TestSourceClient, TestSourceHeaderId, TestTargetClient, TestTargetHeaderId, + use crate::{ + message_lane_loop::{ + tests::{ + header_id, TestMessageLane, TestMessagesBatchTransaction, TestMessagesProof, + TestSourceChainBalance, TestSourceClient, TestSourceHeaderId, TestTargetClient, + TestTargetHeaderId, + }, + MessageDetails, }, - MessageDetails, + message_race_loop::RaceStateImpl, }; use super::*; @@ -589,7 +599,12 @@ mod tests { const DEFAULT_DISPATCH_WEIGHT: Weight = Weight::from_parts(1, 0); const DEFAULT_SIZE: u32 = 1; - type TestRaceState = RaceState<TestSourceHeaderId, TestTargetHeaderId, TestMessagesProof>; + type TestRaceState = RaceStateImpl< + TestSourceHeaderId, + TestTargetHeaderId, + TestMessagesProof, + TestMessagesBatchTransaction, + >; type TestStrategy = MessageDeliveryStrategy<TestMessageLane, TestSourceClient, TestTargetClient>; @@ -617,12 +632,13 @@ mod tests { } fn prepare_strategy() -> (TestRaceState, TestStrategy) { - let mut race_state = RaceState { + let mut race_state = RaceStateImpl { best_finalized_source_header_id_at_source: Some(header_id(1)), best_finalized_source_header_id_at_best_target: Some(header_id(1)), best_target_header_id: Some(header_id(1)), best_finalized_target_header_id: Some(header_id(1)), nonces_to_submit: None, + nonces_to_submit_batch: None, nonces_submitted: None, }; @@ -964,14 +980,17 @@ mod tests { ); // nothing needs to be delivered now and we don't need any new headers assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None); - assert_eq!(strategy.required_source_header_at_target(&header_id(1)), None); + assert_eq!(strategy.required_source_header_at_target(&header_id(1), state.clone()), None); // now let's generate two more nonces [24; 25] at the soruce; strategy.source_nonces_updated(header_id(2), source_nonces(24..=25, 19, 0)); // // - so now we'll need to relay source block#2 to be able to accept messages [24; 25]. assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None); - assert_eq!(strategy.required_source_header_at_target(&header_id(1)), Some(header_id(2))); + assert_eq!( + strategy.required_source_header_at_target(&header_id(1), state.clone()), + Some(header_id(2)) + ); // let's relay source block#2 state.best_finalized_source_header_id_at_source = Some(header_id(2)); @@ -982,7 +1001,7 @@ mod tests { // and ask strategy again => still nothing to deliver, because parallel confirmations // race need to be pushed further assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None); - assert_eq!(strategy.required_source_header_at_target(&header_id(2)), None); + assert_eq!(strategy.required_source_header_at_target(&header_id(2), state.clone()), None); // let's confirm messages [20; 23] strategy.source_nonces_updated(header_id(2), source_nonces(24..=25, 23, 0)); @@ -990,10 +1009,10 @@ mod tests { // and ask strategy again => now we have everything required to deliver remaining // [24; 25] nonces and proof of [20; 23] confirmation assert_eq!( - strategy.select_nonces_to_deliver(state).await, + strategy.select_nonces_to_deliver(state.clone()).await, Some(((24..=25), proof_parameters(true, 2))), ); - assert_eq!(strategy.required_source_header_at_target(&header_id(2)), None); + assert_eq!(strategy.required_source_header_at_target(&header_id(2), state), None); } #[async_std::test] @@ -1025,6 +1044,7 @@ mod tests { #[test] #[allow(clippy::reversed_empty_ranges)] fn no_source_headers_required_at_target_if_lanes_are_empty() { + let (state, _) = prepare_strategy(); let mut strategy = TestStrategy { max_unrewarded_relayer_entries_at_target: 4, max_unconfirmed_nonces_at_target: 4, @@ -1053,7 +1073,7 @@ mod tests { strategy.latest_confirmed_nonces_at_source, VecDeque::from([(source_header_id, 0)]) ); - assert_eq!(strategy.required_source_header_at_target(&source_header_id), None); + assert_eq!(strategy.required_source_header_at_target(&source_header_id, state), None); } #[async_std::test] diff --git a/bridges/relays/messages/src/message_race_loop.rs b/bridges/relays/messages/src/message_race_loop.rs index 50f71ea050bb5145ffd170d7219f428e73edec54..7e3f84dd5d1195d7e7e42f13d204f97a5144f5b1 100644 --- a/bridges/relays/messages/src/message_race_loop.rs +++ b/bridges/relays/messages/src/message_race_loop.rs @@ -25,7 +25,7 @@ use crate::message_lane_loop::{BatchTransaction, ClientState, NoncesSubmitArtifa use async_trait::async_trait; use bp_messages::MessageNonce; use futures::{ - future::FutureExt, + future::{FutureExt, TryFutureExt}, stream::{FusedStream, StreamExt}, }; use relay_utils::{ @@ -41,14 +41,14 @@ use std::{ /// One of races within lane. pub trait MessageRace { /// Header id of the race source. - type SourceHeaderId: Debug + Clone + PartialEq; + type SourceHeaderId: Debug + Clone + PartialEq + Send; /// Header id of the race source. - type TargetHeaderId: Debug + Clone + PartialEq; + type TargetHeaderId: Debug + Clone + PartialEq + Send; /// Message nonce used in the race. type MessageNonce: Debug + Clone; /// Proof that is generated and delivered in this race. - type Proof: Debug + Clone; + type Proof: Debug + Clone + Send; /// Name of the race source. fn source_name() -> String; @@ -128,7 +128,7 @@ pub trait TargetClient<P: MessageRace> { /// Type of the additional data from the target client, used by the race. type TargetNoncesData: std::fmt::Debug; /// Type of batch transaction that submits finality and proof to the target node. - type BatchTransaction: BatchTransaction<P::SourceHeaderId>; + type BatchTransaction: BatchTransaction<P::SourceHeaderId> + Clone; /// Transaction tracker to track submitted transactions. type TransactionTracker: TransactionTracker<HeaderId = P::TargetHeaderId>; @@ -175,9 +175,10 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug { /// Should return true if nothing has to be synced. fn is_empty(&self) -> bool; /// Return id of source header that is required to be on target to continue synchronization. - fn required_source_header_at_target( + fn required_source_header_at_target<RS: RaceState<SourceHeaderId, TargetHeaderId>>( &self, current_best: &SourceHeaderId, + race_state: RS, ) -> Option<SourceHeaderId>; /// Return the best nonce at source node. /// @@ -196,29 +197,53 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug { nonces: SourceClientNonces<Self::SourceNoncesRange>, ); /// Called when best nonces are updated at target node of the race. - fn best_target_nonces_updated( + fn best_target_nonces_updated<RS: RaceState<SourceHeaderId, TargetHeaderId>>( &mut self, nonces: TargetClientNonces<Self::TargetNoncesData>, - race_state: &mut RaceState<SourceHeaderId, TargetHeaderId, Proof>, + race_state: &mut RS, ); /// Called when finalized nonces are updated at target node of the race. - fn finalized_target_nonces_updated( + fn finalized_target_nonces_updated<RS: RaceState<SourceHeaderId, TargetHeaderId>>( &mut self, nonces: TargetClientNonces<Self::TargetNoncesData>, - race_state: &mut RaceState<SourceHeaderId, TargetHeaderId, Proof>, + race_state: &mut RS, ); /// Should return `Some(nonces)` if we need to deliver proof of `nonces` (and associated /// data) from source to target node. /// Additionally, parameters required to generate proof are returned. - async fn select_nonces_to_deliver( + async fn select_nonces_to_deliver<RS: RaceState<SourceHeaderId, TargetHeaderId>>( &self, - race_state: RaceState<SourceHeaderId, TargetHeaderId, Proof>, + race_state: RS, ) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)>; } /// State of the race. +pub trait RaceState<SourceHeaderId, TargetHeaderId>: Send { + /// Best finalized source header id at the source client. + fn best_finalized_source_header_id_at_source(&self) -> Option<SourceHeaderId>; + /// Best finalized source header id at the best block on the target + /// client (at the `best_finalized_source_header_id_at_best_target`). + fn best_finalized_source_header_id_at_best_target(&self) -> Option<SourceHeaderId>; + /// The best header id at the target client. + fn best_target_header_id(&self) -> Option<TargetHeaderId>; + /// Best finalized header id at the target client. + fn best_finalized_target_header_id(&self) -> Option<TargetHeaderId>; + + /// Returns `true` if we have selected nonces to submit to the target node. + fn nonces_to_submit(&self) -> Option<RangeInclusive<MessageNonce>>; + /// Reset our nonces selection. + fn reset_nonces_to_submit(&mut self); + + /// Returns `true` if we have submitted some nonces to the target node and are + /// waiting for them to appear there. + fn nonces_submitted(&self) -> Option<RangeInclusive<MessageNonce>>; + /// Reset our nonces submission. + fn reset_nonces_submitted(&mut self); +} + +/// State of the race and prepared batch transaction (if available). #[derive(Debug, Clone)] -pub struct RaceState<SourceHeaderId, TargetHeaderId, Proof> { +pub(crate) struct RaceStateImpl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> { /// Best finalized source header id at the source client. pub best_finalized_source_header_id_at_source: Option<SourceHeaderId>, /// Best finalized source header id at the best block on the target @@ -230,13 +255,67 @@ pub struct RaceState<SourceHeaderId, TargetHeaderId, Proof> { pub best_finalized_target_header_id: Option<TargetHeaderId>, /// Range of nonces that we have selected to submit. pub nonces_to_submit: Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Proof)>, + /// Batch transaction ready to include and deliver selected `nonces_to_submit` from the + /// `state`. + pub nonces_to_submit_batch: Option<BatchTx>, /// Range of nonces that is currently submitted. pub nonces_submitted: Option<RangeInclusive<MessageNonce>>, } -impl<SourceHeaderId, TargetHeaderId, Proof> RaceState<SourceHeaderId, TargetHeaderId, Proof> { - /// Reset `nonces_submitted` to `None`. - fn reset_submitted(&mut self) { +impl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> Default + for RaceStateImpl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> +{ + fn default() -> Self { + RaceStateImpl { + best_finalized_source_header_id_at_source: None, + best_finalized_source_header_id_at_best_target: None, + best_target_header_id: None, + best_finalized_target_header_id: None, + nonces_to_submit: None, + nonces_to_submit_batch: None, + nonces_submitted: None, + } + } +} + +impl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> RaceState<SourceHeaderId, TargetHeaderId> + for RaceStateImpl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> +where + SourceHeaderId: Clone + Send, + TargetHeaderId: Clone + Send, + Proof: Clone + Send, + BatchTx: Clone + Send, +{ + fn best_finalized_source_header_id_at_source(&self) -> Option<SourceHeaderId> { + self.best_finalized_source_header_id_at_source.clone() + } + + fn best_finalized_source_header_id_at_best_target(&self) -> Option<SourceHeaderId> { + self.best_finalized_source_header_id_at_best_target.clone() + } + + fn best_target_header_id(&self) -> Option<TargetHeaderId> { + self.best_target_header_id.clone() + } + + fn best_finalized_target_header_id(&self) -> Option<TargetHeaderId> { + self.best_finalized_target_header_id.clone() + } + + fn nonces_to_submit(&self) -> Option<RangeInclusive<MessageNonce>> { + self.nonces_to_submit.as_ref().map(|(_, nonces, _)| nonces.clone()) + } + + fn reset_nonces_to_submit(&mut self) { + self.nonces_to_submit = None; + self.nonces_to_submit_batch = None; + } + + fn nonces_submitted(&self) -> Option<RangeInclusive<MessageNonce>> { + self.nonces_submitted.clone() + } + + fn reset_nonces_submitted(&mut self) { self.nonces_submitted = None; } } @@ -257,7 +336,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( >, ) -> Result<(), FailedClient> { let mut progress_context = Instant::now(); - let mut race_state = RaceState::default(); + let mut race_state = RaceStateImpl::default(); let mut source_retry_backoff = retry_backoff(); let mut source_client_is_online = true; @@ -302,7 +381,8 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( != Some(&source_state.best_finalized_self); if is_source_state_updated { source_nonces_required = true; - race_state.best_finalized_source_header_id_at_source = Some(source_state.best_finalized_self); + race_state.best_finalized_source_header_id_at_source + = Some(source_state.best_finalized_self); } } }, @@ -353,7 +433,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( source_required_header = race_state .best_finalized_source_header_id_at_best_target .as_ref() - .and_then(|best| strategy.required_source_header_at_target(best)); + .and_then(|best| strategy.required_source_header_at_target(best, race_state.clone())); }, nonces = target_best_nonces => { target_best_nonces_required = false; @@ -408,10 +488,13 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( |maybe_batch_transaction: Option<TC::BatchTransaction>| { log::debug!( target: "bridge", - "Target {} client has been asked for more {} headers. Batch tx: {:?}", + "Target {} client has been asked for more {} headers. Batch tx: {}", P::target_name(), P::source_name(), - maybe_batch_transaction.is_some(), + maybe_batch_transaction + .as_ref() + .map(|bt| format!("yes ({:?})", bt.required_header_id())) + .unwrap_or_else(|| "no".into()), ); target_batch_transaction = maybe_batch_transaction; @@ -425,7 +508,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( source_client_is_online = process_future_result( proof, &mut source_retry_backoff, - |(at_block, nonces_range, proof)| { + |(at_block, nonces_range, proof, batch_transaction)| { log::debug!( target: "bridge", "Received proof for nonces in range {:?} from {}", @@ -434,6 +517,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( ); race_state.nonces_to_submit = Some((at_block, nonces_range, proof)); + race_state.nonces_to_submit_batch = batch_transaction; }, &mut source_go_offline_future, async_std::task::sleep, @@ -452,8 +536,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( P::target_name(), ); - target_batch_transaction = None; - race_state.nonces_to_submit = None; + race_state.reset_nonces_to_submit(); race_state.nonces_submitted = Some(artifacts.nonces); target_tx_tracker.set(artifacts.tx_tracker.wait().fuse()); }, @@ -490,7 +573,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( e, ); - race_state.reset_submitted(); + race_state.reset_nonces_submitted(); }); }, (TrackedTransactionStatus::Lost, _) => { @@ -503,7 +586,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( strategy, ); - race_state.reset_submitted(); + race_state.reset_nonces_submitted(); }, _ => (), } @@ -524,6 +607,12 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( source_client_is_online = false; // if we've started to submit batch transaction, let's prioritize it + // + // we're using `take` here, because we don't need batch transaction (i.e. some + // underlying finality proof) anymore for our future calls - we were unable to + // use it for our current state, so why would we need to keep an obsolete proof + // for the future? + let target_batch_transaction = target_batch_transaction.take(); let expected_race_state = if let Some(ref target_batch_transaction) = target_batch_transaction { // when selecting nonces for the batch transaction, we assume that the required @@ -551,7 +640,12 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( ); source_generate_proof.set( - race_source.generate_proof(at_block, nonces_range, proof_parameters).fuse(), + race_source + .generate_proof(at_block, nonces_range, proof_parameters) + .and_then(|(at_source_block, nonces, proof)| async { + Ok((at_source_block, nonces, proof, target_batch_transaction)) + }) + .fuse(), ); } else if source_nonces_required && best_at_source.is_some() { log::debug!(target: "bridge", "Asking {} about message nonces", P::source_name()); @@ -582,7 +676,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( "Going to submit proof of messages in range {:?} to {} node{}", nonces_range, P::target_name(), - target_batch_transaction.as_ref().map(|tx| format!( + race_state.nonces_to_submit_batch.as_ref().map(|tx| format!( ". This transaction is batched with sending the proof for header {:?}.", tx.required_header_id()) ).unwrap_or_default(), @@ -591,7 +685,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( target_submit_proof.set( race_target .submit_proof( - target_batch_transaction.take(), + race_state.nonces_to_submit_batch.clone(), at_block.clone(), nonces_range.clone(), proof.clone(), @@ -628,21 +722,6 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( } } -impl<SourceHeaderId, TargetHeaderId, Proof> Default - for RaceState<SourceHeaderId, TargetHeaderId, Proof> -{ - fn default() -> Self { - RaceState { - best_finalized_source_header_id_at_source: None, - best_finalized_source_header_id_at_best_target: None, - best_target_header_id: None, - best_finalized_target_header_id: None, - nonces_to_submit: None, - nonces_submitted: None, - } - } -} - /// Print race progress. fn print_race_progress<P, S>(prev_time: Instant, strategy: &S) -> Instant where @@ -670,7 +749,7 @@ where } async fn select_nonces_to_deliver<SourceHeaderId, TargetHeaderId, Proof, Strategy>( - race_state: RaceState<SourceHeaderId, TargetHeaderId, Proof>, + race_state: impl RaceState<SourceHeaderId, TargetHeaderId>, strategy: &Strategy, ) -> Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Strategy::ProofParameters)> where @@ -678,7 +757,7 @@ where Strategy: RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>, { let best_finalized_source_header_id_at_best_target = - race_state.best_finalized_source_header_id_at_best_target.clone()?; + race_state.best_finalized_source_header_id_at_best_target()?; strategy .select_nonces_to_deliver(race_state) .await @@ -701,7 +780,7 @@ mod tests { // target node only knows about source' BEST_AT_TARGET block // source node has BEST_AT_SOURCE > BEST_AT_TARGET block - let mut race_state = RaceState::<_, _, ()> { + let mut race_state = RaceStateImpl::<_, _, (), ()> { best_finalized_source_header_id_at_source: Some(HeaderId( BEST_AT_SOURCE, BEST_AT_SOURCE, @@ -713,11 +792,12 @@ mod tests { best_target_header_id: Some(HeaderId(0, 0)), best_finalized_target_header_id: Some(HeaderId(0, 0)), nonces_to_submit: None, + nonces_to_submit_batch: None, nonces_submitted: None, }; // we have some nonces to deliver and they're generated at GENERATED_AT < BEST_AT_SOURCE - let mut strategy = BasicStrategy::new(); + let mut strategy = BasicStrategy::<_, _, _, _, _, ()>::new(); strategy.source_nonces_updated( HeaderId(GENERATED_AT, GENERATED_AT), SourceClientNonces { new_nonces: 0..=10, confirmed_nonce: None }, diff --git a/bridges/relays/messages/src/message_race_strategy.rs b/bridges/relays/messages/src/message_race_strategy.rs index 479ffe5132907a71f389b2b3a4f03422787024cb..e6016448c95cc0a2579d89081b9235e42f2ee0c1 100644 --- a/bridges/relays/messages/src/message_race_strategy.rs +++ b/bridges/relays/messages/src/message_race_strategy.rs @@ -106,24 +106,25 @@ impl< /// at source blocks that are known to be finalized at the target node. /// /// Returns `None` if no entries may be delivered. - pub fn available_source_queue_indices( - &self, - race_state: RaceState< + pub fn available_source_queue_indices< + RS: RaceState< HeaderId<SourceHeaderHash, SourceHeaderNumber>, HeaderId<TargetHeaderHash, TargetHeaderNumber>, - Proof, >, + >( + &self, + race_state: RS, ) -> Option<RangeInclusive<usize>> { // if we do not know best nonce at target node, we can't select anything let best_target_nonce = self.best_target_nonce?; // if we have already selected nonces that we want to submit, do nothing - if race_state.nonces_to_submit.is_some() { + if race_state.nonces_to_submit().is_some() { return None } // if we already submitted some nonces, do nothing - if race_state.nonces_submitted.is_some() { + if race_state.nonces_submitted().is_some() { return None } @@ -143,7 +144,7 @@ impl< // // => let's first select range of entries inside deque that are already finalized at // the target client and pass this range to the selector - let best_header_at_target = race_state.best_finalized_source_header_id_at_best_target?; + let best_header_at_target = race_state.best_finalized_source_header_id_at_best_target()?; let end_index = self .source_queue .iter() @@ -204,9 +205,15 @@ impl< self.source_queue.is_empty() } - fn required_source_header_at_target( + fn required_source_header_at_target< + RS: RaceState< + HeaderId<SourceHeaderHash, SourceHeaderNumber>, + HeaderId<TargetHeaderHash, TargetHeaderNumber>, + >, + >( &self, current_best: &HeaderId<SourceHeaderHash, SourceHeaderNumber>, + _race_state: RS, ) -> Option<HeaderId<SourceHeaderHash, SourceHeaderNumber>> { self.source_queue .back() @@ -247,46 +254,46 @@ impl< ) } - fn best_target_nonces_updated( - &mut self, - nonces: TargetClientNonces<()>, - race_state: &mut RaceState< + fn best_target_nonces_updated< + RS: RaceState< HeaderId<SourceHeaderHash, SourceHeaderNumber>, HeaderId<TargetHeaderHash, TargetHeaderNumber>, - Proof, >, + >( + &mut self, + nonces: TargetClientNonces<()>, + race_state: &mut RS, ) { let nonce = nonces.latest_nonce; let need_to_select_new_nonces = race_state - .nonces_to_submit - .as_ref() - .map(|(_, nonces, _)| *nonces.end() <= nonce) + .nonces_to_submit() + .map(|nonces| *nonces.end() <= nonce) .unwrap_or(false); if need_to_select_new_nonces { - race_state.nonces_to_submit = None; + race_state.reset_nonces_to_submit(); } let need_new_nonces_to_submit = race_state - .nonces_submitted - .as_ref() + .nonces_submitted() .map(|nonces| *nonces.end() <= nonce) .unwrap_or(false); if need_new_nonces_to_submit { - race_state.nonces_submitted = None; + race_state.reset_nonces_submitted(); } self.best_target_nonce = Some(nonce); } - fn finalized_target_nonces_updated( - &mut self, - nonces: TargetClientNonces<()>, - _race_state: &mut RaceState< + fn finalized_target_nonces_updated< + RS: RaceState< HeaderId<SourceHeaderHash, SourceHeaderNumber>, HeaderId<TargetHeaderHash, TargetHeaderNumber>, - Proof, >, + >( + &mut self, + nonces: TargetClientNonces<()>, + _race_state: &mut RS, ) { self.remove_le_nonces_from_source_queue(nonces.latest_nonce); self.best_target_nonce = Some(std::cmp::max( @@ -295,13 +302,14 @@ impl< )); } - async fn select_nonces_to_deliver( - &self, - race_state: RaceState< + async fn select_nonces_to_deliver< + RS: RaceState< HeaderId<SourceHeaderHash, SourceHeaderNumber>, HeaderId<TargetHeaderHash, TargetHeaderNumber>, - Proof, >, + >( + &self, + race_state: RS, ) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> { let available_indices = self.available_source_queue_indices(race_state)?; let range_begin = std::cmp::max( @@ -317,15 +325,23 @@ impl< mod tests { use super::*; use crate::{ - message_lane::MessageLane, + message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, message_lane_loop::tests::{ header_id, TestMessageLane, TestMessagesProof, TestSourceHeaderHash, TestSourceHeaderNumber, }, + message_race_loop::RaceStateImpl, }; type SourceNoncesRange = RangeInclusive<MessageNonce>; + type TestRaceStateImpl = RaceStateImpl< + SourceHeaderIdOf<TestMessageLane>, + TargetHeaderIdOf<TestMessageLane>, + TestMessagesProof, + (), + >; + type BasicStrategy<P> = super::BasicStrategy< <P as MessageLane>::SourceHeaderNumber, <P as MessageLane>::SourceHeaderHash, @@ -357,7 +373,7 @@ mod tests { assert_eq!(strategy.best_at_source(), None); strategy.source_nonces_updated(header_id(1), source_nonces(1..=5)); assert_eq!(strategy.best_at_source(), None); - strategy.best_target_nonces_updated(target_nonces(10), &mut Default::default()); + strategy.best_target_nonces_updated(target_nonces(10), &mut TestRaceStateImpl::default()); assert_eq!(strategy.source_queue, vec![(header_id(1), 1..=5)]); assert_eq!(strategy.best_at_source(), Some(10)); } @@ -365,7 +381,7 @@ mod tests { #[test] fn source_nonce_is_never_lower_than_known_target_nonce() { let mut strategy = BasicStrategy::<TestMessageLane>::new(); - strategy.best_target_nonces_updated(target_nonces(10), &mut Default::default()); + strategy.best_target_nonces_updated(target_nonces(10), &mut TestRaceStateImpl::default()); strategy.source_nonces_updated(header_id(1), source_nonces(1..=5)); assert_eq!(strategy.source_queue, vec![]); } @@ -386,15 +402,17 @@ mod tests { strategy.source_nonces_updated(header_id(2), source_nonces(6..=10)); strategy.source_nonces_updated(header_id(3), source_nonces(11..=15)); strategy.source_nonces_updated(header_id(4), source_nonces(16..=20)); - strategy.finalized_target_nonces_updated(target_nonces(15), &mut Default::default()); + strategy + .finalized_target_nonces_updated(target_nonces(15), &mut TestRaceStateImpl::default()); assert_eq!(strategy.source_queue, vec![(header_id(4), 16..=20)]); - strategy.finalized_target_nonces_updated(target_nonces(17), &mut Default::default()); + strategy + .finalized_target_nonces_updated(target_nonces(17), &mut TestRaceStateImpl::default()); assert_eq!(strategy.source_queue, vec![(header_id(4), 18..=20)]); } #[test] fn selected_nonces_are_dropped_on_target_nonce_update() { - let mut state = RaceState::default(); + let mut state = TestRaceStateImpl::default(); let mut strategy = BasicStrategy::<TestMessageLane>::new(); state.nonces_to_submit = Some((header_id(1), 5..=10, (5..=10, None))); strategy.best_target_nonces_updated(target_nonces(7), &mut state); @@ -405,7 +423,7 @@ mod tests { #[test] fn submitted_nonces_are_dropped_on_target_nonce_update() { - let mut state = RaceState::default(); + let mut state = TestRaceStateImpl::default(); let mut strategy = BasicStrategy::<TestMessageLane>::new(); state.nonces_submitted = Some(5..=10); strategy.best_target_nonces_updated(target_nonces(7), &mut state); @@ -416,7 +434,7 @@ mod tests { #[async_std::test] async fn nothing_is_selected_if_something_is_already_selected() { - let mut state = RaceState::default(); + let mut state = TestRaceStateImpl::default(); let mut strategy = BasicStrategy::<TestMessageLane>::new(); state.nonces_to_submit = Some((header_id(1), 1..=10, (1..=10, None))); strategy.best_target_nonces_updated(target_nonces(0), &mut state); @@ -426,7 +444,7 @@ mod tests { #[async_std::test] async fn nothing_is_selected_if_something_is_already_submitted() { - let mut state = RaceState::default(); + let mut state = TestRaceStateImpl::default(); let mut strategy = BasicStrategy::<TestMessageLane>::new(); state.nonces_submitted = Some(1..=10); strategy.best_target_nonces_updated(target_nonces(0), &mut state); @@ -436,7 +454,7 @@ mod tests { #[async_std::test] async fn select_nonces_to_deliver_works() { - let mut state = RaceState::<_, _, TestMessagesProof>::default(); + let mut state = TestRaceStateImpl::default(); let mut strategy = BasicStrategy::<TestMessageLane>::new(); strategy.best_target_nonces_updated(target_nonces(0), &mut state); strategy.source_nonces_updated(header_id(1), source_nonces(1..=1)); @@ -457,7 +475,7 @@ mod tests { #[test] fn available_source_queue_indices_works() { - let mut state = RaceState::<_, _, TestMessagesProof>::default(); + let mut state = TestRaceStateImpl::default(); let mut strategy = BasicStrategy::<TestMessageLane>::new(); strategy.best_target_nonces_updated(target_nonces(0), &mut state); strategy.source_nonces_updated(header_id(1), source_nonces(1..=3)); @@ -482,7 +500,7 @@ mod tests { #[test] fn remove_le_nonces_from_source_queue_works() { - let mut state = RaceState::<_, _, TestMessagesProof>::default(); + let mut state = TestRaceStateImpl::default(); let mut strategy = BasicStrategy::<TestMessageLane>::new(); strategy.best_target_nonces_updated(target_nonces(0), &mut state); strategy.source_nonces_updated(header_id(1), source_nonces(1..=3)); @@ -518,12 +536,13 @@ mod tests { let target_header_1 = header_id(1); // we start in perfec sync state - all headers are synced and finalized on both ends - let mut state = RaceState::<_, _, TestMessagesProof> { + let mut state = TestRaceStateImpl { best_finalized_source_header_id_at_source: Some(source_header_1), best_finalized_source_header_id_at_best_target: Some(source_header_1), best_target_header_id: Some(target_header_1), best_finalized_target_header_id: Some(target_header_1), nonces_to_submit: None, + nonces_to_submit_batch: None, nonces_submitted: None, };