diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 32234cc4c93a6ab34b7ade6c8e96364d5b5346ab..60f8217d69e8a04832156c1301654ed641d79544 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -2521,6 +2521,25 @@ dependencies = [ "web-sys", ] +[[package]] +name = "h2" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fd819562fcebdac5afc5c113c3ec36f902840b70fd4fc458799c8ce4607ae55" +dependencies = [ + "bytes 1.0.1", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "1.7.1" @@ -2738,6 +2757,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", + "h2", "http", "http-body", "httparse", @@ -8349,7 +8369,6 @@ dependencies = [ "hyper 0.14.14", "hyper-rustls", "lazy_static", - "log 0.4.14", "num_cpus", "once_cell", "parity-scale-codec", @@ -8371,6 +8390,7 @@ dependencies = [ "substrate-test-runtime-client", "threadpool", "tokio", + "tracing", ] [[package]] diff --git a/substrate/client/offchain/Cargo.toml b/substrate/client/offchain/Cargo.toml index 2a6fdddd7ad36b300566bbac25bca2bb80127f53..b92ee7041e5fde33352a221c66fb219b6760be51 100644 --- a/substrate/client/offchain/Cargo.toml +++ b/substrate/client/offchain/Cargo.toml @@ -19,7 +19,6 @@ hex = "0.4" fnv = "1.0.6" futures = "0.3.16" futures-timer = "3.0.2" -log = "0.4.8" num_cpus = "1.10" parking_lot = "0.11.1" rand = "0.7.2" @@ -31,9 +30,10 @@ sp-offchain = { version = "4.0.0-dev", path = "../../primitives/offchain" } sp-runtime = { version = "4.0.0-dev", path = "../../primitives/runtime" } sc-utils = { version = "4.0.0-dev", path = "../utils" } threadpool = "1.7" -hyper = { version = "0.14.14", features = ["stream"] } +hyper = { version = "0.14.14", features = ["stream", "http2"] } hyper-rustls = "0.22.1" once_cell = "1.8" +tracing = "0.1.29" [dev-dependencies] sc-client-db = { version = "0.10.0-dev", default-features = true, path = "../db" } diff --git a/substrate/client/offchain/src/api.rs b/substrate/client/offchain/src/api.rs index 07136d1815b91932a2ef99f783bd09f0bdff08fa..c2830510b015c4052339bf20089b97dda37f662d 100644 --- a/substrate/client/offchain/src/api.rs +++ b/substrate/client/offchain/src/api.rs @@ -37,10 +37,11 @@ mod http; mod timestamp; fn unavailable_yet<R: Default>(name: &str) -> R { - log::error!( - target: "sc_offchain", + tracing::error!( + target: super::LOG_TARGET, "The {:?} API is not available for offchain workers yet. Follow \ - https://github.com/paritytech/substrate/issues/1458 for details", name + https://github.com/paritytech/substrate/issues/1458 for details", + name ); Default::default() } @@ -75,9 +76,12 @@ impl<Storage: OffchainStorage> Db<Storage> { impl<Storage: OffchainStorage> offchain::DbExternalities for Db<Storage> { fn local_storage_set(&mut self, kind: StorageKind, key: &[u8], value: &[u8]) { - log::debug!( - target: "sc_offchain", - "{:?}: Write: {:?} <= {:?}", kind, hex::encode(key), hex::encode(value) + tracing::debug!( + target: "offchain-worker::storage", + ?kind, + key = ?hex::encode(key), + value = ?hex::encode(value), + "Write", ); match kind { StorageKind::PERSISTENT => self.persistent.set(STORAGE_PREFIX, key, value), @@ -86,9 +90,11 @@ impl<Storage: OffchainStorage> offchain::DbExternalities for Db<Storage> { } fn local_storage_clear(&mut self, kind: StorageKind, key: &[u8]) { - log::debug!( - target: "sc_offchain", - "{:?}: Clear: {:?}", kind, hex::encode(key) + tracing::debug!( + target: "offchain-worker::storage", + ?kind, + key = ?hex::encode(key), + "Clear", ); match kind { StorageKind::PERSISTENT => self.persistent.remove(STORAGE_PREFIX, key), @@ -103,13 +109,13 @@ impl<Storage: OffchainStorage> offchain::DbExternalities for Db<Storage> { old_value: Option<&[u8]>, new_value: &[u8], ) -> bool { - log::debug!( - target: "sc_offchain", - "{:?}: CAS: {:?} <= {:?} vs {:?}", - kind, - hex::encode(key), - hex::encode(new_value), - old_value.as_ref().map(hex::encode), + tracing::debug!( + target: "offchain-worker::storage", + ?kind, + key = ?hex::encode(key), + new_value = ?hex::encode(new_value), + old_value = ?old_value.as_ref().map(hex::encode), + "CAS", ); match kind { StorageKind::PERSISTENT => @@ -123,12 +129,12 @@ impl<Storage: OffchainStorage> offchain::DbExternalities for Db<Storage> { StorageKind::PERSISTENT => self.persistent.get(STORAGE_PREFIX, key), StorageKind::LOCAL => unavailable_yet(LOCAL_DB), }; - log::debug!( - target: "sc_offchain", - "{:?}: Read: {:?} => {:?}", - kind, - hex::encode(key), - result.as_ref().map(hex::encode) + tracing::debug!( + target: "offchain-worker::storage", + ?kind, + key = ?hex::encode(key), + result = ?result.as_ref().map(hex::encode), + "Read", ); result } diff --git a/substrate/client/offchain/src/api/http.rs b/substrate/client/offchain/src/api/http.rs index a2975bad16528a769d3ee5e73c997205dc6ef566..632c94b4810748839045ce91d14d7e1af2dc1e88 100644 --- a/substrate/client/offchain/src/api/http.rs +++ b/substrate/client/offchain/src/api/http.rs @@ -33,7 +33,6 @@ use fnv::FnvHashMap; use futures::{channel::mpsc, future, prelude::*}; use hyper::{client, Body, Client as HyperClient}; use hyper_rustls::HttpsConnector; -use log::error; use once_cell::sync::Lazy; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Timestamp}; @@ -46,6 +45,8 @@ use std::{ task::{Context, Poll}, }; +const LOG_TARGET: &str = "offchain-worker::http"; + /// Wrapper struct used for keeping the hyper_rustls client running. #[derive(Clone)] pub struct SharedClient(Arc<Lazy<HyperClient<HttpsConnector<client::HttpConnector>, Body>>>); @@ -146,13 +147,24 @@ impl HttpApi { match self.next_id.0.checked_add(1) { Some(new_id) => self.next_id.0 = new_id, None => { - error!("Overflow in offchain worker HTTP request ID assignment"); + tracing::error!( + target: LOG_TARGET, + "Overflow in offchain worker HTTP request ID assignment" + ); return Err(()) }, }; self.requests .insert(new_id, HttpApiRequest::NotDispatched(request, body_sender)); + tracing::error!( + target: LOG_TARGET, + id = %new_id.0, + %method, + %uri, + "Requested started", + ); + Ok(new_id) } @@ -168,11 +180,14 @@ impl HttpApi { _ => return Err(()), }; - let name = hyper::header::HeaderName::try_from(name).map_err(drop)?; - let value = hyper::header::HeaderValue::try_from(value).map_err(drop)?; + let header_name = hyper::header::HeaderName::try_from(name).map_err(drop)?; + let header_value = hyper::header::HeaderValue::try_from(value).map_err(drop)?; // Note that we're always appending headers and never replacing old values. // We assume here that the user knows what they're doing. - request.headers_mut().append(name, value); + request.headers_mut().append(header_name, header_value); + + tracing::debug!(target: LOG_TARGET, id = %request_id.0, %name, %value, "Added header to request"); + Ok(()) } @@ -207,7 +222,7 @@ impl HttpApi { sender.send_data(hyper::body::Bytes::from(chunk.to_owned())), ) .map_err(|_| { - error!("HTTP sender refused data despite being ready"); + tracing::error!(target: "offchain-worker::http", "HTTP sender refused data despite being ready"); HttpError::IoError }) }; @@ -215,6 +230,7 @@ impl HttpApi { loop { request = match request { HttpApiRequest::NotDispatched(request, sender) => { + tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Added new body chunk"); // If the request is not dispatched yet, dispatch it and loop again. let _ = self .to_worker @@ -225,14 +241,20 @@ impl HttpApi { HttpApiRequest::Dispatched(Some(mut sender)) => { if !chunk.is_empty() { match poll_sender(&mut sender) { - Err(HttpError::IoError) => return Err(HttpError::IoError), + Err(HttpError::IoError) => { + tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Encountered io error while trying to add new chunk to body"); + return Err(HttpError::IoError) + }, other => { + tracing::debug!(target: LOG_TARGET, id = %request_id.0, res = ?other, "Added chunk to body"); self.requests .insert(request_id, HttpApiRequest::Dispatched(Some(sender))); return other }, } } else { + tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Finished writing body"); + // Writing an empty body is a hint that we should stop writing. Dropping // the sender. self.requests.insert(request_id, HttpApiRequest::Dispatched(None)); @@ -250,14 +272,20 @@ impl HttpApi { .as_mut() .expect("Can only enter this match branch if Some; qed"), ) { - Err(HttpError::IoError) => return Err(HttpError::IoError), + Err(HttpError::IoError) => { + tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Encountered io error while trying to add new chunk to body"); + return Err(HttpError::IoError) + }, other => { + tracing::debug!(target: LOG_TARGET, id = %request_id.0, res = ?other, "Added chunk to body"); self.requests .insert(request_id, HttpApiRequest::Response(response)); return other }, } } else { + tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Finished writing body"); + // Writing an empty body is a hint that we should stop writing. Dropping // the sender. self.requests.insert( @@ -271,13 +299,18 @@ impl HttpApi { } }, - HttpApiRequest::Fail(_) => - // If the request has already failed, return without putting back the request - // in the list. - return Err(HttpError::IoError), + HttpApiRequest::Fail(error) => { + tracing::debug!(target: LOG_TARGET, id = %request_id.0, ?error, "Request failed"); + + // If the request has already failed, return without putting back the request + // in the list. + return Err(HttpError::IoError) + }, v @ HttpApiRequest::Dispatched(None) | v @ HttpApiRequest::Response(HttpApiRequestRp { sending_body: None, .. }) => { + tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Body sending already finished"); + // We have already finished sending this body. self.requests.insert(request_id, v); return Err(HttpError::Invalid) @@ -350,8 +383,19 @@ impl HttpApi { // Requests in "fail" mode are purged before returning. debug_assert_eq!(output.len(), ids.len()); for n in (0..ids.len()).rev() { - if let HttpRequestStatus::IoError = output[n] { - self.requests.remove(&ids[n]); + match output[n] { + HttpRequestStatus::IoError => { + self.requests.remove(&ids[n]); + }, + HttpRequestStatus::Invalid => { + tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Unknown request"); + }, + HttpRequestStatus::DeadlineReached => { + tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Deadline reached"); + }, + HttpRequestStatus::Finished(_) => { + tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Request finished"); + }, } } return output @@ -388,20 +432,23 @@ impl HttpApi { ); }, None => {}, // can happen if we detected an IO error when sending the body - _ => error!("State mismatch between the API and worker"), + _ => + tracing::error!(target: "offchain-worker::http", "State mismatch between the API and worker"), } }, Some(WorkerToApi::Fail { id, error }) => match self.requests.remove(&id) { Some(HttpApiRequest::Dispatched(_)) => { + tracing::debug!(target: LOG_TARGET, id = %id.0, ?error, "Request failed"); self.requests.insert(id, HttpApiRequest::Fail(error)); }, None => {}, // can happen if we detected an IO error when sending the body - _ => error!("State mismatch between the API and worker"), + _ => + tracing::error!(target: "offchain-worker::http", "State mismatch between the API and worker"), }, None => { - error!("Worker has crashed"); + tracing::error!(target: "offchain-worker::http", "Worker has crashed"); return ids.iter().map(|_| HttpRequestStatus::IoError).collect() }, } @@ -474,7 +521,7 @@ impl HttpApi { }, Err(err) => { // This code should never be reached unless there's a logic error somewhere. - error!("Failed to read from current read chunk: {:?}", err); + tracing::error!(target: "offchain-worker::http", "Failed to read from current read chunk: {:?}", err); return Err(HttpError::IoError) }, } @@ -719,7 +766,10 @@ mod tests { // Returns an `HttpApi` whose worker is ran in the background, and a `SocketAddr` to an HTTP // server that runs in the background as well. macro_rules! build_api_server { - () => {{ + () => { + build_api_server!(hyper::Response::new(hyper::Body::from("Hello World!"))) + }; + ( $response:expr ) => {{ let hyper_client = SHARED_CLIENT.clone(); let (api, worker) = http(hyper_client.clone()); @@ -736,9 +786,7 @@ mod tests { // otherwise the tests are flaky. let _ = req.into_body().collect::<Vec<_>>().await; - Ok::<_, Infallible>(hyper::Response::new(hyper::Body::from( - "Hello World!", - ))) + Ok::<_, Infallible>($response) }, )) }), @@ -776,6 +824,33 @@ mod tests { assert_eq!(&buf[..n], b"Hello World!"); } + #[test] + fn basic_http2_localhost() { + let deadline = timestamp::now().add(Duration::from_millis(10_000)); + + // Performs an HTTP query to a background HTTP server. + + let (mut api, addr) = build_api_server!(hyper::Response::builder() + .version(hyper::Version::HTTP_2) + .body(hyper::Body::from("Hello World!")) + .unwrap()); + + let id = api.request_start("POST", &format!("http://{}", addr)).unwrap(); + api.request_write_body(id, &[], Some(deadline)).unwrap(); + + match api.response_wait(&[id], Some(deadline))[0] { + HttpRequestStatus::Finished(200) => {}, + v => panic!("Connecting to localhost failed: {:?}", v), + } + + let headers = api.response_headers(id); + assert!(headers.iter().any(|(h, _)| h.eq_ignore_ascii_case(b"Date"))); + + let mut buf = vec![0; 2048]; + let n = api.response_read_body(id, &mut buf, Some(deadline)).unwrap(); + assert_eq!(&buf[..n], b"Hello World!"); + } + #[test] fn request_start_invalid_call() { let (mut api, addr) = build_api_server!(); diff --git a/substrate/client/offchain/src/lib.rs b/substrate/client/offchain/src/lib.rs index 2de24e10d927da2ce8a5cf6b0cca0af1f7837641..f9230a1552e1e50b068d308321a4a70f17053cf4 100644 --- a/substrate/client/offchain/src/lib.rs +++ b/substrate/client/offchain/src/lib.rs @@ -41,7 +41,6 @@ use futures::{ future::{ready, Future}, prelude::*, }; -use log::{debug, warn}; use parking_lot::Mutex; use sc_network::{ExHashT, NetworkService, NetworkStateInfo, PeerId}; use sp_api::{ApiExt, ProvideRuntimeApi}; @@ -57,6 +56,8 @@ mod api; pub use api::Db as OffchainDb; pub use sp_offchain::{OffchainWorkerApi, STORAGE_PREFIX}; +const LOG_TARGET: &str = "offchain-worker"; + /// NetworkProvider provides [`OffchainWorkers`] with all necessary hooks into the /// underlying Substrate networking. pub trait NetworkProvider: NetworkStateInfo { @@ -149,15 +150,25 @@ where err => { let help = "Consider turning off offchain workers if they are not part of your runtime."; - log::error!("Unsupported Offchain Worker API version: {:?}. {}.", err, help); + tracing::error!( + target: LOG_TARGET, + "Unsupported Offchain Worker API version: {:?}. {}.", + err, + help + ); 0 }, }; - debug!("Checking offchain workers at {:?}: version:{}", at, version); + tracing::debug!( + target: LOG_TARGET, + "Checking offchain workers at {:?}: version:{}", + at, + version + ); let process = (version > 0).then(|| { let (api, runner) = api::AsyncApi::new(network_provider, is_validator, self.shared_http_client.clone()); - debug!("Spawning offchain workers at {:?}", at); + tracing::debug!(target: LOG_TARGET, "Spawning offchain workers at {:?}", at); let header = header.clone(); let client = self.client.clone(); @@ -167,7 +178,7 @@ where self.spawn_worker(move || { let runtime = client.runtime_api(); let api = Box::new(api); - debug!("Running offchain workers at {:?}", at); + tracing::debug!(target: LOG_TARGET, "Running offchain workers at {:?}", at); let context = ExecutionContext::OffchainCall(Some((api, capabilities))); let run = if version == 2 { @@ -181,7 +192,12 @@ where ) }; if let Err(e) = run { - log::error!("Error running offchain workers at {:?}: {:?}", at, e); + tracing::error!( + target: LOG_TARGET, + "Error running offchain workers at {:?}: {:?}", + at, + e + ); } }); @@ -232,8 +248,8 @@ pub async fn notification_future<Client, Block, Spawner>( .boxed(), ); } else { - log::debug!( - target: "sc_offchain", + tracing::debug!( + target: LOG_TARGET, "Skipping offchain workers for non-canon block: {:?}", n.header, )