Skip to content
Snippets Groups Projects
Commit 090b55b7 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Offchain worker: Enable http2 and improve logging (#10305)

* Offchain worker: Enable http2 and improve logging

Apparently some webpages now return http2 by default and that silently breaks the offchain http
extension. The solution to this is to enable the `http2` feature of hyper. Besides that, this pr
improves the logging to make it easier to debug such errors.

* FMT

* Adds http2 test
parent 34bc2462
Branches
No related merge requests found
......@@ -2521,6 +2521,25 @@ dependencies = [
"web-sys",
]
[[package]]
name = "h2"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fd819562fcebdac5afc5c113c3ec36f902840b70fd4fc458799c8ce4607ae55"
dependencies = [
"bytes 1.0.1",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "half"
version = "1.7.1"
......@@ -2738,6 +2757,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"httparse",
......@@ -8349,7 +8369,6 @@ dependencies = [
"hyper 0.14.14",
"hyper-rustls",
"lazy_static",
"log 0.4.14",
"num_cpus",
"once_cell",
"parity-scale-codec",
......@@ -8371,6 +8390,7 @@ dependencies = [
"substrate-test-runtime-client",
"threadpool",
"tokio",
"tracing",
]
[[package]]
......
......@@ -19,7 +19,6 @@ hex = "0.4"
fnv = "1.0.6"
futures = "0.3.16"
futures-timer = "3.0.2"
log = "0.4.8"
num_cpus = "1.10"
parking_lot = "0.11.1"
rand = "0.7.2"
......@@ -31,9 +30,10 @@ sp-offchain = { version = "4.0.0-dev", path = "../../primitives/offchain" }
sp-runtime = { version = "4.0.0-dev", path = "../../primitives/runtime" }
sc-utils = { version = "4.0.0-dev", path = "../utils" }
threadpool = "1.7"
hyper = { version = "0.14.14", features = ["stream"] }
hyper = { version = "0.14.14", features = ["stream", "http2"] }
hyper-rustls = "0.22.1"
once_cell = "1.8"
tracing = "0.1.29"
[dev-dependencies]
sc-client-db = { version = "0.10.0-dev", default-features = true, path = "../db" }
......
......@@ -37,10 +37,11 @@ mod http;
mod timestamp;
fn unavailable_yet<R: Default>(name: &str) -> R {
log::error!(
target: "sc_offchain",
tracing::error!(
target: super::LOG_TARGET,
"The {:?} API is not available for offchain workers yet. Follow \
https://github.com/paritytech/substrate/issues/1458 for details", name
https://github.com/paritytech/substrate/issues/1458 for details",
name
);
Default::default()
}
......@@ -75,9 +76,12 @@ impl<Storage: OffchainStorage> Db<Storage> {
impl<Storage: OffchainStorage> offchain::DbExternalities for Db<Storage> {
fn local_storage_set(&mut self, kind: StorageKind, key: &[u8], value: &[u8]) {
log::debug!(
target: "sc_offchain",
"{:?}: Write: {:?} <= {:?}", kind, hex::encode(key), hex::encode(value)
tracing::debug!(
target: "offchain-worker::storage",
?kind,
key = ?hex::encode(key),
value = ?hex::encode(value),
"Write",
);
match kind {
StorageKind::PERSISTENT => self.persistent.set(STORAGE_PREFIX, key, value),
......@@ -86,9 +90,11 @@ impl<Storage: OffchainStorage> offchain::DbExternalities for Db<Storage> {
}
fn local_storage_clear(&mut self, kind: StorageKind, key: &[u8]) {
log::debug!(
target: "sc_offchain",
"{:?}: Clear: {:?}", kind, hex::encode(key)
tracing::debug!(
target: "offchain-worker::storage",
?kind,
key = ?hex::encode(key),
"Clear",
);
match kind {
StorageKind::PERSISTENT => self.persistent.remove(STORAGE_PREFIX, key),
......@@ -103,13 +109,13 @@ impl<Storage: OffchainStorage> offchain::DbExternalities for Db<Storage> {
old_value: Option<&[u8]>,
new_value: &[u8],
) -> bool {
log::debug!(
target: "sc_offchain",
"{:?}: CAS: {:?} <= {:?} vs {:?}",
kind,
hex::encode(key),
hex::encode(new_value),
old_value.as_ref().map(hex::encode),
tracing::debug!(
target: "offchain-worker::storage",
?kind,
key = ?hex::encode(key),
new_value = ?hex::encode(new_value),
old_value = ?old_value.as_ref().map(hex::encode),
"CAS",
);
match kind {
StorageKind::PERSISTENT =>
......@@ -123,12 +129,12 @@ impl<Storage: OffchainStorage> offchain::DbExternalities for Db<Storage> {
StorageKind::PERSISTENT => self.persistent.get(STORAGE_PREFIX, key),
StorageKind::LOCAL => unavailable_yet(LOCAL_DB),
};
log::debug!(
target: "sc_offchain",
"{:?}: Read: {:?} => {:?}",
kind,
hex::encode(key),
result.as_ref().map(hex::encode)
tracing::debug!(
target: "offchain-worker::storage",
?kind,
key = ?hex::encode(key),
result = ?result.as_ref().map(hex::encode),
"Read",
);
result
}
......
......@@ -33,7 +33,6 @@ use fnv::FnvHashMap;
use futures::{channel::mpsc, future, prelude::*};
use hyper::{client, Body, Client as HyperClient};
use hyper_rustls::HttpsConnector;
use log::error;
use once_cell::sync::Lazy;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Timestamp};
......@@ -46,6 +45,8 @@ use std::{
task::{Context, Poll},
};
const LOG_TARGET: &str = "offchain-worker::http";
/// Wrapper struct used for keeping the hyper_rustls client running.
#[derive(Clone)]
pub struct SharedClient(Arc<Lazy<HyperClient<HttpsConnector<client::HttpConnector>, Body>>>);
......@@ -146,13 +147,24 @@ impl HttpApi {
match self.next_id.0.checked_add(1) {
Some(new_id) => self.next_id.0 = new_id,
None => {
error!("Overflow in offchain worker HTTP request ID assignment");
tracing::error!(
target: LOG_TARGET,
"Overflow in offchain worker HTTP request ID assignment"
);
return Err(())
},
};
self.requests
.insert(new_id, HttpApiRequest::NotDispatched(request, body_sender));
tracing::error!(
target: LOG_TARGET,
id = %new_id.0,
%method,
%uri,
"Requested started",
);
Ok(new_id)
}
......@@ -168,11 +180,14 @@ impl HttpApi {
_ => return Err(()),
};
let name = hyper::header::HeaderName::try_from(name).map_err(drop)?;
let value = hyper::header::HeaderValue::try_from(value).map_err(drop)?;
let header_name = hyper::header::HeaderName::try_from(name).map_err(drop)?;
let header_value = hyper::header::HeaderValue::try_from(value).map_err(drop)?;
// Note that we're always appending headers and never replacing old values.
// We assume here that the user knows what they're doing.
request.headers_mut().append(name, value);
request.headers_mut().append(header_name, header_value);
tracing::debug!(target: LOG_TARGET, id = %request_id.0, %name, %value, "Added header to request");
Ok(())
}
......@@ -207,7 +222,7 @@ impl HttpApi {
sender.send_data(hyper::body::Bytes::from(chunk.to_owned())),
)
.map_err(|_| {
error!("HTTP sender refused data despite being ready");
tracing::error!(target: "offchain-worker::http", "HTTP sender refused data despite being ready");
HttpError::IoError
})
};
......@@ -215,6 +230,7 @@ impl HttpApi {
loop {
request = match request {
HttpApiRequest::NotDispatched(request, sender) => {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Added new body chunk");
// If the request is not dispatched yet, dispatch it and loop again.
let _ = self
.to_worker
......@@ -225,14 +241,20 @@ impl HttpApi {
HttpApiRequest::Dispatched(Some(mut sender)) => {
if !chunk.is_empty() {
match poll_sender(&mut sender) {
Err(HttpError::IoError) => return Err(HttpError::IoError),
Err(HttpError::IoError) => {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Encountered io error while trying to add new chunk to body");
return Err(HttpError::IoError)
},
other => {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, res = ?other, "Added chunk to body");
self.requests
.insert(request_id, HttpApiRequest::Dispatched(Some(sender)));
return other
},
}
} else {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Finished writing body");
// Writing an empty body is a hint that we should stop writing. Dropping
// the sender.
self.requests.insert(request_id, HttpApiRequest::Dispatched(None));
......@@ -250,14 +272,20 @@ impl HttpApi {
.as_mut()
.expect("Can only enter this match branch if Some; qed"),
) {
Err(HttpError::IoError) => return Err(HttpError::IoError),
Err(HttpError::IoError) => {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Encountered io error while trying to add new chunk to body");
return Err(HttpError::IoError)
},
other => {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, res = ?other, "Added chunk to body");
self.requests
.insert(request_id, HttpApiRequest::Response(response));
return other
},
}
} else {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Finished writing body");
// Writing an empty body is a hint that we should stop writing. Dropping
// the sender.
self.requests.insert(
......@@ -271,13 +299,18 @@ impl HttpApi {
}
},
HttpApiRequest::Fail(_) =>
// If the request has already failed, return without putting back the request
// in the list.
return Err(HttpError::IoError),
HttpApiRequest::Fail(error) => {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, ?error, "Request failed");
// If the request has already failed, return without putting back the request
// in the list.
return Err(HttpError::IoError)
},
v @ HttpApiRequest::Dispatched(None) |
v @ HttpApiRequest::Response(HttpApiRequestRp { sending_body: None, .. }) => {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Body sending already finished");
// We have already finished sending this body.
self.requests.insert(request_id, v);
return Err(HttpError::Invalid)
......@@ -350,8 +383,19 @@ impl HttpApi {
// Requests in "fail" mode are purged before returning.
debug_assert_eq!(output.len(), ids.len());
for n in (0..ids.len()).rev() {
if let HttpRequestStatus::IoError = output[n] {
self.requests.remove(&ids[n]);
match output[n] {
HttpRequestStatus::IoError => {
self.requests.remove(&ids[n]);
},
HttpRequestStatus::Invalid => {
tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Unknown request");
},
HttpRequestStatus::DeadlineReached => {
tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Deadline reached");
},
HttpRequestStatus::Finished(_) => {
tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Request finished");
},
}
}
return output
......@@ -388,20 +432,23 @@ impl HttpApi {
);
},
None => {}, // can happen if we detected an IO error when sending the body
_ => error!("State mismatch between the API and worker"),
_ =>
tracing::error!(target: "offchain-worker::http", "State mismatch between the API and worker"),
}
},
Some(WorkerToApi::Fail { id, error }) => match self.requests.remove(&id) {
Some(HttpApiRequest::Dispatched(_)) => {
tracing::debug!(target: LOG_TARGET, id = %id.0, ?error, "Request failed");
self.requests.insert(id, HttpApiRequest::Fail(error));
},
None => {}, // can happen if we detected an IO error when sending the body
_ => error!("State mismatch between the API and worker"),
_ =>
tracing::error!(target: "offchain-worker::http", "State mismatch between the API and worker"),
},
None => {
error!("Worker has crashed");
tracing::error!(target: "offchain-worker::http", "Worker has crashed");
return ids.iter().map(|_| HttpRequestStatus::IoError).collect()
},
}
......@@ -474,7 +521,7 @@ impl HttpApi {
},
Err(err) => {
// This code should never be reached unless there's a logic error somewhere.
error!("Failed to read from current read chunk: {:?}", err);
tracing::error!(target: "offchain-worker::http", "Failed to read from current read chunk: {:?}", err);
return Err(HttpError::IoError)
},
}
......@@ -719,7 +766,10 @@ mod tests {
// Returns an `HttpApi` whose worker is ran in the background, and a `SocketAddr` to an HTTP
// server that runs in the background as well.
macro_rules! build_api_server {
() => {{
() => {
build_api_server!(hyper::Response::new(hyper::Body::from("Hello World!")))
};
( $response:expr ) => {{
let hyper_client = SHARED_CLIENT.clone();
let (api, worker) = http(hyper_client.clone());
......@@ -736,9 +786,7 @@ mod tests {
// otherwise the tests are flaky.
let _ = req.into_body().collect::<Vec<_>>().await;
Ok::<_, Infallible>(hyper::Response::new(hyper::Body::from(
"Hello World!",
)))
Ok::<_, Infallible>($response)
},
))
}),
......@@ -776,6 +824,33 @@ mod tests {
assert_eq!(&buf[..n], b"Hello World!");
}
#[test]
fn basic_http2_localhost() {
let deadline = timestamp::now().add(Duration::from_millis(10_000));
// Performs an HTTP query to a background HTTP server.
let (mut api, addr) = build_api_server!(hyper::Response::builder()
.version(hyper::Version::HTTP_2)
.body(hyper::Body::from("Hello World!"))
.unwrap());
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
api.request_write_body(id, &[], Some(deadline)).unwrap();
match api.response_wait(&[id], Some(deadline))[0] {
HttpRequestStatus::Finished(200) => {},
v => panic!("Connecting to localhost failed: {:?}", v),
}
let headers = api.response_headers(id);
assert!(headers.iter().any(|(h, _)| h.eq_ignore_ascii_case(b"Date")));
let mut buf = vec![0; 2048];
let n = api.response_read_body(id, &mut buf, Some(deadline)).unwrap();
assert_eq!(&buf[..n], b"Hello World!");
}
#[test]
fn request_start_invalid_call() {
let (mut api, addr) = build_api_server!();
......
......@@ -41,7 +41,6 @@ use futures::{
future::{ready, Future},
prelude::*,
};
use log::{debug, warn};
use parking_lot::Mutex;
use sc_network::{ExHashT, NetworkService, NetworkStateInfo, PeerId};
use sp_api::{ApiExt, ProvideRuntimeApi};
......@@ -57,6 +56,8 @@ mod api;
pub use api::Db as OffchainDb;
pub use sp_offchain::{OffchainWorkerApi, STORAGE_PREFIX};
const LOG_TARGET: &str = "offchain-worker";
/// NetworkProvider provides [`OffchainWorkers`] with all necessary hooks into the
/// underlying Substrate networking.
pub trait NetworkProvider: NetworkStateInfo {
......@@ -149,15 +150,25 @@ where
err => {
let help =
"Consider turning off offchain workers if they are not part of your runtime.";
log::error!("Unsupported Offchain Worker API version: {:?}. {}.", err, help);
tracing::error!(
target: LOG_TARGET,
"Unsupported Offchain Worker API version: {:?}. {}.",
err,
help
);
0
},
};
debug!("Checking offchain workers at {:?}: version:{}", at, version);
tracing::debug!(
target: LOG_TARGET,
"Checking offchain workers at {:?}: version:{}",
at,
version
);
let process = (version > 0).then(|| {
let (api, runner) =
api::AsyncApi::new(network_provider, is_validator, self.shared_http_client.clone());
debug!("Spawning offchain workers at {:?}", at);
tracing::debug!(target: LOG_TARGET, "Spawning offchain workers at {:?}", at);
let header = header.clone();
let client = self.client.clone();
......@@ -167,7 +178,7 @@ where
self.spawn_worker(move || {
let runtime = client.runtime_api();
let api = Box::new(api);
debug!("Running offchain workers at {:?}", at);
tracing::debug!(target: LOG_TARGET, "Running offchain workers at {:?}", at);
let context = ExecutionContext::OffchainCall(Some((api, capabilities)));
let run = if version == 2 {
......@@ -181,7 +192,12 @@ where
)
};
if let Err(e) = run {
log::error!("Error running offchain workers at {:?}: {:?}", at, e);
tracing::error!(
target: LOG_TARGET,
"Error running offchain workers at {:?}: {:?}",
at,
e
);
}
});
......@@ -232,8 +248,8 @@ pub async fn notification_future<Client, Block, Spawner>(
.boxed(),
);
} else {
log::debug!(
target: "sc_offchain",
tracing::debug!(
target: LOG_TARGET,
"Skipping offchain workers for non-canon block: {:?}",
n.header,
)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment