Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use core::fmt;
use futures_channel::mpsc;
use futures_util::sink::SinkExt;
use futures_util::stream::{SplitSink, SplitStream, StreamExt};
use gloo_net::websocket::{futures::WebSocket, Message, WebSocketError};
use jsonrpsee_core::async_trait;
use jsonrpsee_core::client::{TransportReceiverT, TransportSenderT};
/// Errors that the WebSocket transport defined in this file can report.
///
/// `Display` text for each variant comes from its `#[error(...)]` attribute
/// (derived by `thiserror`).
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Internal send error.
///
/// `#[from]` generates a `From<mpsc::SendError>` conversion, so `?` on such
/// an error yields this variant.
#[error("Could not send message: {0}")]
SendError(#[from] mpsc::SendError),
/// Sender went away: `Receiver::receive` returns this when the underlying
/// stream ends (yields `None`).
#[error("Sender went away couldn't receive the message")]
SenderDisconnected,
/// Error that occurred in `JS context`, carried as its string rendering.
///
/// `connect` produces this when opening the WebSocket fails.
#[error("JS Error: {0:?}")]
Js(String),
/// WebSocket error reported by the underlying socket while sending or
/// receiving a message.
#[error("WebSocket Error: {0:?}")]
WebSocket(WebSocketError),
}
/// Sending half of the transport: wraps the sink side of a split [`WebSocket`].
pub struct Sender(SplitSink<WebSocket, Message>);

impl fmt::Debug for Sender {
    /// Formats the value as just the type name; the wrapped sink is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A field-less `debug_struct(..).finish()` emits only the name, so
        // writing the name directly produces the same output.
        f.write_str("Sender")
    }
}
/// Receiving half of the transport: wraps the stream side of a split [`WebSocket`].
pub struct Receiver(SplitStream<WebSocket>);

impl fmt::Debug for Receiver {
    /// Formats the value as just the type name; the wrapped stream is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A field-less `debug_struct(..).finish()` emits only the name, so
        // writing the name directly produces the same output.
        f.write_str("Receiver")
    }
}
#[async_trait(?Send)]
impl TransportSenderT for Sender {
type Error = Error;
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
tracing::trace!("tx: {:?}", msg);
self.0.send(Message::Text(msg)).await.map_err(|e| Error::WebSocket(e))?;
Ok(())
}
async fn close(&mut self) -> Result<(), Error> {
Ok(())
}
}
#[async_trait(?Send)]
impl TransportReceiverT for Receiver {
type Error = Error;
async fn receive(&mut self) -> Result<String, Self::Error> {
match self.0.next().await {
Some(Ok(msg)) => {
tracing::trace!("rx: {:?}", msg);
let txt = match msg {
Message::Bytes(bytes) => String::from_utf8(bytes).expect("WebSocket message is valid utf8; qed"),
Message::Text(txt) => txt,
};
Ok(txt)
}
Some(Err(err)) => Err(Error::WebSocket(err)),
None => Err(Error::SenderDisconnected),
}
}
}
/// Create a transport sender & receiver pair.
pub async fn connect(url: impl AsRef<str>) -> Result<(Sender, Receiver), Error> {
let websocket = WebSocket::open(url.as_ref()).map_err(|e| Error::Js(e.to_string()))?;
let (write, read) = websocket.split();
Ok((Sender(write), Receiver(read)))
}