Unverified Commit 712112bc authored by Alexandru Vasile's avatar Alexandru Vasile
Browse files

Pass `SubscriptionSink` to subscription callbacks


Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
parent 64f45e93
......@@ -684,7 +684,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
) -> Result<MethodResourcesBuilder, Error>
where
Context: Send + Sync + 'static,
F: Fn(Params, PendingSubscription, Arc<Context>) -> SubscriptionResult + Send + Sync + 'static,
F: Fn(Params, SubscriptionSink, Arc<Context>) -> SubscriptionResult + Send + Sync + 'static,
{
if subscribe_method_name == unsubscribe_method_name {
return Err(Error::SubscriptionNameConflict(subscribe_method_name.into()));
......@@ -739,15 +739,16 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn, claimed| {
let sub_id: RpcSubscriptionId = conn.id_provider.next_id();
let sink = PendingSubscription(Some(InnerPendingSubscription {
sink: method_sink.clone(),
let sink = SubscriptionSink {
inner: method_sink.clone(),
close_notify: Some(conn.close_notify),
method: notif_method_name,
subscribers: subscribers.clone(),
unsubscribe: None,
uniq_sub: SubscriptionKey { conn_id: conn.conn_id, sub_id },
id: id.clone().into_owned(),
claimed,
}));
id: Some(id.clone().into_owned()),
_claimed: claimed,
};
// The callback returns an empty `SubscriptionError` for improved API ergonomics.
if let Err(err) = callback(params, sink, ctx.clone()) {
......@@ -1170,7 +1171,7 @@ impl SubscriptionSink {
/// Ok(())
/// });
/// ```
async fn pipe_from_stream<S, T>(&mut self, stream: S) -> SubscriptionClosed
pub async fn pipe_from_stream<S, T>(&mut self, stream: S) -> SubscriptionClosed
where
S: Stream<Item = T> + Unpin,
T: Serialize,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment