diff --git a/bridges/relays/clients/substrate/src/finality_source.rs b/bridges/relays/clients/substrate/src/finality_source.rs index 18293efa128f9b62eda59b84d9b3f05d277f989c..9b6c0975a45cf8ab89de6c7edb2970c8b46f612f 100644 --- a/bridges/relays/clients/substrate/src/finality_source.rs +++ b/bridges/relays/clients/substrate/src/finality_source.rs @@ -98,7 +98,7 @@ where >, P::Header: SourceHeader<C::BlockNumber>, { - type FinalityProofsStream = Pin<Box<dyn Stream<Item = Justification<C::BlockNumber>>>>; + type FinalityProofsStream = Pin<Box<dyn Stream<Item = Justification<C::BlockNumber>> + Send>>; async fn best_finalized_block_number(&self) -> Result<P::Number, Error> { // we **CAN** continue to relay finality proofs if source node is out of sync, because diff --git a/bridges/relays/ethereum/src/ethereum_deploy_contract.rs b/bridges/relays/ethereum/src/ethereum_deploy_contract.rs index e703711843a9c5b368f587d9fd0faf76bcd3e11d..b70decd6f49f9d15204c5137eac9ed660a04c2b6 100644 --- a/bridges/relays/ethereum/src/ethereum_deploy_contract.rs +++ b/bridges/relays/ethereum/src/ethereum_deploy_contract.rs @@ -48,9 +48,7 @@ pub struct EthereumDeployContractParams { } /// Deploy Bridge contract on Ethereum chain. -pub fn run(params: EthereumDeployContractParams) { - let mut local_pool = futures::executor::LocalPool::new(); - +pub async fn run(params: EthereumDeployContractParams) { let EthereumDeployContractParams { eth_params, eth_sign, @@ -61,7 +59,7 @@ pub fn run(params: EthereumDeployContractParams) { eth_contract_code, } = params; - let result = local_pool.run_until(async move { + let result = async move { let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?; let sub_client = SubstrateClient::<Rialto>::new(sub_params).await.map_err(RpcError::Substrate)?; @@ -91,7 +89,7 @@ pub fn run(params: EthereumDeployContractParams) { initial_set_id, initial_set, ).await - }); + }.await; if let Err(error) = result { log::error!(target: "bridge", "{}", error); diff --git a/bridges/relays/ethereum/src/ethereum_exchange.rs b/bridges/relays/ethereum/src/ethereum_exchange.rs index ecd22ab81a1f9407a1474ac8659634a9fc3a35b7..5fed62b9ca686218f9f2e66bbee074059af961eb 100644 --- a/bridges/relays/ethereum/src/ethereum_exchange.rs +++ b/bridges/relays/ethereum/src/ethereum_exchange.rs @@ -282,19 +282,39 @@ impl TargetClient<EthereumToSubstrateExchange> for SubstrateTransactionsTarget { } /// Relay exchange transaction proof(s) to Substrate node. -pub fn run(params: EthereumExchangeParams) { +pub async fn run(params: EthereumExchangeParams) { match params.mode { - ExchangeRelayMode::Single(eth_tx_hash) => run_single_transaction_relay(params, eth_tx_hash), + ExchangeRelayMode::Single(eth_tx_hash) => { + let result = run_single_transaction_relay(params, eth_tx_hash).await; + match result { + Ok(_) => log::info!( + target: "bridge", + "Ethereum transaction {} proof has been successfully submitted to Substrate node", + eth_tx_hash, + ), + Err(err) => log::error!( + target: "bridge", + "Error submitting Ethereum transaction {} proof to Substrate node: {}", + eth_tx_hash, + err, + ), + } + } ExchangeRelayMode::Auto(eth_start_with_block_number) => { - run_auto_transactions_relay_loop(params, eth_start_with_block_number) + let result = run_auto_transactions_relay_loop(params, eth_start_with_block_number).await; + if let Err(err) = result { + log::error!( + target: "bridge", + "Error auto-relaying Ethereum transactions proofs to Substrate node: {}", + err, + ); + } } - }; + } } /// Run single transaction proof relay and stop. -fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H256) { - let mut local_pool = futures::executor::LocalPool::new(); - +async fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H256) -> Result<(), String> { let EthereumExchangeParams { eth_params, sub_params, @@ -303,43 +323,25 @@ fn run_single_transaction_relay(params: EthereumExchangeParams, eth_tx_hash: H25 .. } = params; - let result = local_pool.run_until(async move { - let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?; - let sub_client = SubstrateClient::<Rialto>::new(sub_params) - .await - .map_err(RpcError::Substrate)?; - - let source = EthereumTransactionsSource { client: eth_client }; - let target = SubstrateTransactionsTarget { - client: sub_client, - sign_params: sub_sign, - bridge_instance: instance, - }; + let eth_client = EthereumClient::new(eth_params).await.map_err(RpcError::Ethereum)?; + let sub_client = SubstrateClient::<Rialto>::new(sub_params) + .await + .map_err(RpcError::Substrate)?; - relay_single_transaction_proof(&source, &target, eth_tx_hash).await - }); + let source = EthereumTransactionsSource { client: eth_client }; + let target = SubstrateTransactionsTarget { + client: sub_client, + sign_params: sub_sign, + bridge_instance: instance, + }; - match result { - Ok(_) => { - log::info!( - target: "bridge", - "Ethereum transaction {} proof has been successfully submitted to Substrate node", - eth_tx_hash, - ); - } - Err(err) => { - log::error!( - target: "bridge", - "Error submitting Ethereum transaction {} proof to Substrate node: {}", - eth_tx_hash, - err, - ); - } - } + relay_single_transaction_proof(&source, &target, eth_tx_hash).await } -/// Run auto-relay loop. -fn run_auto_transactions_relay_loop(params: EthereumExchangeParams, eth_start_with_block_number: Option<u64>) { +async fn run_auto_transactions_relay_loop( + params: EthereumExchangeParams, + eth_start_with_block_number: Option<u64>, +) -> Result<(), String> { let EthereumExchangeParams { eth_params, sub_params, @@ -349,46 +351,41 @@ fn run_auto_transactions_relay_loop(params: EthereumExchangeParams, eth_start_wi .. } = params; - let do_run_loop = move || -> Result<(), String> { - let eth_client = async_std::task::block_on(EthereumClient::new(eth_params)) - .map_err(|err| format!("Error starting Ethereum client: {:?}", err))?; - let sub_client = async_std::task::block_on(SubstrateClient::<Rialto>::new(sub_params)) - .map_err(|err| format!("Error starting Substrate client: {:?}", err))?; - - let eth_start_with_block_number = match eth_start_with_block_number { - Some(eth_start_with_block_number) => eth_start_with_block_number, - None => { - async_std::task::block_on(sub_client.best_ethereum_finalized_block()) - .map_err(|err| { - format!( - "Error retrieving best finalized Ethereum block from Substrate node: {:?}", - err - ) - })? - .0 - } - }; - - run_loop( - InMemoryStorage::new(eth_start_with_block_number), - EthereumTransactionsSource { client: eth_client }, - SubstrateTransactionsTarget { - client: sub_client, - sign_params: sub_sign, - bridge_instance: instance, - }, - metrics_params, - futures::future::pending(), - ); - - Ok(()) + let eth_client = EthereumClient::new(eth_params) + .await + .map_err(|err| format!("Error starting Ethereum client: {:?}", err))?; + let sub_client = SubstrateClient::<Rialto>::new(sub_params) + .await + .map_err(|err| format!("Error starting Substrate client: {:?}", err))?; + + let eth_start_with_block_number = match eth_start_with_block_number { + Some(eth_start_with_block_number) => eth_start_with_block_number, + None => { + sub_client + .best_ethereum_finalized_block() + .await + .map_err(|err| { + format!( + "Error retrieving best finalized Ethereum block from Substrate node: {:?}", + err + ) + })? + .0 + } }; - if let Err(err) = do_run_loop() { - log::error!( - target: "bridge", - "Error auto-relaying Ethereum transactions proofs to Substrate node: {}", - err, - ); - } + run_loop( + InMemoryStorage::new(eth_start_with_block_number), + EthereumTransactionsSource { client: eth_client }, + SubstrateTransactionsTarget { + client: sub_client, + sign_params: sub_sign, + bridge_instance: instance, + }, + metrics_params, + futures::future::pending(), + ) + .await; + + Ok(()) } diff --git a/bridges/relays/ethereum/src/ethereum_exchange_submit.rs b/bridges/relays/ethereum/src/ethereum_exchange_submit.rs index 8f9f942dac53ec2b4f227379a4a3534afa8ea673..8616cce2166cb514954008f3aa0a61499e854f88 100644 --- a/bridges/relays/ethereum/src/ethereum_exchange_submit.rs +++ b/bridges/relays/ethereum/src/ethereum_exchange_submit.rs @@ -42,9 +42,7 @@ pub struct EthereumExchangeSubmitParams { } /// Submit single Ethereum -> Substrate exchange transaction. -pub fn run(params: EthereumExchangeSubmitParams) { - let mut local_pool = futures::executor::LocalPool::new(); - +pub async fn run(params: EthereumExchangeSubmitParams) { let EthereumExchangeSubmitParams { eth_params, eth_sign, @@ -53,7 +51,7 @@ pub fn run(params: EthereumExchangeSubmitParams) { sub_recipient, } = params; - let result: Result<_, String> = local_pool.run_until(async move { + let result: Result<_, String> = async move { let eth_client = EthereumClient::new(eth_params) .await .map_err(|err| format!("error connecting to Ethereum node: {:?}", err))?; @@ -94,7 +92,8 @@ pub fn run(params: EthereumExchangeSubmitParams) { .map_err(|err| format!("error submitting transaction: {:?}", err))?; Ok(eth_tx_unsigned) - }); + } + .await; match result { Ok(eth_tx_unsigned) => { diff --git a/bridges/relays/ethereum/src/ethereum_sync_loop.rs b/bridges/relays/ethereum/src/ethereum_sync_loop.rs index 2c1abb358c63f3344307704254c79a40f39eb58e..b4fd788f9f67482e23640cd75c28839b66a34201 100644 --- a/bridges/relays/ethereum/src/ethereum_sync_loop.rs +++ b/bridges/relays/ethereum/src/ethereum_sync_loop.rs @@ -248,7 +248,7 @@ impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget { } /// Run Ethereum headers synchronization. -pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> { +pub async fn run(params: EthereumSyncParams) -> Result<(), RpcError> { let EthereumSyncParams { eth_params, sub_params, @@ -278,7 +278,8 @@ pub fn run(params: EthereumSyncParams) -> Result<(), RpcError> { sync_params, metrics_params, futures::future::pending(), - ); + ) + .await; Ok(()) } diff --git a/bridges/relays/ethereum/src/main.rs b/bridges/relays/ethereum/src/main.rs index c0c36cc1306669c042404cf520a35b405cf8a80e..b2080c396f2d7f59fcd7f191c8137e5c0da06116 100644 --- a/bridges/relays/ethereum/src/main.rs +++ b/bridges/relays/ethereum/src/main.rs @@ -50,6 +50,10 @@ fn main() { let yaml = clap::load_yaml!("cli.yml"); let matches = clap::App::from_yaml(yaml).get_matches(); + async_std::task::block_on(run_command(&matches)); +} + +async fn run_command(matches: &clap::ArgMatches<'_>) { match matches.subcommand() { ("eth-to-sub", Some(eth_to_sub_matches)) => { log::info!(target: "bridge", "Starting ETH âž¡ SUB relay."); @@ -60,6 +64,7 @@ fn main() { return; } }) + .await .is_err() { log::error!(target: "bridge", "Unable to get Substrate genesis block for Ethereum sync."); @@ -74,6 +79,7 @@ fn main() { return; } }) + .await .is_err() { log::error!(target: "bridge", "Unable to get Substrate genesis block for Substrate sync."); @@ -87,7 +93,8 @@ fn main() { log::error!(target: "bridge", "Error during contract deployment: {}", err); return; } - }); + }) + .await; } ("eth-submit-exchange-tx", Some(eth_exchange_submit_matches)) => { log::info!(target: "bridge", "Submitting ETH âž¡ SUB exchange transaction."); @@ -97,7 +104,8 @@ fn main() { log::error!(target: "bridge", "Error submitting Eethereum exchange transaction: {}", err); return; } - }); + }) + .await; } ("eth-exchange-sub", Some(eth_exchange_matches)) => { log::info!(target: "bridge", "Starting ETH âž¡ SUB exchange transactions relay."); @@ -107,7 +115,8 @@ fn main() { log::error!(target: "bridge", "Error relaying Ethereum transactions proofs: {}", err); return; } - }); + }) + .await; } ("", _) => { log::error!(target: "bridge", "No subcommand specified"); @@ -398,16 +407,3 @@ fn parse_hex_argument(matches: &clap::ArgMatches, arg: &str) -> Result<Option<Ve None => Ok(None), } } - -#[cfg(test)] -mod tests { - - // Details: https://github.com/paritytech/parity-bridges-common/issues/118 - #[test] - fn async_std_sleep_works() { - let mut local_pool = futures::executor::LocalPool::new(); - local_pool.run_until(async move { - async_std::task::sleep(std::time::Duration::from_secs(1)).await; - }); - } -} diff --git a/bridges/relays/ethereum/src/substrate_sync_loop.rs b/bridges/relays/ethereum/src/substrate_sync_loop.rs index 5f30e247c5a79a6c2214f400c32d2cabcee3144f..a0ff44d4d9bea8e20bac311a9fba9abbb8724499 100644 --- a/bridges/relays/ethereum/src/substrate_sync_loop.rs +++ b/bridges/relays/ethereum/src/substrate_sync_loop.rs @@ -163,7 +163,7 @@ impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget { } /// Run Substrate headers synchronization. -pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> { +pub async fn run(params: SubstrateSyncParams) -> Result<(), RpcError> { let SubstrateSyncParams { sub_params, eth_params, @@ -188,7 +188,8 @@ pub fn run(params: SubstrateSyncParams) -> Result<(), RpcError> { sync_params, metrics_params, futures::future::pending(), - ); + ) + .await; Ok(()) } diff --git a/bridges/relays/generic/exchange/src/exchange_loop.rs b/bridges/relays/generic/exchange/src/exchange_loop.rs index 06f4d3f40ab015c58a3e009bfe8f3cfc875c8f8e..f09ad7de41bb5ae6274134ea4abaad0c696d8fa5 100644 --- a/bridges/relays/generic/exchange/src/exchange_loop.rs +++ b/bridges/relays/generic/exchange/src/exchange_loop.rs @@ -79,7 +79,7 @@ impl<BlockNumber: Clone + Copy> TransactionProofsRelayStorage for InMemoryStorag } /// Run proofs synchronization. -pub fn run<P: TransactionProofPipeline>( +pub async fn run<P: TransactionProofPipeline>( storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>, source_client: impl SourceClient<P>, target_client: impl TargetClient<P>, @@ -119,7 +119,8 @@ pub fn run<P: TransactionProofPipeline>( exit_signal.clone(), ) }, - ); + ) + .await; } /// Run proofs synchronization. diff --git a/bridges/relays/generic/finality/src/finality_loop.rs b/bridges/relays/generic/finality/src/finality_loop.rs index af5da42cee70c01bd6812b70d760bb9527da9ac3..7aafce075e60884a7aa04f9a751526d421a47949 100644 --- a/bridges/relays/generic/finality/src/finality_loop.rs +++ b/bridges/relays/generic/finality/src/finality_loop.rs @@ -91,7 +91,7 @@ pub trait TargetClient<P: FinalitySyncPipeline>: RelayClient { } /// Run finality proofs synchronization loop. -pub fn run<P: FinalitySyncPipeline>( +pub async fn run<P: FinalitySyncPipeline>( source_client: impl SourceClient<P>, target_client: impl TargetClient<P>, sync_params: FinalitySyncParams, @@ -132,7 +132,8 @@ pub fn run<P: FinalitySyncPipeline>( exit_signal.clone(), ) }, - ); + ) + .await; } /// Unjustified headers container. Ordered by header number. diff --git a/bridges/relays/generic/headers/src/sync_loop.rs b/bridges/relays/generic/headers/src/sync_loop.rs index d2584f2ccb2c816c1f83e78b2a7757e0f9fcd0be..7da8fd4f42fec996cd15c638c5c2853af3825f3d 100644 --- a/bridges/relays/generic/headers/src/sync_loop.rs +++ b/bridges/relays/generic/headers/src/sync_loop.rs @@ -112,7 +112,7 @@ impl<P: HeadersSyncPipeline> SyncMaintain<P> for () {} /// Run headers synchronization. #[allow(clippy::too_many_arguments)] -pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>( +pub async fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>( source_client: impl SourceClient<P>, source_tick: Duration, target_client: TC, @@ -159,7 +159,8 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>( exit_signal.clone(), ) }, - ); + ) + .await; } /// Run headers synchronization. diff --git a/bridges/relays/generic/messages/src/message_lane_loop.rs b/bridges/relays/generic/messages/src/message_lane_loop.rs index 28b55dba47cc7dc84767680f598b1b71e9bfc002..afbaf7a015a45ce7d76488ab7aaec5f518f1cb96 100644 --- a/bridges/relays/generic/messages/src/message_lane_loop.rs +++ b/bridges/relays/generic/messages/src/message_lane_loop.rs @@ -206,7 +206,7 @@ pub struct ClientsState<P: MessageLane> { } /// Run message lane service loop. -pub fn run<P: MessageLane>( +pub async fn run<P: MessageLane>( params: Params, source_client: impl SourceClient<P>, target_client: impl TargetClient<P>, @@ -251,7 +251,8 @@ pub fn run<P: MessageLane>( exit_signal.clone(), ) }, - ); + ) + .await; } /// Run one-way message delivery loop until connection with target or source node is lost, or exit signal is received. diff --git a/bridges/relays/generic/utils/src/relay_loop.rs b/bridges/relays/generic/utils/src/relay_loop.rs index d750358edaa02c3bab0254285b0d4330a38e6f01..6a61ecd2893432fb26258d894ebca4e52bd28c47 100644 --- a/bridges/relays/generic/utils/src/relay_loop.rs +++ b/bridges/relays/generic/utils/src/relay_loop.rs @@ -37,7 +37,7 @@ pub trait Client: Clone + Send + Sync { /// This function represents an outer loop, which in turn calls provided `loop_run` function to do /// actual job. When `loop_run` returns, this outer loop reconnects to failed client (source, /// target or both) and calls `loop_run` again. -pub fn run<SC: Client, TC: Client, R, F>( +pub async fn run<SC: Client, TC: Client, R, F>( reconnect_delay: Duration, mut source_client: SC, mut target_client: TC, @@ -46,50 +46,46 @@ pub fn run<SC: Client, TC: Client, R, F>( R: Fn(SC, TC) -> F, F: Future<Output = Result<(), FailedClient>>, { - let mut local_pool = futures::executor::LocalPool::new(); + loop { + let result = loop_run(source_client.clone(), target_client.clone()).await; - local_pool.run_until(async move { - loop { - let result = loop_run(source_client.clone(), target_client.clone()).await; - - match result { - Ok(()) => break, - Err(failed_client) => loop { - async_std::task::sleep(reconnect_delay).await; - if failed_client == FailedClient::Both || failed_client == FailedClient::Source { - match source_client.reconnect().await { - Ok(()) => (), - Err(error) => { - log::warn!( - target: "bridge", - "Failed to reconnect to source client. Going to retry in {}s: {:?}", - reconnect_delay.as_secs(), - error, - ); - continue; - } + match result { + Ok(()) => break, + Err(failed_client) => loop { + async_std::task::sleep(reconnect_delay).await; + if failed_client == FailedClient::Both || failed_client == FailedClient::Source { + match source_client.reconnect().await { + Ok(()) => (), + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect to source client. Going to retry in {}s: {:?}", + reconnect_delay.as_secs(), + error, + ); + continue; } } - if failed_client == FailedClient::Both || failed_client == FailedClient::Target { - match target_client.reconnect().await { - Ok(()) => (), - Err(error) => { - log::warn!( - target: "bridge", - "Failed to reconnect to target client. Going to retry in {}s: {:?}", - reconnect_delay.as_secs(), - error, - ); - continue; - } + } + if failed_client == FailedClient::Both || failed_client == FailedClient::Target { + match target_client.reconnect().await { + Ok(()) => (), + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect to target client. Going to retry in {}s: {:?}", + reconnect_delay.as_secs(), + error, + ); + continue; } } + } - break; - }, - } - - log::debug!(target: "bridge", "Restarting relay loop"); + break; + }, } - }); + + log::debug!(target: "bridge", "Restarting relay loop"); + } } diff --git a/bridges/relays/substrate/src/finality_pipeline.rs b/bridges/relays/substrate/src/finality_pipeline.rs index 21865b6c4485b58edc217a02e7486e84ce342b30..574db6a3f533a75aeb36e61a66a33c5ded8e880f 100644 --- a/bridges/relays/substrate/src/finality_pipeline.rs +++ b/bridges/relays/substrate/src/finality_pipeline.rs @@ -126,5 +126,6 @@ pub async fn run<SourceChain, TargetChain, P>( }, metrics_params, futures::future::pending(), - ); + ) + .await; } diff --git a/bridges/relays/substrate/src/rialto_millau/millau_messages_to_rialto.rs b/bridges/relays/substrate/src/rialto_millau/millau_messages_to_rialto.rs index 84664e4572bba02b9b30b7b499c7a4fce0d7a18b..08a0fe9c08806c336515977691c92dee069fb089 100644 --- a/bridges/relays/substrate/src/rialto_millau/millau_messages_to_rialto.rs +++ b/bridges/relays/substrate/src/rialto_millau/millau_messages_to_rialto.rs @@ -125,7 +125,7 @@ type MillauSourceClient = SubstrateMessagesSource<Millau, MillauMessagesToRialto type RialtoTargetClient = SubstrateMessagesTarget<Rialto, MillauMessagesToRialto>; /// Run Millau-to-Rialto messages sync. -pub fn run( +pub async fn run( millau_client: MillauClient, millau_sign: MillauSigningParams, rialto_client: RialtoClient, @@ -185,5 +185,6 @@ pub fn run( RialtoTargetClient::new(rialto_client, lane, lane_id, MILLAU_BRIDGE_INSTANCE), metrics_params, futures::future::pending(), - ); + ) + .await; } diff --git a/bridges/relays/substrate/src/rialto_millau/mod.rs b/bridges/relays/substrate/src/rialto_millau/mod.rs index 72919ccb2422d94cdc47bcd16a52191afc2e20ac..45ef7e322af8b2cfaf4f9cd21a118e890d325440 100644 --- a/bridges/relays/substrate/src/rialto_millau/mod.rs +++ b/bridges/relays/substrate/src/rialto_millau/mod.rs @@ -198,7 +198,8 @@ async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> { rialto_sign, lane.into(), prometheus_params.into(), - ); + ) + .await; } cli::RelayMessages::RialtoToMillau { rialto, @@ -220,7 +221,8 @@ async fn run_relay_messages(command: cli::RelayMessages) -> Result<(), String> { millau_sign, lane.into(), prometheus_params.into(), - ); + ) + .await; } } Ok(()) diff --git a/bridges/relays/substrate/src/rialto_millau/rialto_messages_to_millau.rs b/bridges/relays/substrate/src/rialto_millau/rialto_messages_to_millau.rs index 3083ce3cbf5d479718a53935417938b8d36b3a89..b0ce256aa7669b11828b4098d6218fed0f139b54 100644 --- a/bridges/relays/substrate/src/rialto_millau/rialto_messages_to_millau.rs +++ b/bridges/relays/substrate/src/rialto_millau/rialto_messages_to_millau.rs @@ -125,7 +125,7 @@ type RialtoSourceClient = SubstrateMessagesSource<Rialto, RialtoMessagesToMillau type MillauTargetClient = SubstrateMessagesTarget<Millau, RialtoMessagesToMillau>; /// Run Rialto-to-Millau messages sync. -pub fn run( +pub async fn run( rialto_client: RialtoClient, rialto_sign: RialtoSigningParams, millau_client: MillauClient, @@ -184,5 +184,6 @@ pub fn run( MillauTargetClient::new(millau_client, lane, lane_id, RIALTO_BRIDGE_INSTANCE), metrics_params, futures::future::pending(), - ); + ) + .await; }