diff --git a/bridges/relays/bin-substrate/src/chains/millau_messages_to_rialto.rs b/bridges/relays/bin-substrate/src/chains/millau_messages_to_rialto.rs index 2bca68c37e35d4171342da946db65de223189b27..f132088f546de34bb053caf536f4c254dde3ecbb 100644 --- a/bridges/relays/bin-substrate/src/chains/millau_messages_to_rialto.rs +++ b/bridges/relays/bin-substrate/src/chains/millau_messages_to_rialto.rs @@ -159,7 +159,7 @@ pub async fn run( }; // 2/3 is reserved for proofs and tx overhead - let max_messages_size_in_single_batch = bp_rialto::max_extrinsic_size() as usize / 3; + let max_messages_size_in_single_batch = bp_rialto::max_extrinsic_size() / 3; // TODO: use Millau weights after https://github.com/paritytech/parity-bridges-common/issues/390 let (max_messages_in_single_batch, max_messages_weight_in_single_batch) = select_delivery_transaction_limits::<pallet_bridge_messages::weights::RialtoWeight<millau_runtime::Runtime>>( @@ -193,6 +193,7 @@ pub async fn run( max_messages_in_single_batch, max_messages_weight_in_single_batch, max_messages_size_in_single_batch, + relayer_mode: messages_relay::message_lane_loop::RelayerMode::Altruistic, }, }, MillauSourceClient::new( diff --git a/bridges/relays/bin-substrate/src/chains/rialto_messages_to_millau.rs b/bridges/relays/bin-substrate/src/chains/rialto_messages_to_millau.rs index c85cf9d367886770fc488e35ffe5dca9656788a1..02c6010f74f021dd8f799891f0d719795567901f 100644 --- a/bridges/relays/bin-substrate/src/chains/rialto_messages_to_millau.rs +++ b/bridges/relays/bin-substrate/src/chains/rialto_messages_to_millau.rs @@ -159,7 +159,7 @@ pub async fn run( }; // 2/3 is reserved for proofs and tx overhead - let max_messages_size_in_single_batch = bp_millau::max_extrinsic_size() as usize / 3; + let max_messages_size_in_single_batch = bp_millau::max_extrinsic_size() / 3; let (max_messages_in_single_batch, max_messages_weight_in_single_batch) = select_delivery_transaction_limits::<pallet_bridge_messages::weights::RialtoWeight<rialto_runtime::Runtime>>( bp_millau::max_extrinsic_weight(), @@ -192,6 +192,7 @@ pub async fn run( max_messages_in_single_batch, max_messages_weight_in_single_batch, max_messages_size_in_single_batch, + relayer_mode: messages_relay::message_lane_loop::RelayerMode::Altruistic, }, }, RialtoSourceClient::new( diff --git a/bridges/relays/bin-substrate/src/messages_lane.rs b/bridges/relays/bin-substrate/src/messages_lane.rs index 40de424a78d4a5f6dba49768cd22785360ce4fe2..7468c16706b9142fd6023625bb084e400000b810 100644 --- a/bridges/relays/bin-substrate/src/messages_lane.rs +++ b/bridges/relays/bin-substrate/src/messages_lane.rs @@ -139,6 +139,7 @@ where type MessagesProof = SubstrateMessagesProof<Source>; type MessagesReceivingProof = SubstrateMessagesReceivingProof<Target>; + type SourceChainBalance = Source::Balance; type SourceHeaderNumber = BlockNumberOf<Source>; type SourceHeaderHash = HashOf<Source>; diff --git a/bridges/relays/bin-substrate/src/messages_source.rs b/bridges/relays/bin-substrate/src/messages_source.rs index 68329a2dbcc3d34ecf96ac2a624a5b00713f8559..ce82f8abe5e7c327f2e35c9be91e91e3b2371051 100644 --- a/bridges/relays/bin-substrate/src/messages_source.rs +++ b/bridges/relays/bin-substrate/src/messages_source.rs @@ -30,7 +30,7 @@ use frame_support::{traits::Instance, weights::Weight}; use messages_relay::{ message_lane::{SourceHeaderIdOf, TargetHeaderIdOf}, message_lane_loop::{ - ClientState, MessageProofParameters, MessageWeights, MessageWeightsMap, SourceClient, SourceClientState, + ClientState, MessageDetails, MessageDetailsMap, MessageProofParameters, SourceClient, SourceClientState, }, }; use pallet_bridge_messages::Config as MessagesConfig; @@ -112,6 +112,7 @@ where C::BlockNumber: BlockNumberBase, P: SubstrateMessageLane< MessagesProof = SubstrateMessagesProof<C>, + SourceChainBalance = C::Balance, SourceHeaderNumber = <C::Header as HeaderT>::Number, SourceHeaderHash = <C::Header as HeaderT>::Hash, SourceChain = C, @@ -168,11 +169,11 @@ where Ok((id, latest_received_nonce)) } - async fn generated_messages_weights( + async fn generated_message_details( &self, id: SourceHeaderIdOf<P>, nonces: RangeInclusive<MessageNonce>, - ) -> Result<MessageWeightsMap, SubstrateError> { + ) -> Result<MessageDetailsMap<P::SourceChainBalance>, SubstrateError> { let encoded_response = self .client .state_call( @@ -242,6 +243,10 @@ where target_to_source_headers_relay.require_finalized_header(id).await; } } + + async fn estimate_confirmation_transaction(&self) -> P::SourceChainBalance { + num_traits::Zero::zero() // TODO: https://github.com/paritytech/parity-bridges-common/issues/997 + } } pub async fn read_client_state<SelfChain, BridgedHeaderHash, BridgedHeaderNumber>( @@ -290,7 +295,7 @@ where fn make_message_details_map<C: Chain>( weights: Vec<bp_messages::MessageDetails<C::Balance>>, nonces: RangeInclusive<MessageNonce>, -) -> Result<MessageWeightsMap, SubstrateError> { +) -> Result<MessageDetailsMap<C::Balance>, SubstrateError> { let make_missing_nonce_error = |expected_nonce| { Err(SubstrateError::Custom(format!( "Missing nonce {} in messages_dispatch_weight call result. Expected all nonces from {:?}", @@ -298,7 +303,7 @@ fn make_message_details_map<C: Chain>( ))) }; - let mut weights_map = MessageWeightsMap::new(); + let mut weights_map = MessageDetailsMap::new(); // this is actually prevented by external logic if nonces.is_empty() { @@ -341,9 +346,11 @@ fn make_message_details_map<C: Chain>( weights_map.insert( details.nonce, - MessageWeights { - weight: details.dispatch_weight, + MessageDetails { + dispatch_weight: details.dispatch_weight, size: details.size as _, + // TODO: https://github.com/paritytech/parity-bridges-common/issues/997 + reward: num_traits::Zero::zero(), }, ); expected_nonce = details.nonce + 1; @@ -376,9 +383,30 @@ mod tests { assert_eq!( make_message_details_map::<relay_rialto_client::Rialto>(message_details_from_rpc(1..=3), 1..=3,).unwrap(), vec![ - (1, MessageWeights { weight: 0, size: 0 }), - (2, MessageWeights { weight: 0, size: 0 }), - (3, MessageWeights { weight: 0, size: 0 }), + ( + 1, + MessageDetails { + dispatch_weight: 0, + size: 0, + reward: 0 + } + ), + ( + 2, + MessageDetails { + dispatch_weight: 0, + size: 0, + reward: 0 + } + ), + ( + 3, + MessageDetails { + dispatch_weight: 0, + size: 0, + reward: 0 + } + ), ] .into_iter() .collect(), @@ -390,8 +418,22 @@ mod tests { assert_eq!( make_message_details_map::<relay_rialto_client::Rialto>(message_details_from_rpc(2..=3), 1..=3,).unwrap(), vec![ - (2, MessageWeights { weight: 0, size: 0 }), - (3, MessageWeights { weight: 0, size: 0 }), + ( + 2, + MessageDetails { + dispatch_weight: 0, + size: 0, + reward: 0 + } + ), + ( + 3, + MessageDetails { + dispatch_weight: 0, + size: 0, + reward: 0 + } + ), ] .into_iter() .collect(), diff --git a/bridges/relays/bin-substrate/src/messages_target.rs b/bridges/relays/bin-substrate/src/messages_target.rs index 715efce88ec28b94a6702cde3470899b0d09e589..f9ffb922db88b7bb61e048c73af3501442e83356 100644 --- a/bridges/relays/bin-substrate/src/messages_target.rs +++ b/bridges/relays/bin-substrate/src/messages_target.rs @@ -27,7 +27,7 @@ use bp_messages::{LaneId, MessageNonce, UnrewardedRelayersState}; use bp_runtime::ChainId; use bridge_runtime_common::messages::source::FromBridgedChainMessagesDeliveryProof; use codec::{Decode, Encode}; -use frame_support::traits::Instance; +use frame_support::{traits::Instance, weights::Weight}; use messages_relay::{ message_lane::{SourceHeaderIdOf, TargetHeaderIdOf}, message_lane_loop::{TargetClient, TargetClientState}, @@ -229,4 +229,13 @@ where source_to_target_headers_relay.require_finalized_header(id).await; } } + + async fn estimate_delivery_transaction_in_source_tokens( + &self, + _nonces: RangeInclusive<MessageNonce>, + _total_dispatch_weight: Weight, + _total_size: u32, + ) -> P::SourceChainBalance { + num_traits::Zero::zero() // TODO: https://github.com/paritytech/parity-bridges-common/issues/997 + } } diff --git a/bridges/relays/client-substrate/src/chain.rs b/bridges/relays/client-substrate/src/chain.rs index ba3634fb6fd0ee9015d6ff2e0a08a636af152eec..4cc8a0394d9ad03efee5176dec788dca4897baa8 100644 --- a/bridges/relays/client-substrate/src/chain.rs +++ b/bridges/relays/client-substrate/src/chain.rs @@ -17,7 +17,7 @@ use bp_runtime::Chain as ChainBase; use frame_support::Parameter; use jsonrpsee_ws_client::{DeserializeOwned, Serialize}; -use num_traits::{CheckedSub, Zero}; +use num_traits::{CheckedSub, SaturatingAdd, Zero}; use sp_core::{storage::StorageKey, Pair}; use sp_runtime::{ generic::SignedBlock, @@ -58,7 +58,7 @@ pub trait Chain: ChainBase + Clone { /// /// The chain may suport multiple tokens, but this particular type is for token that is used /// to pay for transaction dispatch, to reward different relayers (headers, messages), etc. - type Balance: Parameter + Member + DeserializeOwned + Clone + Copy + CheckedSub + PartialOrd + Zero; + type Balance: Parameter + Member + DeserializeOwned + Clone + Copy + CheckedSub + PartialOrd + SaturatingAdd + Zero; } /// Substrate-based chain with `frame_system::Config::AccountData` set to diff --git a/bridges/relays/messages/Cargo.toml b/bridges/relays/messages/Cargo.toml index e02f8ccc868280657e6742ed8ca9fea539aed285..943a3b7d9cd8a86f931443fbff5d9754837121bb 100644 --- a/bridges/relays/messages/Cargo.toml +++ b/bridges/relays/messages/Cargo.toml @@ -6,11 +6,12 @@ edition = "2018" license = "GPL-3.0-or-later WITH Classpath-exception-2.0" [dependencies] -async-std = "1.6.5" +async-std = { version = "1.6.5", features = ["attributes"] } async-trait = "0.1.40" futures = "0.3.5" hex = "0.4" log = "0.4.11" +num-traits = "0.2" parking_lot = "0.11.0" # Bridge Dependencies diff --git a/bridges/relays/messages/src/message_lane.rs b/bridges/relays/messages/src/message_lane.rs index 6473ec987500b9b030a033c457f32e5fd65b6752..8757e9322ce4473c230cf34a0e631aac94017d30 100644 --- a/bridges/relays/messages/src/message_lane.rs +++ b/bridges/relays/messages/src/message_lane.rs @@ -19,6 +19,7 @@ //! 1) relay new messages from source to target node; //! 2) relay proof-of-delivery from target to source node. +use num_traits::{SaturatingAdd, Zero}; use relay_utils::{BlockNumberBase, HeaderId}; use std::fmt::Debug; @@ -34,6 +35,12 @@ pub trait MessageLane: 'static + Clone + Send + Sync { /// Messages receiving proof. type MessagesReceivingProof: Clone + Debug + Send + Sync; + /// The type of the source chain token balance, that is used to: + /// + /// 1) pay transaction fees; + /// 2) pay message delivery and dispatch fee; + /// 3) pay relayer rewards. + type SourceChainBalance: Clone + Copy + Debug + PartialOrd + SaturatingAdd + Zero + Send + Sync; /// Number of the source header. type SourceHeaderNumber: BlockNumberBase; /// Hash of the source header. diff --git a/bridges/relays/messages/src/message_lane_loop.rs b/bridges/relays/messages/src/message_lane_loop.rs index af04bf984e1f95ce70e838e17ea3ff343426053c..3ae1cb8a03f62478373f862d93b556d005d84707 100644 --- a/bridges/relays/messages/src/message_lane_loop.rs +++ b/bridges/relays/messages/src/message_lane_loop.rs @@ -58,6 +58,15 @@ pub struct Params { pub delivery_params: MessageDeliveryParams, } +/// Relayer operating mode. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum RelayerMode { + /// The relayer doesn't care about rewards. + Altruistic, + /// The relayer will deliver all messages and confirmations as long as he's not losing any funds. + NoLosses, +} + /// Message delivery race parameters. #[derive(Debug, Clone)] pub struct MessageDeliveryParams { @@ -74,20 +83,24 @@ pub struct MessageDeliveryParams { /// Maximal cumulative dispatch weight of relayed messages in single delivery transaction. pub max_messages_weight_in_single_batch: Weight, /// Maximal cumulative size of relayed messages in single delivery transaction. - pub max_messages_size_in_single_batch: usize, + pub max_messages_size_in_single_batch: u32, + /// Relayer operating mode. + pub relayer_mode: RelayerMode, } -/// Message weights. +/// Message details. #[derive(Debug, Clone, Copy, PartialEq)] -pub struct MessageWeights { +pub struct MessageDetails<SourceChainBalance> { /// Message dispatch weight. - pub weight: Weight, + pub dispatch_weight: Weight, /// Message size (number of bytes in encoded payload). - pub size: usize, + pub size: u32, + /// The relayer reward paid in the source chain tokens. + pub reward: SourceChainBalance, } -/// Messages weights map. -pub type MessageWeightsMap = BTreeMap<MessageNonce, MessageWeights>; +/// Messages details map. +pub type MessageDetailsMap<SourceChainBalance> = BTreeMap<MessageNonce, MessageDetails<SourceChainBalance>>; /// Message delivery race proof parameters. #[derive(Debug, PartialEq)] @@ -117,13 +130,13 @@ pub trait SourceClient<P: MessageLane>: RelayClient { /// Returns mapping of message nonces, generated on this client, to their weights. /// - /// Some weights may be missing from returned map, if corresponding messages were pruned at + /// Some messages may be missing from returned map, if corresponding messages were pruned at /// the source chain. - async fn generated_messages_weights( + async fn generated_message_details( &self, id: SourceHeaderIdOf<P>, nonces: RangeInclusive<MessageNonce>, - ) -> Result<MessageWeightsMap, Self::Error>; + ) -> Result<MessageDetailsMap<P::SourceChainBalance>, Self::Error>; /// Prove messages in inclusive range [begin; end]. async fn prove_messages( @@ -142,6 +155,9 @@ pub trait SourceClient<P: MessageLane>: RelayClient { /// We need given finalized target header on source to continue synchronization. async fn require_target_header_on_source(&self, id: TargetHeaderIdOf<P>); + + /// Estimate cost of single message confirmation transaction in source chain tokens. + async fn estimate_confirmation_transaction(&self) -> P::SourceChainBalance; } /// Target client trait. @@ -183,6 +199,17 @@ pub trait TargetClient<P: MessageLane>: RelayClient { /// We need given finalized source header on target to continue synchronization. async fn require_source_header_on_target(&self, id: SourceHeaderIdOf<P>); + + /// Estimate cost of messages delivery transaction in source chain tokens. + /// + /// Please keep in mind that the returned cost must be converted to the source chain + /// tokens, even though the transaction fee will be paid in the target chain tokens. + async fn estimate_delivery_transaction_in_source_tokens( + &self, + nonces: RangeInclusive<MessageNonce>, + total_dispatch_weight: Weight, + total_size: u32, + ) -> P::SourceChainBalance; } /// State of the client. @@ -426,6 +453,10 @@ pub(crate) mod tests { HeaderId(number, number) } + pub const CONFIRMATION_TRANSACTION_COST: TestSourceChainBalance = 1; + pub const DELIVERY_TRANSACTION_COST: TestSourceChainBalance = 1; + + pub type TestSourceChainBalance = u64; pub type TestSourceHeaderId = HeaderId<TestSourceHeaderNumber, TestSourceHeaderHash>; pub type TestTargetHeaderId = HeaderId<TestTargetHeaderNumber, TestTargetHeaderHash>; @@ -457,6 +488,7 @@ pub(crate) mod tests { type MessagesProof = TestMessagesProof; type MessagesReceivingProof = TestMessagesReceivingProof; + type SourceChainBalance = TestSourceChainBalance; type SourceHeaderNumber = TestSourceHeaderNumber; type SourceHeaderHash = TestSourceHeaderHash; @@ -490,6 +522,15 @@ pub(crate) mod tests { tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>, } + impl Default for TestSourceClient { + fn default() -> Self { + TestSourceClient { + data: Arc::new(Mutex::new(TestClientData::default())), + tick: Arc::new(|_| {}), + } + } + } + #[async_trait] impl RelayClient for TestSourceClient { type Error = TestError; @@ -536,13 +577,22 @@ pub(crate) mod tests { Ok((id, data.source_latest_confirmed_received_nonce)) } - async fn generated_messages_weights( + async fn generated_message_details( &self, _id: SourceHeaderIdOf<TestMessageLane>, nonces: RangeInclusive<MessageNonce>, - ) -> Result<MessageWeightsMap, TestError> { + ) -> Result<MessageDetailsMap<TestSourceChainBalance>, TestError> { Ok(nonces - .map(|nonce| (nonce, MessageWeights { weight: 1, size: 1 })) + .map(|nonce| { + ( + nonce, + MessageDetails { + dispatch_weight: 1, + size: 1, + reward: 1, + }, + ) + }) .collect()) } @@ -596,6 +646,10 @@ pub(crate) mod tests { data.target_to_source_header_requirements.push(id); (self.tick)(&mut *data); } + + async fn estimate_confirmation_transaction(&self) -> TestSourceChainBalance { + CONFIRMATION_TRANSACTION_COST + } } #[derive(Clone)] @@ -604,6 +658,15 @@ pub(crate) mod tests { tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>, } + impl Default for TestTargetClient { + fn default() -> Self { + TestTargetClient { + data: Arc::new(Mutex::new(TestClientData::default())), + tick: Arc::new(|_| {}), + } + } + } + #[async_trait] impl RelayClient for TestTargetClient { type Error = TestError; @@ -702,6 +765,15 @@ pub(crate) mod tests { data.source_to_target_header_requirements.push(id); (self.tick)(&mut *data); } + + async fn estimate_delivery_transaction_in_source_tokens( + &self, + _nonces: RangeInclusive<MessageNonce>, + _total_dispatch_weight: Weight, + total_size: u32, + ) -> TestSourceChainBalance { + DELIVERY_TRANSACTION_COST * (total_size as TestSourceChainBalance) + } } fn run_loop_test( @@ -734,6 +806,7 @@ pub(crate) mod tests { max_messages_in_single_batch: 4, max_messages_weight_in_single_batch: 4, max_messages_size_in_single_batch: 4, + relayer_mode: RelayerMode::Altruistic, }, }, source_client, diff --git a/bridges/relays/messages/src/message_race_delivery.rs b/bridges/relays/messages/src/message_race_delivery.rs index b50b0ffe31ba350a3b46de94bfd54d39572eb2f5..8e88b3763cd8710d3585347493dece8e5aafa87b 100644 --- a/bridges/relays/messages/src/message_race_delivery.rs +++ b/bridges/relays/messages/src/message_race_delivery.rs @@ -15,8 +15,9 @@ use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}; use crate::message_lane_loop::{ - MessageDeliveryParams, MessageProofParameters, MessageWeightsMap, SourceClient as MessageLaneSourceClient, - SourceClientState, TargetClient as MessageLaneTargetClient, TargetClientState, + MessageDeliveryParams, MessageDetailsMap, MessageProofParameters, RelayerMode, + SourceClient as MessageLaneSourceClient, SourceClientState, TargetClient as MessageLaneTargetClient, + TargetClientState, }; use crate::message_race_loop::{ MessageRace, NoncesRange, RaceState, RaceStrategy, SourceClient, SourceClientNonces, TargetClient, @@ -28,13 +29,9 @@ use crate::metrics::MessageLaneLoopMetrics; use async_trait::async_trait; use bp_messages::{MessageNonce, UnrewardedRelayersState, Weight}; use futures::stream::FusedStream; +use num_traits::{SaturatingAdd, Zero}; use relay_utils::FailedClient; -use std::{ - collections::{BTreeMap, VecDeque}, - marker::PhantomData, - ops::RangeInclusive, - time::Duration, -}; +use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive, time::Duration}; /// Run message delivery race. pub async fn run<P: MessageLane>( @@ -48,24 +45,27 @@ pub async fn run<P: MessageLane>( ) -> Result<(), FailedClient> { crate::message_race_loop::run( MessageDeliveryRaceSource { - client: source_client, + client: source_client.clone(), metrics_msg: metrics_msg.clone(), _phantom: Default::default(), }, source_state_updates, MessageDeliveryRaceTarget { - client: target_client, + client: target_client.clone(), metrics_msg, _phantom: Default::default(), }, target_state_updates, stall_timeout, - MessageDeliveryStrategy::<P> { + MessageDeliveryStrategy::<P, _, _> { + lane_source_client: source_client, + lane_target_client: target_client, max_unrewarded_relayer_entries_at_target: params.max_unrewarded_relayer_entries_at_target, max_unconfirmed_nonces_at_target: params.max_unconfirmed_nonces_at_target, max_messages_in_single_batch: params.max_messages_in_single_batch, max_messages_weight_in_single_batch: params.max_messages_weight_in_single_batch, max_messages_size_in_single_batch: params.max_messages_size_in_single_batch, + relayer_mode: params.relayer_mode, latest_confirmed_nonces_at_source: VecDeque::new(), target_nonces: None, strategy: BasicStrategy::new(), @@ -107,7 +107,7 @@ where C: MessageLaneSourceClient<P>, { type Error = C::Error; - type NoncesRange = MessageWeightsMap; + type NoncesRange = MessageDetailsMap<P::SourceChainBalance>; type ProofParameters = MessageProofParameters; async fn nonces( @@ -125,10 +125,10 @@ where let new_nonces = if latest_generated_nonce > prev_latest_nonce { self.client - .generated_messages_weights(at_block.clone(), prev_latest_nonce + 1..=latest_generated_nonce) + .generated_message_details(at_block.clone(), prev_latest_nonce + 1..=latest_generated_nonce) .await? } else { - MessageWeightsMap::new() + MessageDetailsMap::new() }; Ok(( @@ -222,7 +222,11 @@ struct DeliveryRaceTargetNoncesData { } /// Messages delivery strategy. -struct MessageDeliveryStrategy<P: MessageLane> { +struct MessageDeliveryStrategy<P: MessageLane, SC, TC> { + /// The client that is connected to the message lane source node. + lane_source_client: SC, + /// The client that is connected to the message lane target node. + lane_target_client: TC, /// Maximal unrewarded relayer entries at target client. max_unrewarded_relayer_entries_at_target: MessageNonce, /// Maximal unconfirmed nonces at target client. @@ -232,7 +236,9 @@ struct MessageDeliveryStrategy<P: MessageLane> { /// Maximal cumulative messages weight in the single delivery transaction. max_messages_weight_in_single_batch: Weight, /// Maximal messages size in the single delivery transaction. - max_messages_size_in_single_batch: usize, + max_messages_size_in_single_batch: u32, + /// Relayer operating mode. + relayer_mode: RelayerMode, /// Latest confirmed nonces at the source client + the header id where we have first met this nonce. latest_confirmed_nonces_at_source: VecDeque<(SourceHeaderIdOf<P>, MessageNonce)>, /// Target nonces from the source client. @@ -246,11 +252,11 @@ type MessageDeliveryStrategyBase<P> = BasicStrategy< <P as MessageLane>::SourceHeaderHash, <P as MessageLane>::TargetHeaderNumber, <P as MessageLane>::TargetHeaderHash, - MessageWeightsMap, + MessageDetailsMap<<P as MessageLane>::SourceChainBalance>, <P as MessageLane>::MessagesProof, >; -impl<P: MessageLane> std::fmt::Debug for MessageDeliveryStrategy<P> { +impl<P: MessageLane, SC, TC> std::fmt::Debug for MessageDeliveryStrategy<P, SC, TC> { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { fmt.debug_struct("MessageDeliveryStrategy") .field( @@ -280,10 +286,26 @@ impl<P: MessageLane> std::fmt::Debug for MessageDeliveryStrategy<P> { } } -impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof> - for MessageDeliveryStrategy<P> +impl<P: MessageLane, SC, TC> MessageDeliveryStrategy<P, SC, TC> { + /// Returns total weight of all undelivered messages. + fn total_queued_dispatch_weight(&self) -> Weight { + self.strategy + .source_queue() + .iter() + .flat_map(|(_, range)| range.values().map(|details| details.dispatch_weight)) + .fold(0, |total, weight| total.saturating_add(weight)) + } +} + +#[async_trait] +impl<P, SC, TC> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof> + for MessageDeliveryStrategy<P, SC, TC> +where + P: MessageLane, + SC: MessageLaneSourceClient<P>, + TC: MessageLaneTargetClient<P>, { - type SourceNoncesRange = MessageWeightsMap; + type SourceNoncesRange = MessageDetailsMap<P::SourceChainBalance>; type ProofParameters = MessageProofParameters; type TargetNoncesData = DeliveryRaceTargetNoncesData; @@ -383,9 +405,9 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M ) } - fn select_nonces_to_deliver( + async fn select_nonces_to_deliver( &mut self, - race_state: &RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>, + race_state: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>, ) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> { let best_finalized_source_header_id_at_best_target = race_state.best_finalized_source_header_id_at_best_target.clone()?; @@ -473,87 +495,205 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M let max_nonces = std::cmp::min(max_nonces, self.max_messages_in_single_batch); let max_messages_weight_in_single_batch = self.max_messages_weight_in_single_batch; let max_messages_size_in_single_batch = self.max_messages_size_in_single_batch; - let mut selected_weight: Weight = 0; - let mut selected_size: usize = 0; - let mut selected_count: MessageNonce = 0; + let relayer_mode = self.relayer_mode; + let lane_source_client = self.lane_source_client.clone(); + let lane_target_client = self.lane_target_client.clone(); + let previous_total_dispatch_weight = self.total_queued_dispatch_weight(); let selected_nonces = self .strategy - .select_nonces_to_deliver_with_selector(race_state, |range| { - let to_requeue = range - .into_iter() - .skip_while(|(_, weight)| { - // Since we (hopefully) have some reserves in `max_messages_weight_in_single_batch` - // and `max_messages_size_in_single_batch`, we may still try to submit transaction - // with single message if message overflows these limits. The worst case would be if - // transaction will be rejected by the target runtime, but at least we have tried. - - // limit messages in the batch by weight - let new_selected_weight = match selected_weight.checked_add(weight.weight) { - Some(new_selected_weight) if new_selected_weight <= max_messages_weight_in_single_batch => { - new_selected_weight - } - new_selected_weight if selected_count == 0 => { - log::warn!( - target: "bridge", - "Going to submit message delivery transaction with declared dispatch \ - weight {:?} that overflows maximal configured weight {}", - new_selected_weight, - max_messages_weight_in_single_batch, - ); - new_selected_weight.unwrap_or(Weight::MAX) - } - _ => return false, - }; - - // limit messages in the batch by size - let new_selected_size = match selected_size.checked_add(weight.size) { - Some(new_selected_size) if new_selected_size <= max_messages_size_in_single_batch => { - new_selected_size - } - new_selected_size if selected_count == 0 => { - log::warn!( - target: "bridge", - "Going to submit message delivery transaction with message \ - size {:?} that overflows maximal configured size {}", - new_selected_size, - max_messages_size_in_single_batch, - ); - new_selected_size.unwrap_or(usize::MAX) - } - _ => return false, - }; - - // limit number of messages in the batch - let new_selected_count = selected_count + 1; - if new_selected_count > max_nonces { - return false; - } - - selected_weight = new_selected_weight; - selected_size = new_selected_size; - selected_count = new_selected_count; - true - }) - .collect::<BTreeMap<_, _>>(); - if to_requeue.is_empty() { - None - } else { - Some(to_requeue) - } - })?; + .select_nonces_to_deliver_with_selector(race_state.clone(), |range| async { + select_nonces_for_delivery_transaction( + relayer_mode, + max_nonces, + max_messages_weight_in_single_batch, + max_messages_size_in_single_batch, + lane_source_client.clone(), + lane_target_client.clone(), + range, + ) + .await + }) + .await?; + let new_total_dispatch_weight = self.total_queued_dispatch_weight(); + let dispatch_weight = previous_total_dispatch_weight - new_total_dispatch_weight; Some(( selected_nonces, MessageProofParameters { outbound_state_proof_required, - dispatch_weight: selected_weight, + dispatch_weight, }, )) } } -impl NoncesRange for MessageWeightsMap { +/// From given set of source nonces, that are ready to be delivered, select nonces +/// to fit into single delivery transaction. +/// +/// The function returns nonces that are NOT selected for current batch and will be +/// delivered later. +async fn select_nonces_for_delivery_transaction<P: MessageLane>( + relayer_mode: RelayerMode, + max_messages_in_this_batch: MessageNonce, + max_messages_weight_in_single_batch: Weight, + max_messages_size_in_single_batch: u32, + lane_source_client: impl MessageLaneSourceClient<P>, + lane_target_client: impl MessageLaneTargetClient<P>, + ready_nonces: MessageDetailsMap<P::SourceChainBalance>, +) -> Option<MessageDetailsMap<P::SourceChainBalance>> { + let mut hard_selected_count = 0; + let mut soft_selected_count = 0; + + let mut selected_weight: Weight = 0; + let mut selected_size: u32 = 0; + let mut selected_count: MessageNonce = 0; + + let mut total_reward = P::SourceChainBalance::zero(); + let mut total_confirmations_cost = P::SourceChainBalance::zero(); + let mut total_cost = P::SourceChainBalance::zero(); + + // technically, multiple confirmations will be delivered in a single transaction, + // meaning less loses for relayer. But here we don't know the final relayer yet, so + // we're adding a separate transaction for every message. Normally, this cost is covered + // by the message sender. Probably reconsider this? + let confirmation_transaction_cost = if relayer_mode != RelayerMode::Altruistic { + lane_source_client.estimate_confirmation_transaction().await + } else { + Zero::zero() + }; + + for (index, (nonce, details)) in ready_nonces.iter().enumerate() { + // Since we (hopefully) have some reserves in `max_messages_weight_in_single_batch` + // and `max_messages_size_in_single_batch`, we may still try to submit transaction + // with single message if message overflows these limits. The worst case would be if + // transaction will be rejected by the target runtime, but at least we have tried. + + // limit messages in the batch by weight + let new_selected_weight = match selected_weight.checked_add(details.dispatch_weight) { + Some(new_selected_weight) if new_selected_weight <= max_messages_weight_in_single_batch => { + new_selected_weight + } + new_selected_weight if selected_count == 0 => { + log::warn!( + target: "bridge", + "Going to submit message delivery transaction with declared dispatch \ + weight {:?} that overflows maximal configured weight {}", + new_selected_weight, + max_messages_weight_in_single_batch, + ); + new_selected_weight.unwrap_or(Weight::MAX) + } + _ => break, + }; + + // limit messages in the batch by size + let new_selected_size = match selected_size.checked_add(details.size) { + Some(new_selected_size) if new_selected_size <= max_messages_size_in_single_batch => new_selected_size, + new_selected_size if selected_count == 0 => { + log::warn!( + target: "bridge", + "Going to submit message delivery transaction with message \ + size {:?} that overflows maximal configured size {}", + new_selected_size, + max_messages_size_in_single_batch, + ); + new_selected_size.unwrap_or(u32::MAX) + } + _ => break, + }; + + // limit number of messages in the batch + let new_selected_count = selected_count + 1; + if new_selected_count > max_messages_in_this_batch { + break; + } + + // now the message has passed all 'strong' checks, and we CAN deliver it. But do we WANT + // to deliver it? It depends on the relayer strategy. + match relayer_mode { + RelayerMode::Altruistic => { + soft_selected_count = index + 1; + } + RelayerMode::NoLosses => { + let delivery_transaction_cost = lane_target_client + .estimate_delivery_transaction_in_source_tokens( + 0..=(new_selected_count as MessageNonce - 1), + new_selected_weight, + new_selected_size as u32, + ) + .await; + + // if it is the first message that makes reward less than cost, let's log it + // if this message makes batch profitable again, let's log it + let is_total_reward_less_than_cost = total_reward < total_cost; + let prev_total_cost = total_cost; + let prev_total_reward = total_reward; + total_confirmations_cost = total_confirmations_cost.saturating_add(&confirmation_transaction_cost); + total_reward = total_reward.saturating_add(&details.reward); + total_cost = total_confirmations_cost.saturating_add(&delivery_transaction_cost); + if !is_total_reward_less_than_cost && total_reward < total_cost { + log::debug!( + target: "bridge", + "Message with nonce {} (reward = {:?}) changes total cost {:?}->{:?} and makes it larger than \ + total reward {:?}->{:?}", + nonce, + details.reward, + prev_total_cost, + total_cost, + prev_total_reward, + total_reward, + ); + } else if is_total_reward_less_than_cost && total_reward >= total_cost { + log::debug!( + target: "bridge", + "Message with nonce {} (reward = {:?}) changes total cost {:?}->{:?} and makes it less than or \ + equal to the total reward {:?}->{:?} (again)", + nonce, + details.reward, + prev_total_cost, + total_cost, + prev_total_reward, + total_reward, + ); + } + + // NoLosses relayer never want to lose his funds + if total_reward >= total_cost { + soft_selected_count = index + 1; + } + } + } + + hard_selected_count = index + 1; + selected_weight = new_selected_weight; + selected_size = new_selected_size; + selected_count = new_selected_count; + } + + if hard_selected_count != soft_selected_count { + log::warn!( + target: "bridge", + "Relayer may deliver nonces [{:?}; {:?}], but because of its strategy ({:?}) it has selected \ + nonces [{:?}; {:?}].", + ready_nonces.keys().next(), + ready_nonces.keys().next().map(|begin| begin + (hard_selected_count as MessageNonce) - 1), + relayer_mode, + ready_nonces.keys().next(), + ready_nonces.keys().next().map(|begin| begin + (soft_selected_count as MessageNonce) - 1), + + ); + + hard_selected_count = soft_selected_count; + } + if hard_selected_count != ready_nonces.len() { + Some(ready_nonces.into_iter().skip(hard_selected_count).collect()) + } else { + None + } +} + +impl<SourceChainBalance: std::fmt::Debug> NoncesRange for MessageDetailsMap<SourceChainBalance> { fn begin(&self) -> MessageNonce { self.keys().next().cloned().unwrap_or_default() } @@ -576,12 +716,42 @@ impl NoncesRange for MessageWeightsMap { mod tests { use super::*; use crate::message_lane_loop::{ - tests::{header_id, TestMessageLane, TestMessagesProof, TestSourceHeaderId, TestTargetHeaderId}, - MessageWeights, + tests::{ + header_id, TestMessageLane, TestMessagesProof, TestSourceChainBalance, TestSourceClient, + TestSourceHeaderId, TestTargetClient, TestTargetHeaderId, CONFIRMATION_TRANSACTION_COST, + DELIVERY_TRANSACTION_COST, + }, + MessageDetails, }; + const DEFAULT_REWARD: TestSourceChainBalance = CONFIRMATION_TRANSACTION_COST + DELIVERY_TRANSACTION_COST; + type TestRaceState = RaceState<TestSourceHeaderId, TestTargetHeaderId, TestMessagesProof>; - type TestStrategy = MessageDeliveryStrategy<TestMessageLane>; + type TestStrategy = MessageDeliveryStrategy<TestMessageLane, TestSourceClient, TestTargetClient>; + + fn source_nonces( + new_nonces: RangeInclusive<MessageNonce>, + confirmed_nonce: MessageNonce, + reward: TestSourceChainBalance, + ) -> SourceClientNonces<MessageDetailsMap<TestSourceChainBalance>> { + SourceClientNonces { + new_nonces: new_nonces + .into_iter() + .map(|nonce| { + ( + nonce, + MessageDetails { + dispatch_weight: 1, + size: 1, + reward, + }, + ) + }) + .into_iter() + .collect(), + confirmed_nonce: Some(confirmed_nonce), + } + } fn prepare_strategy() -> (TestRaceState, TestStrategy) { let mut race_state = RaceState { @@ -594,12 +764,15 @@ mod tests { }; let mut race_strategy = TestStrategy { + relayer_mode: RelayerMode::Altruistic, max_unrewarded_relayer_entries_at_target: 4, max_unconfirmed_nonces_at_target: 4, max_messages_in_single_batch: 4, max_messages_weight_in_single_batch: 4, max_messages_size_in_single_batch: 4, latest_confirmed_nonces_at_source: vec![(header_id(1), 19)].into_iter().collect(), + lane_source_client: TestSourceClient::default(), + lane_target_client: TestTargetClient::default(), target_nonces: Some(TargetClientNonces { latest_nonce: 19, nonces_data: DeliveryRaceTargetNoncesData { @@ -614,20 +787,9 @@ mod tests { strategy: BasicStrategy::new(), }; - race_strategy.strategy.source_nonces_updated( - header_id(1), - SourceClientNonces { - new_nonces: vec![ - (20, MessageWeights { weight: 1, size: 1 }), - (21, MessageWeights { weight: 1, size: 1 }), - (22, MessageWeights { weight: 1, size: 1 }), - (23, MessageWeights { weight: 1, size: 1 }), - ] - .into_iter() - .collect(), - confirmed_nonce: Some(19), - }, - ); + race_strategy + .strategy + .source_nonces_updated(header_id(1), source_nonces(20..=23, 19, DEFAULT_REWARD)); let target_nonces = TargetClientNonces { latest_nonce: 19, @@ -652,14 +814,15 @@ mod tests { #[test] fn weights_map_works_as_nonces_range() { - fn build_map(range: RangeInclusive<MessageNonce>) -> MessageWeightsMap { + fn build_map(range: RangeInclusive<MessageNonce>) -> MessageDetailsMap<TestSourceChainBalance> { range .map(|idx| { ( idx, - MessageWeights { - weight: idx, + MessageDetails { + dispatch_weight: idx, size: idx as _, + reward: idx as _, }, ) }) @@ -678,19 +841,19 @@ mod tests { assert_eq!(map.greater_than(30), None); } - #[test] - fn message_delivery_strategy_selects_messages_to_deliver() { + #[async_std::test] + async fn message_delivery_strategy_selects_messages_to_deliver() { let (state, mut strategy) = prepare_strategy(); // both sides are ready to relay new messages assert_eq!( - strategy.select_nonces_to_deliver(&state), + strategy.select_nonces_to_deliver(state).await, Some(((20..=23), proof_parameters(false, 4))) ); } - #[test] - fn message_delivery_strategy_selects_nothing_if_too_many_confirmations_missing() { + #[async_std::test] + async fn message_delivery_strategy_selects_nothing_if_too_many_confirmations_missing() { let (state, mut strategy) = prepare_strategy(); // if there are already `max_unconfirmed_nonces_at_target` messages on target, @@ -701,11 +864,11 @@ mod tests { )] .into_iter() .collect(); - assert_eq!(strategy.select_nonces_to_deliver(&state), None); + assert_eq!(strategy.select_nonces_to_deliver(state).await, None); } - #[test] - fn message_delivery_strategy_includes_outbound_state_proof_when_new_nonces_are_available() { + #[async_std::test] + async fn message_delivery_strategy_includes_outbound_state_proof_when_new_nonces_are_available() { let (state, mut strategy) = prepare_strategy(); // if there are new confirmed nonces on source, we want to relay this information @@ -713,13 +876,13 @@ mod tests { let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1; strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1; assert_eq!( - strategy.select_nonces_to_deliver(&state), + strategy.select_nonces_to_deliver(state).await, Some(((20..=23), proof_parameters(true, 4))) ); } - #[test] - fn message_delivery_strategy_selects_nothing_if_there_are_too_many_unrewarded_relayers() { + #[async_std::test] + async fn message_delivery_strategy_selects_nothing_if_there_are_too_many_unrewarded_relayers() { let (state, mut strategy) = prepare_strategy(); // if there are already `max_unrewarded_relayer_entries_at_target` entries at target, @@ -729,11 +892,12 @@ mod tests { unrewarded_relayers.unrewarded_relayer_entries = strategy.max_unrewarded_relayer_entries_at_target; unrewarded_relayers.messages_in_oldest_entry = 4; } - assert_eq!(strategy.select_nonces_to_deliver(&state), None); + assert_eq!(strategy.select_nonces_to_deliver(state).await, None); } - #[test] - fn message_delivery_strategy_selects_nothing_if_proved_rewards_is_not_enough_to_remove_oldest_unrewarded_entry() { + #[async_std::test] + async fn message_delivery_strategy_selects_nothing_if_proved_rewards_is_not_enough_to_remove_oldest_unrewarded_entry( + ) { let (state, mut strategy) = prepare_strategy(); // if there are already `max_unrewarded_relayer_entries_at_target` entries at target, @@ -746,11 +910,11 @@ mod tests { unrewarded_relayers.unrewarded_relayer_entries = strategy.max_unrewarded_relayer_entries_at_target; unrewarded_relayers.messages_in_oldest_entry = 4; } - assert_eq!(strategy.select_nonces_to_deliver(&state), None); + assert_eq!(strategy.select_nonces_to_deliver(state).await, None); } - #[test] - fn message_delivery_strategy_includes_outbound_state_proof_if_proved_rewards_is_enough() { + #[async_std::test] + async fn message_delivery_strategy_includes_outbound_state_proof_if_proved_rewards_is_enough() { let (state, mut strategy) = prepare_strategy(); // if there are already `max_unrewarded_relayer_entries_at_target` entries at target, @@ -764,73 +928,77 @@ mod tests { unrewarded_relayers.messages_in_oldest_entry = 3; } assert_eq!( - strategy.select_nonces_to_deliver(&state), + strategy.select_nonces_to_deliver(state).await, Some(((20..=23), proof_parameters(true, 4))) ); } - #[test] - fn message_delivery_strategy_limits_batch_by_messages_weight() { + #[async_std::test] + async fn message_delivery_strategy_limits_batch_by_messages_weight() { let (state, mut strategy) = prepare_strategy(); // not all queued messages may fit in the batch, because batch has max weight strategy.max_messages_weight_in_single_batch = 3; assert_eq!( - strategy.select_nonces_to_deliver(&state), + strategy.select_nonces_to_deliver(state).await, Some(((20..=22), proof_parameters(false, 3))) ); } - #[test] - fn message_delivery_strategy_accepts_single_message_even_if_its_weight_overflows_maximal_weight() { + #[async_std::test] + async fn message_delivery_strategy_accepts_single_message_even_if_its_weight_overflows_maximal_weight() { let (state, mut strategy) = prepare_strategy(); // first message doesn't fit in the batch, because it has weight (10) that overflows max weight (4) - strategy.strategy.source_queue_mut()[0].1.get_mut(&20).unwrap().weight = 10; + strategy.strategy.source_queue_mut()[0] + .1 + .get_mut(&20) + .unwrap() + .dispatch_weight = 10; assert_eq!( - strategy.select_nonces_to_deliver(&state), + strategy.select_nonces_to_deliver(state).await, Some(((20..=20), proof_parameters(false, 10))) ); } - #[test] - fn message_delivery_strategy_limits_batch_by_messages_size() { + #[async_std::test] + async fn message_delivery_strategy_limits_batch_by_messages_size() { let (state, mut strategy) = prepare_strategy(); // not all queued messages may fit in the batch, because batch has max weight strategy.max_messages_size_in_single_batch = 3; assert_eq!( - strategy.select_nonces_to_deliver(&state), + strategy.select_nonces_to_deliver(state).await, Some(((20..=22), proof_parameters(false, 3))) ); } - #[test] - fn message_delivery_strategy_accepts_single_message_even_if_its_weight_overflows_maximal_size() { + #[async_std::test] + async fn message_delivery_strategy_accepts_single_message_even_if_its_weight_overflows_maximal_size() { let (state, mut strategy) = prepare_strategy(); // first message doesn't fit in the batch, because it has weight (10) that overflows max weight (4) strategy.strategy.source_queue_mut()[0].1.get_mut(&20).unwrap().size = 10; assert_eq!( - strategy.select_nonces_to_deliver(&state), + strategy.select_nonces_to_deliver(state).await, Some(((20..=20), proof_parameters(false, 1))) ); } - #[test] - fn message_delivery_strategy_limits_batch_by_messages_count_when_there_is_upper_limit() { + #[async_std::test] + async fn message_delivery_strategy_limits_batch_by_messages_count_when_there_is_upper_limit() { let (state, mut strategy) = prepare_strategy(); // not all queued messages may fit in the batch, because batch has max number of messages limit strategy.max_messages_in_single_batch = 3; assert_eq!( - strategy.select_nonces_to_deliver(&state), + strategy.select_nonces_to_deliver(state).await, Some(((20..=22), proof_parameters(false, 3))) ); } - #[test] - fn message_delivery_strategy_limits_batch_by_messages_count_when_there_are_unconfirmed_nonces() { + #[async_std::test] + async fn message_delivery_strategy_limits_batch_by_messages_count_when_there_are_unconfirmed_nonces() { let (state, mut strategy) = prepare_strategy(); // 1 delivery confirmation from target to source is still missing, so we may only @@ -841,13 +1009,13 @@ mod tests { .collect(); strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1; assert_eq!( - strategy.select_nonces_to_deliver(&state), + strategy.select_nonces_to_deliver(state).await, Some(((20..=22), proof_parameters(false, 3))) ); } - #[test] - fn message_delivery_strategy_waits_for_confirmed_nonce_header_to_appear_on_target() { + #[async_std::test] + async fn message_delivery_strategy_waits_for_confirmed_nonce_header_to_appear_on_target() { // 1 delivery confirmation from target to source is still missing, so we may deliver // reward confirmation with our message delivery transaction. But the problem is that // the reward has been paid at header 2 && this header is still unknown to target node. @@ -864,7 +1032,7 @@ mod tests { strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1; state.best_finalized_source_header_id_at_best_target = Some(header_id(1)); assert_eq!( - strategy.select_nonces_to_deliver(&state), + strategy.select_nonces_to_deliver(state).await, Some(((20..=22), proof_parameters(false, 3))) ); @@ -881,13 +1049,13 @@ mod tests { state.best_finalized_source_header_id_at_source = Some(header_id(2)); state.best_finalized_source_header_id_at_best_target = Some(header_id(2)); assert_eq!( - strategy.select_nonces_to_deliver(&state), + strategy.select_nonces_to_deliver(state).await, Some(((20..=23), proof_parameters(true, 4))) ); } - #[test] - fn source_header_is_requied_when_confirmations_are_required() { + #[async_std::test] + async fn source_header_is_required_when_confirmations_are_required() { // let's prepare situation when: // - all messages [20; 23] have been generated at source block#1; let (mut state, mut strategy) = prepare_strategy(); @@ -895,7 +1063,7 @@ mod tests { // relayers vector capacity; strategy.max_unconfirmed_nonces_at_target = 2; assert_eq!( - strategy.select_nonces_to_deliver(&state), + strategy.select_nonces_to_deliver(state.clone()).await, Some(((20..=21), proof_parameters(false, 2))) ); strategy.finalized_target_nonces_updated( @@ -912,12 +1080,12 @@ mod tests { }, &mut state, ); - assert_eq!(strategy.select_nonces_to_deliver(&state), None); + assert_eq!(strategy.select_nonces_to_deliver(state).await, None); // - messages [1; 10] receiving confirmation has been delivered at source block#2; strategy.source_nonces_updated( header_id(2), SourceClientNonces { - new_nonces: BTreeMap::new(), + new_nonces: MessageDetailsMap::new(), confirmed_nonce: Some(21), }, ); @@ -927,4 +1095,36 @@ mod tests { Some(header_id(2)) ); } + + #[async_std::test] + async fn no_losses_relayer_is_delivering_messages_if_cost_is_equal_to_reward() { + let (state, mut strategy) = prepare_strategy(); + strategy.relayer_mode = RelayerMode::NoLosses; + + // so now we have: + // - 20..=23 with reward = cost + // => strategy shall select all 20..=23 + assert_eq!( + strategy.select_nonces_to_deliver(state).await, + Some(((20..=23), proof_parameters(false, 4))) + ); + } + + #[async_std::test] + async fn no_losses_relayer_is_not_delivering_messages_if_cost_is_larger_than_reward() { + let (mut state, mut strategy) = prepare_strategy(); + let nonces = source_nonces(24..=25, 19, DEFAULT_REWARD - DELIVERY_TRANSACTION_COST); + strategy.strategy.source_nonces_updated(header_id(2), nonces); + state.best_finalized_source_header_id_at_best_target = Some(header_id(2)); + strategy.relayer_mode = RelayerMode::NoLosses; + + // so now we have: + // - 20..=23 with reward = cost + // - 24..=25 with reward less than cost + // => strategy shall only select 20..=23 + assert_eq!( + strategy.select_nonces_to_deliver(state).await, + Some(((20..=23), proof_parameters(false, 4))) + ); + } } diff --git a/bridges/relays/messages/src/message_race_loop.rs b/bridges/relays/messages/src/message_race_loop.rs index 646fad886d7648dd7466ff988eccab783b7d252c..3b427a2d0e27f28102b27197e35310f26731e284 100644 --- a/bridges/relays/messages/src/message_race_loop.rs +++ b/bridges/relays/messages/src/message_race_loop.rs @@ -143,6 +143,7 @@ pub trait TargetClient<P: MessageRace> { } /// Race strategy. +#[async_trait] pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug { /// Type of nonces range expected from the source client. type SourceNoncesRange: NoncesRange; @@ -182,14 +183,14 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug { /// Should return `Some(nonces)` if we need to deliver proof of `nonces` (and associated /// data) from source to target node. /// Additionally, parameters required to generate proof are returned. - fn select_nonces_to_deliver( + async fn select_nonces_to_deliver( &mut self, - race_state: &RaceState<SourceHeaderId, TargetHeaderId, Proof>, + race_state: RaceState<SourceHeaderId, TargetHeaderId, Proof>, ) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)>; } /// State of the race. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RaceState<SourceHeaderId, TargetHeaderId, Proof> { /// Best finalized source header id at the source client. pub best_finalized_source_header_id_at_source: Option<SourceHeaderId>, @@ -438,7 +439,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( if source_client_is_online { source_client_is_online = false; - let nonces_to_deliver = select_nonces_to_deliver(&race_state, &mut strategy); + let nonces_to_deliver = select_nonces_to_deliver(race_state.clone(), &mut strategy).await; let best_at_source = strategy.best_at_source(); if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver { @@ -554,27 +555,25 @@ where now_time } -fn select_nonces_to_deliver<SourceHeaderId, TargetHeaderId, Proof, Strategy>( - race_state: &RaceState<SourceHeaderId, TargetHeaderId, Proof>, +async fn select_nonces_to_deliver<SourceHeaderId, TargetHeaderId, Proof, Strategy>( + race_state: RaceState<SourceHeaderId, TargetHeaderId, Proof>, strategy: &mut Strategy, ) -> Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Strategy::ProofParameters)> where SourceHeaderId: Clone, Strategy: RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>, { - race_state - .best_finalized_source_header_id_at_best_target - .as_ref() - .and_then(|best_finalized_source_header_id_at_best_target| { - strategy - .select_nonces_to_deliver(race_state) - .map(|(nonces_range, proof_parameters)| { - ( - best_finalized_source_header_id_at_best_target.clone(), - nonces_range, - proof_parameters, - ) - }) + let best_finalized_source_header_id_at_best_target = + race_state.best_finalized_source_header_id_at_best_target.clone()?; + strategy + .select_nonces_to_deliver(race_state) + .await + .map(|(nonces_range, proof_parameters)| { + ( + best_finalized_source_header_id_at_best_target, + nonces_range, + proof_parameters, + ) }) } @@ -584,8 +583,8 @@ mod tests { use crate::message_race_strategy::BasicStrategy; use relay_utils::HeaderId; - #[test] - fn proof_is_generated_at_best_block_known_to_target_node() { + #[async_std::test] + async fn proof_is_generated_at_best_block_known_to_target_node() { const GENERATED_AT: u64 = 6; const BEST_AT_SOURCE: u64 = 10; const BEST_AT_TARGET: u64 = 8; @@ -620,7 +619,7 @@ mod tests { // the proof will be generated on source, but using BEST_AT_TARGET block assert_eq!( - select_nonces_to_deliver(&race_state, &mut strategy), + select_nonces_to_deliver(race_state, &mut strategy).await, Some((HeaderId(BEST_AT_TARGET, BEST_AT_TARGET), 6..=10, (),)) ); } diff --git a/bridges/relays/messages/src/message_race_strategy.rs b/bridges/relays/messages/src/message_race_strategy.rs index 394574ecb44fc3ca8294d936711c526fa5ae066e..c17845d97f722876a6add7736281c73e8b4eee8c 100644 --- a/bridges/relays/messages/src/message_race_strategy.rs +++ b/bridges/relays/messages/src/message_race_strategy.rs @@ -19,9 +19,10 @@ use crate::message_race_loop::{NoncesRange, RaceState, RaceStrategy, SourceClientNonces, TargetClientNonces}; +use async_trait::async_trait; use bp_messages::MessageNonce; use relay_utils::HeaderId; -use std::{collections::VecDeque, fmt::Debug, marker::PhantomData, ops::RangeInclusive}; +use std::{collections::VecDeque, fmt::Debug, future::Future, marker::PhantomData, ops::RangeInclusive}; /// Nonces delivery strategy. #[derive(Debug)] @@ -57,6 +58,13 @@ where } } + /// Reference to source queue. + pub(crate) fn source_queue( + &self, + ) -> &VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, SourceNoncesRange)> { + &self.source_queue + } + /// Mutable reference to source queue to use in tests. #[cfg(test)] pub(crate) fn source_queue_mut( @@ -73,14 +81,14 @@ where /// right now, it should return `Some` with 'undeliverable' nonces. Please keep in mind that /// this should be the sub-range that the passed range ends with, because nonces are always /// delivered in-order. Otherwise the function will panic. - pub fn select_nonces_to_deliver_with_selector( + pub async fn select_nonces_to_deliver_with_selector<F: Future<Output = Option<SourceNoncesRange>>>( &mut self, - race_state: &RaceState< + race_state: RaceState< HeaderId<SourceHeaderHash, SourceHeaderNumber>, HeaderId<TargetHeaderHash, TargetHeaderNumber>, Proof, >, - mut selector: impl FnMut(SourceNoncesRange) -> Option<SourceNoncesRange>, + selector: impl Fn(SourceNoncesRange) -> F, ) -> Option<RangeInclusive<MessageNonce>> { // if we do not know best nonce at target node, we can't select anything let target_nonce = self.best_target_nonce?; @@ -99,7 +107,7 @@ where // 2) we can't deliver new nonce until header, that has emitted this nonce, is finalized // by target client // 3) selector is used for more complicated logic - let best_header_at_target = &race_state.best_finalized_source_header_id_at_best_target.as_ref()?; + let best_header_at_target = race_state.best_finalized_source_header_id_at_best_target.clone()?; let mut nonces_end = None; while let Some((queued_at, queued_range)) = self.source_queue.pop_front() { // select (sub) range to deliver @@ -111,7 +119,7 @@ where Some(queued_range) } else { // selector returns `Some(range)` if this `range` needs to be requeued - selector(queued_range) + selector(queued_range).await }; // requeue (sub) range and update range to deliver @@ -143,16 +151,17 @@ where } } +#[async_trait] impl<SourceHeaderNumber, SourceHeaderHash, TargetHeaderNumber, TargetHeaderHash, SourceNoncesRange, Proof> RaceStrategy<HeaderId<SourceHeaderHash, SourceHeaderNumber>, HeaderId<TargetHeaderHash, TargetHeaderNumber>, Proof> for BasicStrategy<SourceHeaderNumber, SourceHeaderHash, TargetHeaderNumber, TargetHeaderHash, SourceNoncesRange, Proof> where - SourceHeaderHash: Clone + Debug, - SourceHeaderNumber: Clone + Ord + Debug, - SourceNoncesRange: NoncesRange + Debug, - TargetHeaderHash: Debug, - TargetHeaderNumber: Debug, - Proof: Debug, + SourceHeaderHash: Clone + Debug + Send, + SourceHeaderNumber: Clone + Ord + Debug + Send, + SourceNoncesRange: NoncesRange + Debug + Send, + TargetHeaderHash: Debug + Send, + TargetHeaderNumber: Debug + Send, + Proof: Debug + Send, { type SourceNoncesRange = SourceNoncesRange; type ProofParameters = (); @@ -271,15 +280,16 @@ where )); } - fn select_nonces_to_deliver( + async fn select_nonces_to_deliver( &mut self, - race_state: &RaceState< + race_state: RaceState< HeaderId<SourceHeaderHash, SourceHeaderNumber>, HeaderId<TargetHeaderHash, TargetHeaderNumber>, Proof, >, ) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> { - self.select_nonces_to_deliver_with_selector(race_state, |_| None) + self.select_nonces_to_deliver_with_selector(race_state, |_| async { None }) + .await .map(|range| (range, ())) } } @@ -396,28 +406,28 @@ mod tests { assert!(state.nonces_submitted.is_none()); } - #[test] - fn nothing_is_selected_if_something_is_already_selected() { + #[async_std::test] + async fn nothing_is_selected_if_something_is_already_selected() { let mut state = RaceState::default(); let mut strategy = BasicStrategy::<TestMessageLane>::new(); state.nonces_to_submit = Some((header_id(1), 1..=10, (1..=10, None))); strategy.best_target_nonces_updated(target_nonces(0), &mut state); strategy.source_nonces_updated(header_id(1), source_nonces(1..=10)); - assert_eq!(strategy.select_nonces_to_deliver(&state), None); + assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None); } - #[test] - fn nothing_is_selected_if_something_is_already_submitted() { + #[async_std::test] + async fn nothing_is_selected_if_something_is_already_submitted() { let mut state = RaceState::default(); let mut strategy = BasicStrategy::<TestMessageLane>::new(); state.nonces_submitted = Some(1..=10); strategy.best_target_nonces_updated(target_nonces(0), &mut state); strategy.source_nonces_updated(header_id(1), source_nonces(1..=10)); - assert_eq!(strategy.select_nonces_to_deliver(&state), None); + assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None); } - #[test] - fn select_nonces_to_deliver_works() { + #[async_std::test] + async fn select_nonces_to_deliver_works() { let mut state = RaceState::<_, _, TestMessagesProof>::default(); let mut strategy = BasicStrategy::<TestMessageLane>::new(); strategy.best_target_nonces_updated(target_nonces(0), &mut state); @@ -427,14 +437,20 @@ mod tests { strategy.source_nonces_updated(header_id(5), source_nonces(7..=8)); state.best_finalized_source_header_id_at_best_target = Some(header_id(4)); - assert_eq!(strategy.select_nonces_to_deliver(&state), Some((1..=6, ()))); + assert_eq!( + strategy.select_nonces_to_deliver(state.clone()).await, + Some((1..=6, ())) + ); strategy.best_target_nonces_updated(target_nonces(6), &mut state); - assert_eq!(strategy.select_nonces_to_deliver(&state), None); + assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None); state.best_finalized_source_header_id_at_best_target = Some(header_id(5)); - assert_eq!(strategy.select_nonces_to_deliver(&state), Some((7..=8, ()))); + assert_eq!( + strategy.select_nonces_to_deliver(state.clone()).await, + Some((7..=8, ())) + ); strategy.best_target_nonces_updated(target_nonces(8), &mut state); - assert_eq!(strategy.select_nonces_to_deliver(&state), None); + assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None); } #[test] @@ -449,7 +465,9 @@ mod tests { state.best_target_header_id = Some(header_id(1)); assert_eq!( - strategy.select_nonces_to_deliver_with_selector(&state, |_| Some(50..=100)), + async_std::task::block_on( + strategy.select_nonces_to_deliver_with_selector(state, |_| async { Some(50..=100) }) + ), Some(1..=49), ); } @@ -464,7 +482,11 @@ mod tests { state.best_finalized_source_header_id_at_source = Some(header_id(1)); state.best_finalized_source_header_id_at_best_target = Some(header_id(1)); state.best_target_header_id = Some(header_id(1)); - strategy.select_nonces_to_deliver_with_selector(&state, invalid_selector); + async_std::task::block_on(async move { + strategy + .select_nonces_to_deliver_with_selector(state, |range| async { invalid_selector(range) }) + .await; + }); } #[test]