From d54feeb101b3779422323224c8e1ac43d3a1fafa Mon Sep 17 00:00:00 2001
From: Svyatoslav Nikolsky <svyatonik@gmail.com>
Date: Tue, 21 May 2024 13:41:49 +0300
Subject: [PATCH] Fixed RPC subscriptions leak when subscription stream is
 finished (#4533)

closes https://github.com/paritytech/parity-bridges-common/issues/3000

Recently we've changed our bridge configuration for Rococo <> Westend
and our new relayer has started to submit transactions every ~`30`
seconds. Eventually it ends up in a limbo state, where it can't
submit more transactions - all `author_submitAndWatchExtrinsic` calls
are failing with the following error: `ERROR bridge Failed to send
transaction to BridgeHubRococo node: Call(ErrorObject { code:
ServerError(-32006), message: "Too many subscriptions on the
connection", data: Some(RawValue("Exceeded max limit of 1024")) })`.

Some links for those who want to explore:
- server side (node) has a strict limit on the number of active
subscriptions. It fails to open a new subscription if this limit is hit:
https://github.com/paritytech/jsonrpsee/blob/a4533966b997e83632509ad97eea010fc7c3efc0/server/src/middleware/rpc/layer/rpc_service.rs#L122-L132.
The limit is set to `1024` by default;
- internally this limit is a semaphore with `limit` permits:
https://github.com/paritytech/jsonrpsee/blob/a4533966b997e83632509ad97eea010fc7c3efc0/core/src/server/subscription.rs#L461-L485;
- semaphore permit is acquired in the first link;
- the permit is "returned" when the `SubscriptionSink` is dropped:
https://github.com/paritytech/jsonrpsee/blob/a4533966b997e83632509ad97eea010fc7c3efc0/core/src/server/subscription.rs#L310-L325;
- the `SubscriptionSink` is dropped when [this `polkadot-sdk`
function](https://github.com/paritytech/polkadot-sdk/blob/278486f9bf7db06c174203f098eec2f91839757a/substrate/client/rpc/src/utils.rs#L58-L94)
returns. In other words - when the connection is closed, the stream is
finished or the internal subscription buffer limit is hit;
- the subscription has an internal buffer, so sending an item consists
of two steps: [reading an item from the underlying
stream](https://github.com/paritytech/polkadot-sdk/blob/278486f9bf7db06c174203f098eec2f91839757a/substrate/client/rpc/src/utils.rs#L125-L141)
and [sending it over the
connection](https://github.com/paritytech/polkadot-sdk/blob/278486f9bf7db06c174203f098eec2f91839757a/substrate/client/rpc/src/utils.rs#L111-L116);
- when the underlying stream is finished, the `inner_pipe_from_stream`
wants to ensure that all items are sent to the subscriber. So it: [waits
until the current send operation
completes](https://github.com/paritytech/polkadot-sdk/blob/278486f9bf7db06c174203f098eec2f91839757a/substrate/client/rpc/src/utils.rs#L146-L148)
and then [sends all remaining items from the internal
buffer](https://github.com/paritytech/polkadot-sdk/blob/278486f9bf7db06c174203f098eec2f91839757a/substrate/client/rpc/src/utils.rs#L150-L155).
Once it is done, the function returns, the `SubscriptionSink` is
dropped, the semaphore permit is released and we are ready to accept new
subscriptions;
- unfortunately, the code just calls `pending_fut.await.is_err()` to
ensure that [the current send operation
completes](https://github.com/paritytech/polkadot-sdk/blob/278486f9bf7db06c174203f098eec2f91839757a/substrate/client/rpc/src/utils.rs#L146-L148).
But if there is no current send operation (which is normal), then
`pending_fut` is set to a terminated future and the `await` never
completes. Hence, no return from the function, no drop of
`SubscriptionSink`, no release of the semaphore permit, and no new
subscriptions allowed once the number of subscriptions hits the limit.
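
The last point can be sketched in isolation. The snippet below is only an
illustration, not the code from `utils.rs`: `Fused`, `NoopWaker` and
`completes_on_first_poll` are hypothetical names, and `Fused` is a
hand-written stand-in that assumes a terminated fused future reports
`Poll::Pending` on every poll. It shows why awaiting such a future cannot
make progress and why checking `is_terminated()` first avoids the wait:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a fused future: once the inner future is gone,
/// every poll reports `Poll::Pending` and no wake-up is ever scheduled.
struct Fused<F>(Option<F>);

impl<F> Fused<F> {
    /// A future that is terminated from the start (no send in flight).
    fn terminated() -> Self {
        Fused(None)
    }

    /// The check that the guard relies on before awaiting.
    fn is_terminated(&self) -> bool {
        self.0.is_none()
    }
}

impl<F: Future + Unpin> Future for Fused<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let res = match this.0.as_mut() {
            // Terminated: pending forever, so an `.await` here would hang.
            None => return Poll::Pending,
            Some(inner) => Pin::new(inner).poll(cx),
        };
        if res.is_ready() {
            // Drop the inner future so that later polls see "terminated".
            this.0 = None;
        }
        res
    }
}

/// Waker that does nothing; enough for polling by hand in this sketch.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the future once and reports whether it completed.
fn completes_on_first_poll<Fut: Future + Unpin>(fut: &mut Fut) -> bool {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx).is_ready()
}
```

With these definitions, a `Fused::terminated()` value never completes when
polled, while `Fused(Some(std::future::ready(Ok::<(), ()>(()))))` completes
on the first poll and is terminated afterwards. Guarding the wait with
`!pending_fut.is_terminated()` therefore skips exactly the case that would
otherwise block forever.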

I've illustrated the issue with a small test: without the fix, if e.g.
the stream is initially empty, the
`subscription_is_dropped_when_stream_is_empty` test hangs because
`pipe_from_stream` never exits.
---
 prdoc/pr_4533.prdoc               | 10 ++++++++++
 substrate/client/rpc/src/utils.rs | 26 +++++++++++++++++++++++++-
 2 files changed, 35 insertions(+), 1 deletion(-)
 create mode 100644 prdoc/pr_4533.prdoc

diff --git a/prdoc/pr_4533.prdoc b/prdoc/pr_4533.prdoc
new file mode 100644
index 00000000000..a0835285fc0
--- /dev/null
+++ b/prdoc/pr_4533.prdoc
@@ -0,0 +1,10 @@
+title: "Fixed RPC subscriptions leak when subscription stream is finished"
+
+doc:
+  - audience: Node Operator
+    description: |
+      The node may leak RPC subscriptions in some cases, e.g. during
+      `author_submitAndWatchExtrinsic` calls. This PR fixes the issue.
+
+crates:
+  - name: sc-rpc
diff --git a/substrate/client/rpc/src/utils.rs b/substrate/client/rpc/src/utils.rs
index 6ec48efef84..3b5372615e7 100644
--- a/substrate/client/rpc/src/utils.rs
+++ b/substrate/client/rpc/src/utils.rs
@@ -143,7 +143,7 @@ async fn inner_pipe_from_stream<S, T>(
 			//
 			// Process remaining items and terminate.
 			Either::Right((Either::Right((None, pending_fut)), _)) => {
-				if pending_fut.await.is_err() {
+				if !pending_fut.is_terminated() && pending_fut.await.is_err() {
 					return;
 				}
 
@@ -231,4 +231,28 @@ mod tests {
 		_ = rx.next().await.unwrap();
 		assert!(sub.next::<usize>().await.is_none());
 	}
+
+	#[tokio::test]
+	async fn subscription_is_dropped_when_stream_is_empty() {
+		let notify_rx = std::sync::Arc::new(tokio::sync::Notify::new());
+		let notify_tx = notify_rx.clone();
+
+		let mut module = RpcModule::new(notify_tx);
+		module
+			.register_subscription("sub", "my_sub", "unsub", |_, pending, notify_tx| async move {
+				// emulate empty stream for simplicity: otherwise we need some mechanism
+				// to sync buffer and channel send operations
+				let stream = futures::stream::empty::<()>();
+				// this should exit immediately
+				pipe_from_stream(pending, stream).await;
+				// notify that the `pipe_from_stream` has returned
+				notify_tx.notify_one();
+				Ok(())
+			})
+			.unwrap();
+		module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();
+
+		// it should fire once `pipe_from_stream` returns
+		notify_rx.notified().await;
+	}
 }
-- 
GitLab