Unverified Commit 0e7d1154 authored by Niklas Adolfsson's avatar Niklas Adolfsson
Browse files

add back uncommented code

parent baf0e6bc
Pipeline #199387 passed with stages
in 7 minutes and 31 seconds
......@@ -870,7 +870,6 @@ impl PendingSubscription {
///
/// Fails if the connection was closed
pub async fn accept(mut self) -> Option<SubscriptionSink> {
tracing::trace!("[PendingSubscription] accept");
let inner = self.0.take()?;
let InnerPendingSubscription {
......@@ -889,31 +888,26 @@ impl PendingSubscription {
let success = response.success;
if subscribe_call.send(response).is_ok() && success {
tracing::trace!("[PendingSubscription] waiting for server to send the message");
let (tx, rx) = watch::channel(());
// mark the subscription is "active"
subscribers.lock().insert(uniq_sub.clone(), (sink.clone(), tx));
match message_sent.await {
Ok(_) => (),
// the connection was closed.
Err(_canceled) => {
subscribers.lock().remove(&uniq_sub);
return None;
}
if message_sent.await.is_ok() {
return Some(SubscriptionSink {
inner: sink,
close_notify,
method,
uniq_sub,
subscribers,
unsubscribe: rx,
_claimed: claimed,
});
} else {
subscribers.lock().remove(&uniq_sub);
}
return Some(SubscriptionSink {
inner: sink,
close_notify,
method,
uniq_sub,
subscribers,
unsubscribe: rx,
_claimed: claimed,
});
}
tracing::trace!("[PendingSubscription] call failed");
None
}
}
......
......@@ -730,9 +730,11 @@ where
}
if let Ok(batch) = serde_json::from_slice::<Vec<Notif>>(&data) {
if !batch.is_empty() {
return BatchResponse { result: "".to_string(), success: true };
}
return if !batch.is_empty() {
BatchResponse { result: "".to_string(), success: true }
} else {
BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
};
}
// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
......
......@@ -244,11 +244,11 @@ async fn proc_macros_generic_ws_client_api() {
assert_eq!(second_recv, "Response_B".to_string());
// Sub with params
/*let mut sub = client.sub_with_params(42).await.unwrap();
let mut sub = client.sub_with_params(42).await.unwrap();
let first_recv = sub.next().await.unwrap().unwrap();
assert_eq!(first_recv, 42);
let second_recv = sub.next().await.unwrap().unwrap();
assert_eq!(second_recv, 42);*/
assert_eq!(second_recv, 42);
}
#[tokio::test]
......
......@@ -846,7 +846,7 @@ where
if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&data) {
tracing::debug!("recv batch len={}", batch.len());
tracing::trace!("recv: batch={:?}", batch);
if !batch.is_empty() {
return if !batch.is_empty() {
let batch = batch.into_iter().map(|req| (req, call.clone()));
let batch_stream = futures_util::stream::iter(batch);
......@@ -869,7 +869,9 @@ where
.await;
return batch_response.finish();
}
} else {
BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
};
}
let (id, code) = prepare_error(&data);
......
......@@ -368,9 +368,9 @@ async fn garbage_request_fails() {
let response = client.send_request_text(req).await.unwrap();
assert_eq!(response, parse_error(Id::Null));
/*let req = r#"[]"#;
let req = r#"[]"#;
let response = client.send_request_text(req).await.unwrap();
assert_eq!(response, invalid_request(Id::Null));*/
assert_eq!(response, invalid_request(Id::Null));
let req = r#"[{"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1}"#;
let response = client.send_request_text(req).await.unwrap();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment