Skip to content
Snippets Groups Projects
Commit b5c6cc39 authored by Pierre Krieger's avatar Pierre Krieger Committed by Bastian Köcher
Browse files

Some tweaks to offchain HTTP workers (#3488)

parent 36ef4c06
No related merge requests found
......@@ -84,7 +84,9 @@ enum HttpApiRequest {
Dispatched(Option<hyper::body::Sender>),
/// Received a response.
Response(HttpApiRequestRp),
/// A request has been dispatched then produced an error.
/// A request has been dispatched but the worker notified us of an error. We report this
/// failure to the user as an `IoError` and remove the request from the list as soon as
/// possible.
Fail(hyper::Error),
}
......@@ -100,6 +102,9 @@ struct HttpApiRequestRp {
/// Body of the response, as a channel of `Chunk` objects.
/// While the code is designed to drop the `Receiver` once it ends, we wrap it within a
/// `Fuse` in order to be extra precautious about panics.
/// Elements extracted from the channel are first put into `current_read_chunk`.
/// If the channel produces an error, then that is translated into an `IoError` and the request
/// is removed from the list.
body: stream::Fuse<mpsc::Receiver<Result<hyper::Chunk, hyper::Error>>>,
/// Chunk that has been extracted from the channel and that is currently being read.
/// Reading data from the response should read from this field in priority.
......@@ -170,8 +175,9 @@ impl HttpApi {
};
let mut deadline = timestamp::deadline_to_future(deadline);
// Closure that writes data to a sender, taking the deadline into account.
// If `IoError` is returned, don't forget to destroy the request.
// Closure that writes data to a sender, taking the deadline into account. Can return `Ok`
// (if the body has been written), or `DeadlineReached`, or `IoError`.
// If `IoError` is returned, don't forget to remove the request from the list.
let mut poll_sender = move |sender: &mut hyper::body::Sender| -> Result<(), HttpError> {
let mut when_ready = future::maybe_done(Compat01As03::new(
futures01::future::poll_fn(|| sender.poll_ready())
......@@ -220,7 +226,8 @@ impl HttpApi {
}
}
} else {
// Dropping the sender to finish writing.
// Writing an empty body is a hint that we should stop writing. Dropping
// the sender.
self.requests.insert(request_id, HttpApiRequest::Dispatched(None));
return Ok(())
}
......@@ -237,7 +244,8 @@ impl HttpApi {
}
} else {
// Dropping the sender to finish writing.
// Writing an empty body is a hint that we should stop writing. Dropping
// the sender.
self.requests.insert(request_id, HttpApiRequest::Response(HttpApiRequestRp {
sending_body: None,
..response
......@@ -297,6 +305,7 @@ impl HttpApi {
loop {
// Within that loop, first try to see if we have all the elements for a response.
// This includes the situation where the deadline is reached.
{
let mut output = Vec::with_capacity(ids.len());
let mut must_wait_more = false;
......@@ -337,7 +346,8 @@ impl HttpApi {
}
}
// Grab next message, or call `continue` if deadline is reached.
// Grab next message from the worker. We call `continue` if deadline is reached so that
// we loop back and `return`.
let next_message = {
let mut next_msg = future::maybe_done(self.from_worker.next());
futures::executor::block_on(future::select(&mut next_msg, &mut deadline));
......@@ -410,7 +420,7 @@ impl HttpApi {
buffer: &mut [u8],
deadline: Option<Timestamp>
) -> Result<usize, HttpError> {
// Do an implicit non-blocking wait on the request.
// Do an implicit wait on the request.
let _ = self.response_wait(&[request_id], deadline);
// Remove the request from the list and handle situations where the request is invalid or
......@@ -419,12 +429,15 @@ impl HttpApi {
Some(HttpApiRequest::Response(r)) => r,
// Because we called `response_wait` above, we know that the deadline has been reached
// and we still haven't received a response.
Some(HttpApiRequest::Dispatched(_)) => return Err(HttpError::DeadlineReached),
Some(rq @ HttpApiRequest::Dispatched(_)) => {
self.requests.insert(request_id, rq);
return Err(HttpError::DeadlineReached)
},
// The request has failed.
Some(HttpApiRequest::Fail { .. }) =>
return Err(HttpError::IoError),
// Request hasn't been dispatched yet; reading the body is invalid.
Some(rq) => {
Some(rq @ HttpApiRequest::NotDispatched(_, _)) => {
self.requests.insert(request_id, rq);
return Err(HttpError::Invalid)
}
......@@ -432,12 +445,7 @@ impl HttpApi {
};
// Convert the deadline into a `Future` that resolves when the deadline is reached.
let mut deadline = future::maybe_done(match deadline {
Some(deadline) => future::Either::Left(
futures_timer::Delay::new(timestamp::timestamp_from_now(deadline))
),
None => future::Either::Right(future::pending())
});
let mut deadline = timestamp::deadline_to_future(deadline);
loop {
// First read from `current_read_chunk`.
......@@ -530,9 +538,11 @@ enum WorkerToApi {
/// because we don't want the `HttpApi` to have to drive the reading.
/// Instead, reading an item from the channel will notify the worker task, which will push
/// the next item.
/// Can also be used to send an error, in case an error happend on the HTTP socket. After
/// an error is sent, the channel will close.
body: mpsc::Receiver<Result<hyper::Chunk, hyper::Error>>,
},
/// A request has failed because of an error.
/// A request has failed because of an error. The request is then no longer valid.
Fail {
/// The ID that was passed to the worker.
id: HttpRequestId,
......@@ -541,16 +551,19 @@ enum WorkerToApi {
},
}
/// Wraps around a `hyper::Client` with either TLS enabled or disabled.
enum HyperClient {
Http(hyper::Client<hyper::client::HttpConnector, hyper::Body>),
/// Everything is ok and HTTPS is available.
Https(hyper::Client<hyper_tls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>),
/// We failed to initialize HTTPS and therefore only allow HTTP.
Http(hyper::Client<hyper::client::HttpConnector, hyper::Body>),
}
impl HyperClient {
/// Creates new hyper client.
///
/// By default we will try to initialize the `HttpsConnector`,
/// if that's not possible we'll fall back to `HttpConnector`.
/// If that's not possible we'll fall back to `HttpConnector`.
pub fn new() -> Self {
match hyper_tls::HttpsConnector::new(1) {
Ok(tls) => HyperClient::Https(hyper::Client::builder().build(tls)),
......@@ -576,13 +589,13 @@ pub struct HttpWorker {
/// HTTP request being processed by the worker.
enum HttpWorkerRequest {
/// Request has been dispatched and is waiting for a response.
/// Request has been dispatched and is waiting for a response from the Internet.
Dispatched(Compat01As03<hyper::client::ResponseFuture>),
/// Reading the body of the response and sending it to the channel.
/// Progressively reading the body of the response and sending it to the channel.
ReadBody {
/// Body to read `Chunk`s from.
/// Body to read `Chunk`s from. Only used if the channel is ready to accept data.
body: Compat01As03<hyper::Body>,
/// Where to send the chunks.
/// Channel to the [`HttpApi`] where we send the chunks to.
tx: mpsc::Sender<Result<hyper::Chunk, hyper::Error>>,
},
}
......@@ -632,7 +645,7 @@ impl Future for HttpWorker {
});
me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx: body_tx }));
cx.waker().wake_by_ref(); // wake up in order to poll the new future
cx.waker().wake_by_ref(); // reschedule in order to poll the new future
continue
}
......@@ -648,11 +661,12 @@ impl Future for HttpWorker {
}
}
// `tx` is ready. Read a chunk from the socket and send it to the channel.
match Stream::poll_next(Pin::new(&mut body), cx) {
Poll::Ready(Some(Ok(chunk))) => {
let _ = tx.start_send(Ok(chunk));
me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx }));
cx.waker().wake_by_ref(); // notify in order to poll again
cx.waker().wake_by_ref(); // reschedule in order to continue reading
}
Poll::Ready(Some(Err(err))) => {
let _ = tx.start_send(Err(err));
......
......@@ -46,6 +46,8 @@ pub fn timestamp_from_now(timestamp: Timestamp) -> Duration {
}
/// Converts the deadline into a `Future` that resolves when the deadline is reached.
///
/// If `None`, returns a never-ending `Future`.
pub fn deadline_to_future(
deadline: Option<Timestamp>,
) -> futures::future::MaybeDone<impl futures::Future> {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment