diff --git a/bridges/relays/client-substrate/src/client.rs b/bridges/relays/client-substrate/src/client.rs index afbda8599b2aa03c4ed4a7c33a93fdd429646516..2e7cb7455f76cceee1c63aae4efb4a5cfe9f2a69 100644 --- a/bridges/relays/client-substrate/src/client.rs +++ b/bridges/relays/client-substrate/src/client.rs @@ -77,7 +77,12 @@ pub fn is_ancient_block<N: From<u32> + PartialOrd + Saturating>(block: N, best: } /// Opaque justifications subscription type. -pub struct Subscription<T>(pub(crate) Mutex<futures::channel::mpsc::Receiver<Option<T>>>); +pub struct Subscription<T>( + pub(crate) Mutex<futures::channel::mpsc::Receiver<Option<T>>>, + // The following field is not explicitly used by the code. But when it is dropped, + // the bakground task receives a shutdown signal. + #[allow(dead_code)] pub(crate) futures::channel::oneshot::Sender<()>, +); /// Opaque GRANDPA authorities set. pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>; @@ -621,6 +626,7 @@ impl<C: Chain> Client<C> { e })??; + let (cancel_sender, cancel_receiver) = futures::channel::oneshot::channel(); let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY); let (tracker, subscription) = self .jsonrpsee_execute(move |client| async move { @@ -639,7 +645,7 @@ impl<C: Chain> Client<C> { self_clone, stall_timeout, tx_hash, - Subscription(Mutex::new(receiver)), + Subscription(Mutex::new(receiver), cancel_sender), ); Ok((tracker, subscription)) }) @@ -649,6 +655,7 @@ impl<C: Chain> Client<C> { "extrinsic".into(), subscription, sender, + cancel_receiver, )); Ok(tracker) } @@ -790,14 +797,16 @@ impl<C: Chain> Client<C> { Ok(FC::subscribe_justifications(&client).await?) }) .await?; + let (cancel_sender, cancel_receiver) = futures::channel::oneshot::channel(); let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY); self.data.read().await.tokio.spawn(Subscription::background_worker( C::NAME.into(), "justification".into(), subscription, sender, + cancel_receiver, )); - Ok(Subscription(Mutex::new(receiver))) + Ok(Subscription(Mutex::new(receiver), cancel_sender)) } /// Generates a proof of key ownership for the given authority in the given set. @@ -843,9 +852,17 @@ impl<C: Chain> Client<C> { impl<T: DeserializeOwned> Subscription<T> { /// Consumes subscription and returns future statuses stream. pub fn into_stream(self) -> impl futures::Stream<Item = T> { - futures::stream::unfold(self, |this| async { + futures::stream::unfold(Some(self), |mut this| async move { + let Some(this) = this.take() else { return None }; let item = this.0.lock().await.next().await.unwrap_or(None); - item.map(|i| (i, this)) + match item { + Some(item) => Some((item, Some(this))), + None => { + // let's make it explicit here + let _ = this.1.send(()); + None + }, + } }) } @@ -860,19 +877,35 @@ impl<T: DeserializeOwned> Subscription<T> { async fn background_worker( chain_name: String, item_type: String, - mut subscription: jsonrpsee::core::client::Subscription<T>, + subscription: jsonrpsee::core::client::Subscription<T>, mut sender: futures::channel::mpsc::Sender<Option<T>>, + cancel_receiver: futures::channel::oneshot::Receiver<()>, ) { + log::trace!( + target: "bridge", + "Starting background worker for {} {} subscription stream.", + chain_name, + item_type, + ); + + futures::pin_mut!(subscription, cancel_receiver); loop { - match subscription.next().await { - Some(Ok(item)) => + match futures::future::select(subscription.next(), &mut cancel_receiver).await { + futures::future::Either::Left((Some(Ok(item)), _)) => if sender.send(Some(item)).await.is_err() { + log::trace!( + target: "bridge", + "{} {} subscription stream: no listener. Stopping background worker.", + chain_name, + item_type, + ); + break }, - Some(Err(e)) => { + futures::future::Either::Left((Some(Err(e)), _)) => { log::trace!( target: "bridge", - "{} {} subscription stream has returned '{:?}'. Stream needs to be restarted.", + "{} {} subscription stream has returned '{:?}'. Stream needs to be restarted. Stopping background worker.", chain_name, item_type, e, @@ -880,16 +913,25 @@ impl<T: DeserializeOwned> Subscription<T> { let _ = sender.send(None).await; break }, - None => { + futures::future::Either::Left((None, _)) => { log::trace!( target: "bridge", - "{} {} subscription stream has returned None. Stream needs to be restarted.", + "{} {} subscription stream has returned None. Stream needs to be restarted. Stopping background worker.", chain_name, item_type, ); let _ = sender.send(None).await; break }, + futures::future::Either::Right((_, _)) => { + log::trace!( + target: "bridge", + "{} {} subscription stream: listener has been dropped. Stopping background worker.", + chain_name, + item_type, + ); + break; + }, } } } diff --git a/bridges/relays/client-substrate/src/transaction_tracker.rs b/bridges/relays/client-substrate/src/transaction_tracker.rs index 00375768c45c27c23dfccb7730668108a6fab788..b181a945c2c15393daf821901b298e81214f85e3 100644 --- a/bridges/relays/client-substrate/src/transaction_tracker.rs +++ b/bridges/relays/client-substrate/src/transaction_tracker.rs @@ -306,12 +306,13 @@ mod tests { TrackedTransactionStatus<HeaderIdOf<TestChain>>, InvalidationStatus<HeaderIdOf<TestChain>>, )> { + let (cancel_sender, _cancel_receiver) = futures::channel::oneshot::channel(); let (mut sender, receiver) = futures::channel::mpsc::channel(1); let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new( TestEnvironment(Ok(HeaderId(0, Default::default()))), Duration::from_secs(0), Default::default(), - Subscription(async_std::sync::Mutex::new(receiver)), + Subscription(async_std::sync::Mutex::new(receiver), cancel_sender), ); let wait_for_stall_timeout = futures::future::pending(); @@ -428,12 +429,13 @@ mod tests { #[async_std::test] async fn lost_on_timeout_when_waiting_for_invalidation_status() { + let (cancel_sender, _cancel_receiver) = futures::channel::oneshot::channel(); let (_sender, receiver) = futures::channel::mpsc::channel(1); let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new( TestEnvironment(Ok(HeaderId(0, Default::default()))), Duration::from_secs(0), Default::default(), - Subscription(async_std::sync::Mutex::new(receiver)), + Subscription(async_std::sync::Mutex::new(receiver), cancel_sender), ); let wait_for_stall_timeout = futures::future::ready(()).shared();