// Copyright 2019-2021 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any // person obtaining a copy of this software and associated // documentation files (the "Software"), to deal in the // Software without restriction, including without // limitation the rights to use, copy, modify, merge, // publish, distribute, sublicense, and/or sell copies of // the Software, and to permit persons to whom the Software // is furnished to do so, subject to the following // conditions: // // The above copyright notice and this permission notice // shall be included in all copies or substantial portions // of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF // ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED // TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A // PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT // SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. use crate::server::rpc_module::MethodSink; use futures_channel::mpsc; use futures_util::stream::StreamExt; use jsonrpsee_types::error::{CallError, Error}; use jsonrpsee_types::to_json_raw_value; use jsonrpsee_types::v2::error::{OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG}; use jsonrpsee_types::v2::{ error::{CALL_EXECUTION_FAILED_CODE, UNKNOWN_ERROR_CODE}, ErrorCode, ErrorObject, Id, InvalidRequest, Response, RpcError, TwoPointZero, }; use serde::Serialize; use std::io; /// Bounded writer that allows writing at most `max_len` bytes. /// /// ``` /// use jsonrpsee_utils::server::helpers::BoundedWriter; /// use std::io::Write; /// /// let mut writer = BoundedWriter::new(10); /// (&mut writer).write("hello".as_bytes()).unwrap(); /// assert_eq!(std::str::from_utf8(&writer.into_bytes()).unwrap(), "hello"); /// ``` #[derive(Debug)] pub struct BoundedWriter { max_len: usize, buf: Vec, } impl BoundedWriter { /// Create a new bounded writer. pub fn new(max_len: usize) -> Self { Self { max_len, buf: Vec::with_capacity(128) } } /// Consume the writer and extract the written bytes. pub fn into_bytes(self) -> Vec { self.buf } } impl<'a> io::Write for &'a mut BoundedWriter { fn write(&mut self, buf: &[u8]) -> io::Result { let len = self.buf.len() + buf.len(); if self.max_len >= len { self.buf.extend_from_slice(buf); Ok(buf.len()) } else { Err(io::Error::new(io::ErrorKind::OutOfMemory, "Memory capacity exceeded")) } } fn flush(&mut self) -> io::Result<()> { Ok(()) } } /// Helper for sending JSON-RPC responses to the client pub fn send_response(id: Id, tx: &MethodSink, result: impl Serialize, max_response_size: u32) { let mut writer = BoundedWriter::new(max_response_size as usize); let json = match serde_json::to_writer(&mut writer, &Response { jsonrpc: TwoPointZero, id: id.clone(), result }) { Ok(_) => { // Safety - serde_json does not emit invalid UTF-8. unsafe { String::from_utf8_unchecked(writer.into_bytes()) } } Err(err) => { tracing::error!("Error serializing response: {:?}", err); if err.is_io() { let data = to_json_raw_value(&format!("Exceeded max limit {}", max_response_size)).ok(); let err = ErrorObject { code: ErrorCode::ServerError(OVERSIZED_RESPONSE_CODE), message: OVERSIZED_RESPONSE_MSG.into(), data: data.as_deref(), }; return send_error(id, tx, err); } else { return send_error(id, tx, ErrorCode::InternalError.into()); } } }; if let Err(err) = tx.unbounded_send(json) { tracing::error!("Error sending response to the client: {:?}", err) } } /// Helper for sending JSON-RPC errors to the client pub fn send_error(id: Id, tx: &MethodSink, error: ErrorObject) { let json = match serde_json::to_string(&RpcError { jsonrpc: TwoPointZero, error, id }) { Ok(json) => json, Err(err) => { tracing::error!("Error serializing error message: {:?}", err); return; } }; if let Err(err) = tx.unbounded_send(json) { tracing::error!("Could not send error response to the client: {:?}", err) } } /// Helper for sending the general purpose `Error` as a JSON-RPC errors to the client pub fn send_call_error(id: Id, tx: &MethodSink, err: Error) { let (code, message, data) = match err { Error::Call(CallError::InvalidParams(e)) => (ErrorCode::InvalidParams, e.to_string(), None), Error::Call(CallError::Failed(e)) => (ErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE), e.to_string(), None), Error::Call(CallError::Custom { code, message, data }) => (code.into(), message, data), // This should normally not happen because the most common use case is to // return `Error::Call` in `register_async_method`. e => (ErrorCode::ServerError(UNKNOWN_ERROR_CODE), e.to_string(), None), }; let err = ErrorObject { code, message: message.into(), data: data.as_deref() }; send_error(id, tx, err) } /// Figure out if this is a sufficiently complete request that we can extract an [`Id`] out of, or just plain /// unparseable garbage. pub fn prepare_error(data: &[u8]) -> (Id<'_>, ErrorCode) { match serde_json::from_slice::(data) { Ok(InvalidRequest { id }) => (id, ErrorCode::InvalidRequest), Err(_) => (Id::Null, ErrorCode::ParseError), } } /// Read all the results of all method calls in a batch request from the ['Stream']. Format the result into a single /// `String` appropriately wrapped in `[`/`]`. pub async fn collect_batch_response(rx: mpsc::UnboundedReceiver) -> String { let mut buf = String::with_capacity(2048); buf.push('['); let mut buf = rx .fold(buf, |mut acc, response| async move { acc.push_str(&response); acc.push(','); acc }) .await; // Remove trailing comma buf.pop(); buf.push(']'); buf } #[cfg(test)] mod tests { use super::{BoundedWriter, Id, Response, TwoPointZero}; #[test] fn bounded_serializer_work() { let mut writer = BoundedWriter::new(100); let result = "success"; assert!( serde_json::to_writer(&mut writer, &Response { jsonrpc: TwoPointZero, id: Id::Number(1), result }).is_ok() ); assert_eq!(String::from_utf8(writer.into_bytes()).unwrap(), r#"{"jsonrpc":"2.0","result":"success","id":1}"#); } #[test] fn bounded_serializer_cap_works() { let mut writer = BoundedWriter::new(100); // NOTE: `"` is part of the serialization so 101 characters. assert!(serde_json::to_writer(&mut writer, &"x".repeat(99)).is_err()); } }