diff --git a/substrate/core/offchain/src/api/http.rs b/substrate/core/offchain/src/api/http.rs index 287e3f99be44b3a942c0725081fcd50128e40ee1..9ea666862c3d9ad47d4e954d7bce5700e7f28e1d 100644 --- a/substrate/core/offchain/src/api/http.rs +++ b/substrate/core/offchain/src/api/http.rs @@ -84,7 +84,9 @@ enum HttpApiRequest { Dispatched(Option<hyper::body::Sender>), /// Received a response. Response(HttpApiRequestRp), - /// A request has been dispatched then produced an error. + /// A request has been dispatched but the worker notified us of an error. We report this + /// failure to the user as an `IoError` and remove the request from the list as soon as + /// possible. Fail(hyper::Error), } @@ -100,6 +102,9 @@ struct HttpApiRequestRp { /// Body of the response, as a channel of `Chunk` objects. /// While the code is designed to drop the `Receiver` once it ends, we wrap it within a /// `Fuse` in order to be extra precautious about panics. + /// Elements extracted from the channel are first put into `current_read_chunk`. + /// If the channel produces an error, then that is translated into an `IoError` and the request + /// is removed from the list. body: stream::Fuse<mpsc::Receiver<Result<hyper::Chunk, hyper::Error>>>, /// Chunk that has been extracted from the channel and that is currently being read. /// Reading data from the response should read from this field in priority. @@ -170,8 +175,9 @@ impl HttpApi { }; let mut deadline = timestamp::deadline_to_future(deadline); - // Closure that writes data to a sender, taking the deadline into account. - // If `IoError` is returned, don't forget to destroy the request. + // Closure that writes data to a sender, taking the deadline into account. Can return `Ok` + // (if the body has been written), or `DeadlineReached`, or `IoError`. + // If `IoError` is returned, don't forget to remove the request from the list. let mut poll_sender = move |sender: &mut hyper::body::Sender| -> Result<(), HttpError> { let mut when_ready = future::maybe_done(Compat01As03::new( futures01::future::poll_fn(|| sender.poll_ready()) @@ -220,7 +226,8 @@ impl HttpApi { } } } else { - // Dropping the sender to finish writing. + // Writing an empty body is a hint that we should stop writing. Dropping + // the sender. self.requests.insert(request_id, HttpApiRequest::Dispatched(None)); return Ok(()) } @@ -237,7 +244,8 @@ impl HttpApi { } } else { - // Dropping the sender to finish writing. + // Writing an empty body is a hint that we should stop writing. Dropping + // the sender. self.requests.insert(request_id, HttpApiRequest::Response(HttpApiRequestRp { sending_body: None, ..response @@ -297,6 +305,7 @@ impl HttpApi { loop { // Within that loop, first try to see if we have all the elements for a response. + // This includes the situation where the deadline is reached. { let mut output = Vec::with_capacity(ids.len()); let mut must_wait_more = false; @@ -337,7 +346,8 @@ impl HttpApi { } } - // Grab next message, or call `continue` if deadline is reached. + // Grab next message from the worker. We call `continue` if deadline is reached so that + // we loop back and `return`. let next_message = { let mut next_msg = future::maybe_done(self.from_worker.next()); futures::executor::block_on(future::select(&mut next_msg, &mut deadline)); @@ -410,7 +420,7 @@ impl HttpApi { buffer: &mut [u8], deadline: Option<Timestamp> ) -> Result<usize, HttpError> { - // Do an implicit non-blocking wait on the request. + // Do an implicit wait on the request. let _ = self.response_wait(&[request_id], deadline); // Remove the request from the list and handle situations where the request is invalid or @@ -419,12 +429,15 @@ impl HttpApi { Some(HttpApiRequest::Response(r)) => r, // Because we called `response_wait` above, we know that the deadline has been reached // and we still haven't received a response. - Some(HttpApiRequest::Dispatched(_)) => return Err(HttpError::DeadlineReached), + Some(rq @ HttpApiRequest::Dispatched(_)) => { + self.requests.insert(request_id, rq); + return Err(HttpError::DeadlineReached) + }, // The request has failed. Some(HttpApiRequest::Fail { .. }) => return Err(HttpError::IoError), // Request hasn't been dispatched yet; reading the body is invalid. - Some(rq) => { + Some(rq @ HttpApiRequest::NotDispatched(_, _)) => { self.requests.insert(request_id, rq); return Err(HttpError::Invalid) } @@ -432,12 +445,7 @@ impl HttpApi { }; // Convert the deadline into a `Future` that resolves when the deadline is reached. - let mut deadline = future::maybe_done(match deadline { - Some(deadline) => future::Either::Left( - futures_timer::Delay::new(timestamp::timestamp_from_now(deadline)) - ), - None => future::Either::Right(future::pending()) - }); + let mut deadline = timestamp::deadline_to_future(deadline); loop { // First read from `current_read_chunk`. @@ -530,9 +538,11 @@ enum WorkerToApi { /// because we don't want the `HttpApi` to have to drive the reading. /// Instead, reading an item from the channel will notify the worker task, which will push /// the next item. + /// Can also be used to send an error, in case an error happend on the HTTP socket. After + /// an error is sent, the channel will close. body: mpsc::Receiver<Result<hyper::Chunk, hyper::Error>>, }, - /// A request has failed because of an error. + /// A request has failed because of an error. The request is then no longer valid. Fail { /// The ID that was passed to the worker. id: HttpRequestId, @@ -541,16 +551,19 @@ enum WorkerToApi { }, } +/// Wraps around a `hyper::Client` with either TLS enabled or disabled. enum HyperClient { - Http(hyper::Client<hyper::client::HttpConnector, hyper::Body>), + /// Everything is ok and HTTPS is available. Https(hyper::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>), + /// We failed to initialize HTTPS and therefore only allow HTTP. + Http(hyper::Client<hyper::client::HttpConnector, hyper::Body>), } impl HyperClient { /// Creates new hyper client. /// /// By default we will try to initialize the `HttpsConnector`, - /// if that's not possible we'll fall back to `HttpConnector`. + /// If that's not possible we'll fall back to `HttpConnector`. pub fn new() -> Self { match hyper_tls::HttpsConnector::new(1) { Ok(tls) => HyperClient::Https(hyper::Client::builder().build(tls)), @@ -576,13 +589,13 @@ pub struct HttpWorker { /// HTTP request being processed by the worker. enum HttpWorkerRequest { - /// Request has been dispatched and is waiting for a response. + /// Request has been dispatched and is waiting for a response from the Internet. Dispatched(Compat01As03<hyper::client::ResponseFuture>), - /// Reading the body of the response and sending it to the channel. + /// Progressively reading the body of the response and sending it to the channel. ReadBody { - /// Body to read `Chunk`s from. + /// Body to read `Chunk`s from. Only used if the channel is ready to accept data. body: Compat01As03<hyper::Body>, - /// Where to send the chunks. + /// Channel to the [`HttpApi`] where we send the chunks to. tx: mpsc::Sender<Result<hyper::Chunk, hyper::Error>>, }, } @@ -632,7 +645,7 @@ impl Future for HttpWorker { }); me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx: body_tx })); - cx.waker().wake_by_ref(); // wake up in order to poll the new future + cx.waker().wake_by_ref(); // reschedule in order to poll the new future continue } @@ -648,11 +661,12 @@ impl Future for HttpWorker { } } + // `tx` is ready. Read a chunk from the socket and send it to the channel. match Stream::poll_next(Pin::new(&mut body), cx) { Poll::Ready(Some(Ok(chunk))) => { let _ = tx.start_send(Ok(chunk)); me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx })); - cx.waker().wake_by_ref(); // notify in order to poll again + cx.waker().wake_by_ref(); // reschedule in order to continue reading } Poll::Ready(Some(Err(err))) => { let _ = tx.start_send(Err(err)); diff --git a/substrate/core/offchain/src/api/timestamp.rs b/substrate/core/offchain/src/api/timestamp.rs index f106ac72738cdf59413d75995dde00d73eb06f3c..445c7f3878474aef80646351871ea417b959ae67 100644 --- a/substrate/core/offchain/src/api/timestamp.rs +++ b/substrate/core/offchain/src/api/timestamp.rs @@ -46,6 +46,8 @@ pub fn timestamp_from_now(timestamp: Timestamp) -> Duration { } /// Converts the deadline into a `Future` that resolves when the deadline is reached. +/// +/// If `None`, returns a never-ending `Future`. pub fn deadline_to_future( deadline: Option<Timestamp>, ) -> futures::future::MaybeDone<impl futures::Future> {