diff --git a/prdoc/pr_4533.prdoc b/prdoc/pr_4533.prdoc new file mode 100644 index 0000000000000000000000000000000000000000..a0835285fc012cb0018e499f87e380f7ac059999 --- /dev/null +++ b/prdoc/pr_4533.prdoc @@ -0,0 +1,10 @@ +title: "Fixed RPC subscriptions leak when subscription stream is finished" + +doc: + - audience: Node Operator + description: | + The node may leak RPC subscriptions in some cases, e.g. during + `author_submitAndWatchExtrinsic` calls. This PR fixes the issue. + +crates: + - name: sc-rpc diff --git a/substrate/client/rpc/src/utils.rs b/substrate/client/rpc/src/utils.rs index 6ec48efef846c719db4e009b3a14926db0577d5f..3b5372615e733dba860faa453063548189fc6354 100644 --- a/substrate/client/rpc/src/utils.rs +++ b/substrate/client/rpc/src/utils.rs @@ -143,7 +143,7 @@ async fn inner_pipe_from_stream<S, T>( // // Process remaining items and terminate. Either::Right((Either::Right((None, pending_fut)), _)) => { - if pending_fut.await.is_err() { + if !pending_fut.is_terminated() && pending_fut.await.is_err() { return; } @@ -231,4 +231,28 @@ mod tests { _ = rx.next().await.unwrap(); assert!(sub.next::<usize>().await.is_none()); } + + #[tokio::test] + async fn subscription_is_dropped_when_stream_is_empty() { + let notify_rx = std::sync::Arc::new(tokio::sync::Notify::new()); + let notify_tx = notify_rx.clone(); + + let mut module = RpcModule::new(notify_tx); + module + .register_subscription("sub", "my_sub", "unsub", |_, pending, notify_tx| async move { + // emulate empty stream for simplicity: otherwise we need some mechanism + // to sync buffer and channel send operations + let stream = futures::stream::empty::<()>(); + // this should exit immediately + pipe_from_stream(pending, stream).await; + // notify that the `pipe_from_stream` has returned + notify_tx.notify_one(); + Ok(()) + }) + .unwrap(); + module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap(); + + // it should fire once `pipe_from_stream` returns + notify_rx.notified().await; + } }