Unverified Commit 69b1058d authored by Sergey Pepyakin's avatar Sergey Pepyakin Committed by GitHub
Browse files

Mitigation of SIGBUS (#2440)

* Update shared-memory to new version & refactor

This two are combined in a single commit because the new version of
shared-memory doesn't provide the used functionality anymore.

Therefore in order to update the version of this crate we implement the
functionality that we need by ourselves, providing a cleaner API along
the way.

* Significantly decrease the required memory for a workspace

For some reason it was allocating an entire GiB of memory. I suspect
this has something to do with the current memory size limit of a PVF
execution environment (the prior name suggests that). However, we don't
need so much memory anywhere near that amount.

In fact, we could reduce the allocated size even more, but that maybe
for the next time.

* Unlink shmem just after opening

That will make sure that we don't leak the shmem accidentally.

* Do not compile workspace mod for androind and wasm

* Address some review comments

* Fix the test runner

* Fix missed +1 for the attached flag

* Use .expect rather than .unwrap

* Add a rustdoc for the workspace module

* fixup! Use .expect rather than .unwrap

* Add some doc comments to pub members

* Warn on error removing shm_unlink

* Change the alignment implementation

* Fix the comment nit
parent 5898cafc
Pipeline #124246 passed with stages
in 33 minutes and 30 seconds
This diff is collapsed.
...@@ -445,9 +445,9 @@ fn validate_candidate_exhaustive<B: ValidationBackend, S: SpawnNamed + 'static>( ...@@ -445,9 +445,9 @@ fn validate_candidate_exhaustive<B: ValidationBackend, S: SpawnNamed + 'static>(
match B::validate(backend_arg, &validation_code, params, spawn) { match B::validate(backend_arg, &validation_code, params, spawn) {
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Timeout)) => Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::Timeout)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)), Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::ParamsTooLarge(l))) => Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::ParamsTooLarge(l, _))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ParamsTooLarge(l as u64))), Ok(ValidationResult::Invalid(InvalidCandidate::ParamsTooLarge(l as u64))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::CodeTooLarge(l))) => Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::CodeTooLarge(l, _))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::CodeTooLarge(l as u64))), Ok(ValidationResult::Invalid(InvalidCandidate::CodeTooLarge(l as u64))),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::BadReturn)) => Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::BadReturn)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn)), Ok(ValidationResult::Invalid(InvalidCandidate::BadReturn)),
......
...@@ -27,9 +27,12 @@ sp-io = { git = "https://github.com/paritytech/substrate", branch = "master", op ...@@ -27,9 +27,12 @@ sp-io = { git = "https://github.com/paritytech/substrate", branch = "master", op
parking_lot = { version = "0.11.1", optional = true } parking_lot = { version = "0.11.1", optional = true }
log = { version = "0.4.11", optional = true } log = { version = "0.4.11", optional = true }
futures = { version = "0.3.8", optional = true } futures = { version = "0.3.8", optional = true }
static_assertions = { version = "1.1", optional = true }
libc = { version = "0.2.81", optional = true }
[target.'cfg(not(any(target_os = "android", target_os = "unknown")))'.dependencies] [target.'cfg(not(any(target_os = "android", target_os = "unknown")))'.dependencies]
shared_memory = { version = "0.10.0", optional = true } shared_memory = { version = "0.11.0", optional = true }
raw_sync = { version = "0.1", optional = true }
[features] [features]
default = ["std"] default = ["std"]
...@@ -42,9 +45,12 @@ std = [ ...@@ -42,9 +45,12 @@ std = [
"sp-std/std", "sp-std/std",
"sp-runtime/std", "sp-runtime/std",
"shared_memory", "shared_memory",
"raw_sync",
"sp-core/std", "sp-core/std",
"parking_lot", "parking_lot",
"static_assertions",
"log", "log",
"libc",
"parity-util-mem", "parity-util-mem",
"sp-externalities", "sp-externalities",
"sc-executor", "sc-executor",
......
...@@ -32,11 +32,6 @@ pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC, WOR ...@@ -32,11 +32,6 @@ pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC, WOR
mod validation_host; mod validation_host;
// maximum memory in bytes
const MAX_RUNTIME_MEM: usize = 1024 * 1024 * 1024; // 1 GiB
const MAX_CODE_MEM: usize = 16 * 1024 * 1024; // 16 MiB
const MAX_VALIDATION_RESULT_HEADER_MEM: usize = MAX_CODE_MEM + 1024; // 16.001 MiB
/// The strategy we employ for isolating execution of wasm parachain validation function (PVF). /// The strategy we employ for isolating execution of wasm parachain validation function (PVF).
/// ///
/// For a typical validator an external process is the default way to run PVF. The rationale is based /// For a typical validator an external process is the default way to run PVF. The rationale is based
...@@ -126,11 +121,11 @@ pub enum InvalidCandidate { ...@@ -126,11 +121,11 @@ pub enum InvalidCandidate {
#[error("WASM executor error")] #[error("WASM executor error")]
WasmExecutor(#[from] sc_executor::error::Error), WasmExecutor(#[from] sc_executor::error::Error),
/// Call data is too large. /// Call data is too large.
#[error("Validation parameters are {0} bytes, max allowed is {}", MAX_RUNTIME_MEM)] #[error("Validation parameters are {0} bytes, max allowed is {1}")]
ParamsTooLarge(usize), ParamsTooLarge(usize, usize),
/// Code size it too large. /// Code size it too large.
#[error("WASM code is {0} bytes, max allowed is {}", MAX_CODE_MEM)] #[error("WASM code is {0} bytes, max allowed is {1}")]
CodeTooLarge(usize), CodeTooLarge(usize, usize),
/// Error decoding returned data. /// Error decoding returned data.
#[error("Validation function returned invalid data.")] #[error("Validation function returned invalid data.")]
BadReturn, BadReturn,
...@@ -156,8 +151,20 @@ pub enum InternalError { ...@@ -156,8 +151,20 @@ pub enum InternalError {
System(#[from] Box<dyn std::error::Error + Send + Sync + 'static>), System(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
#[cfg(not(any(target_os = "android", target_os = "unknown")))] #[cfg(not(any(target_os = "android", target_os = "unknown")))]
#[error("Shared memory error: {0}")] #[error("Failed to create shared memory: {0}")]
SharedMem(#[from] shared_memory::SharedMemError), WorkerStartTimeout(String),
#[cfg(not(any(target_os = "android", target_os = "unknown")))]
#[error("Failed to create shared memory: {0}")]
FailedToCreateSharedMemory(String),
#[cfg(not(any(target_os = "android", target_os = "unknown")))]
#[error("Failed to send a singal to worker: {0}")]
FailedToSignal(String),
#[cfg(not(any(target_os = "android", target_os = "unknown")))]
#[error("Failed to send data to worker: {0}")]
FailedToWriteData(&'static str),
#[error("WASM worker error: {0}")] #[error("WASM worker error: {0}")]
WasmWorker(String), WasmWorker(String),
......
...@@ -16,14 +16,9 @@ ...@@ -16,14 +16,9 @@
#![cfg(not(any(target_os = "android", target_os = "unknown")))] #![cfg(not(any(target_os = "android", target_os = "unknown")))]
use std::{process, env, sync::Arc, sync::atomic, path::PathBuf}; use std::{env, path::PathBuf, process, sync::Arc, sync::atomic};
use parity_scale_codec::{Decode, Encode};
use crate::primitives::{ValidationParams, ValidationResult}; use crate::primitives::{ValidationParams, ValidationResult};
use super::{ use super::{validate_candidate_internal, ValidationError, InvalidCandidate, InternalError};
validate_candidate_internal, ValidationError, InvalidCandidate, InternalError,
MAX_CODE_MEM, MAX_RUNTIME_MEM, MAX_VALIDATION_RESULT_HEADER_MEM,
};
use shared_memory::{SharedMem, SharedMemConf, EventState, WriteLockable, EventWait, EventSet};
use parking_lot::Mutex; use parking_lot::Mutex;
use log::{debug, trace}; use log::{debug, trace};
use futures::executor::ThreadPool; use futures::executor::ThreadPool;
...@@ -35,18 +30,14 @@ pub const WORKER_ARGS: &[&'static str] = &[WORKER_ARG]; ...@@ -35,18 +30,14 @@ pub const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
const LOG_TARGET: &'static str = "validation-worker"; const LOG_TARGET: &'static str = "validation-worker";
mod workspace;
/// Execution timeout in seconds; /// Execution timeout in seconds;
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
pub const EXECUTION_TIMEOUT_SEC: u64 = 30; pub const EXECUTION_TIMEOUT_SEC: u64 = 30;
#[cfg(not(debug_assertions))] #[cfg(not(debug_assertions))]
pub const EXECUTION_TIMEOUT_SEC: u64 = 5; pub const EXECUTION_TIMEOUT_SEC: u64 = 5;
enum Event {
CandidateReady = 0,
ResultReady = 1,
WorkerReady = 2,
}
#[derive(Clone)] #[derive(Clone)]
struct TaskExecutor(ThreadPool); struct TaskExecutor(ThreadPool);
...@@ -99,16 +90,14 @@ impl ValidationPool { ...@@ -99,16 +90,14 @@ impl ValidationPool {
let worker_cli_args = match cache_base_path { let worker_cli_args = match cache_base_path {
Some(cache_base_path) => { Some(cache_base_path) => {
let worker_cli_args: Vec<&str> = let worker_cli_args: Vec<&str> = WORKER_ARGS
WORKER_ARGS.into_iter() .into_iter()
.cloned() .cloned()
.chain(iter::once(cache_base_path)) .chain(iter::once(cache_base_path))
.collect(); .collect();
Cow::from(worker_cli_args) Cow::from(worker_cli_args)
} }
None => { None => Cow::from(WORKER_ARGS),
Cow::from(WORKER_ARGS)
},
}; };
self.validate_candidate_custom( self.validate_candidate_custom(
...@@ -133,24 +122,31 @@ impl ValidationPool { ...@@ -133,24 +122,31 @@ impl ValidationPool {
) -> Result<ValidationResult, ValidationError> { ) -> Result<ValidationResult, ValidationError> {
for host in self.hosts.iter() { for host in self.hosts.iter() {
if let Some(mut host) = host.try_lock() { if let Some(mut host) = host.try_lock() {
return host.validate_candidate(validation_code, params, command, args) return host.validate_candidate(validation_code, params, command, args);
} }
} }
// all workers are busy, just wait for the first one // all workers are busy, just wait for the first one
self.hosts[0].lock().validate_candidate(validation_code, params, command, args) self.hosts[0]
.lock()
.validate_candidate(validation_code, params, command, args)
} }
} }
/// Validation worker process entry point. Runs a loop waiting for candidates to validate /// Validation worker process entry point. Runs a loop waiting for candidates to validate
/// and sends back results via shared memory. /// and sends back results via shared memory.
pub fn run_worker(mem_id: &str, cache_base_path: Option<PathBuf>) -> Result<(), String> { pub fn run_worker(mem_id: &str, cache_base_path: Option<PathBuf>) -> Result<(), String> {
let mut memory = match SharedMem::open(mem_id) { let mut worker_handle = match workspace::open(mem_id) {
Ok(memory) => memory,
Err(e) => { Err(e) => {
debug!(target: LOG_TARGET, "{} Error opening shared memory: {:?}", process::id(), e); debug!(
return Err(format!("Error opening shared memory: {:?}", e)); target: LOG_TARGET,
"{} Error opening shared memory: {:?}",
process::id(),
e
);
return Err(e);
} }
Ok(h) => h,
}; };
let exit = Arc::new(atomic::AtomicBool::new(false)); let exit = Arc::new(atomic::AtomicBool::new(false));
...@@ -162,12 +158,15 @@ pub fn run_worker(mem_id: &str, cache_base_path: Option<PathBuf>) -> Result<(), ...@@ -162,12 +158,15 @@ pub fn run_worker(mem_id: &str, cache_base_path: Option<PathBuf>) -> Result<(),
let mut in_data = Vec::new(); let mut in_data = Vec::new();
// pipe terminates when parent process exits // pipe terminates when parent process exits
std::io::stdin().read_to_end(&mut in_data).ok(); std::io::stdin().read_to_end(&mut in_data).ok();
debug!(target: LOG_TARGET, "{} Parent process is dead. Exiting", process::id()); debug!(
target: LOG_TARGET,
"{} Parent process is dead. Exiting",
process::id()
);
exit.store(true, atomic::Ordering::Relaxed); exit.store(true, atomic::Ordering::Relaxed);
}); });
memory.set(Event::WorkerReady as usize, EventState::Signaled) worker_handle.signal_ready()?;
.map_err(|e| format!("{} Error setting shared event: {:?}", process::id(), e))?;
let executor = super::ExecutorCache::new(cache_base_path); let executor = super::ExecutorCache::new(cache_base_path);
...@@ -176,102 +175,64 @@ pub fn run_worker(mem_id: &str, cache_base_path: Option<PathBuf>) -> Result<(), ...@@ -176,102 +175,64 @@ pub fn run_worker(mem_id: &str, cache_base_path: Option<PathBuf>) -> Result<(),
break; break;
} }
debug!(target: LOG_TARGET, "{} Waiting for candidate", process::id()); debug!(
match memory.wait(Event::CandidateReady as usize, shared_memory::Timeout::Sec(3)) { target: LOG_TARGET,
Err(e) => { "{} Waiting for candidate",
// Timeout process::id()
trace!(target: LOG_TARGET, "{} Timeout waiting for candidate: {:?}", process::id(), e); );
let work_item = match worker_handle.wait_for_work(3) {
Err(workspace::WaitForWorkErr::Wait(e)) => {
trace!(
target: LOG_TARGET,
"{} Timeout waiting for candidate: {:?}",
process::id(),
e
);
continue; continue;
} }
Ok(()) => {} Err(workspace::WaitForWorkErr::FailedToDecode(e)) => {
} return Err(e);
}
{ Ok(work_item) => work_item,
debug!(target: LOG_TARGET, "{} Processing candidate", process::id()); };
// we have candidate data
let mut slice = memory.wlock_as_slice(0)
.map_err(|e| format!("Error locking shared memory: {:?}", e))?;
let result = {
let data: &mut[u8] = &mut **slice;
let (header_buf, rest) = data.split_at_mut(1024);
let mut header_buf: &[u8] = header_buf;
let header = ValidationHeader::decode(&mut header_buf)
.map_err(|_| format!("Error decoding validation request."))?;
debug!(target: LOG_TARGET, "{} Candidate header: {:?}", process::id(), header);
let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
let (code, _) = code.split_at_mut(header.code_size as usize);
let (call_data, _) = rest.split_at_mut(MAX_RUNTIME_MEM);
let (call_data, _) = call_data.split_at_mut(header.params_size as usize);
let result = validate_candidate_internal(&executor, code, call_data, task_executor.clone());
debug!(target: LOG_TARGET, "{} Candidate validated: {:?}", process::id(), result);
match result {
Ok(r) => ValidationResultHeader::Ok(r),
Err(ValidationError::Internal(e)) =>
ValidationResultHeader::Error(WorkerValidationError::InternalError(e.to_string())),
Err(ValidationError::InvalidCandidate(e)) =>
ValidationResultHeader::Error(WorkerValidationError::ValidationError(e.to_string())),
}
};
let mut data: &mut[u8] = &mut **slice;
result.encode_to(&mut data);
}
debug!(target: LOG_TARGET, "{} Signaling result", process::id());
memory.set(Event::ResultReady as usize, EventState::Signaled)
.map_err(|e| format!("Error setting shared event: {:?}", e))?;
}
Ok(())
}
/// Params header in shared memory. All offsets should be aligned to WASM page size.
#[derive(Encode, Decode, Debug)]
struct ValidationHeader {
code_size: u64,
params_size: u64,
}
#[derive(Encode, Decode, Debug)]
enum WorkerValidationError {
InternalError(String),
ValidationError(String),
}
#[derive(Encode, Decode, Debug)]
enum ValidationResultHeader {
Ok(ValidationResult),
Error(WorkerValidationError),
}
unsafe impl Send for ValidationHost {}
struct ValidationHostMemory(SharedMem);
impl std::fmt::Debug for ValidationHostMemory { debug!(target: LOG_TARGET, "{} Processing candidate", process::id());
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { let result = validate_candidate_internal(
write!(f, "ValidationHostMemory") &executor,
} work_item.code,
} work_item.params,
task_executor.clone(),
);
impl std::ops::Deref for ValidationHostMemory { debug!(
type Target = SharedMem; target: LOG_TARGET,
"{} Candidate validated: {:?}",
process::id(),
result
);
fn deref(&self) -> &Self::Target { let result_header = match result {
&self.0 Ok(r) => workspace::ValidationResultHeader::Ok(r),
Err(ValidationError::Internal(e)) => workspace::ValidationResultHeader::Error(
workspace::WorkerValidationError::InternalError(e.to_string()),
),
Err(ValidationError::InvalidCandidate(e)) => workspace::ValidationResultHeader::Error(
workspace::WorkerValidationError::ValidationError(e.to_string()),
),
};
worker_handle
.report_result(result_header)
.map_err(|e| format!("error reporting result: {:?}", e))?;
} }
Ok(())
} }
impl std::ops::DerefMut for ValidationHostMemory { unsafe impl Send for ValidationHost {}
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
#[derive(Default, Debug)] #[derive(Default, Debug)]
struct ValidationHost { struct ValidationHost {
worker: Option<process::Child>, worker: Option<process::Child>,
memory: Option<ValidationHostMemory>, host_handle: Option<workspace::HostHandle>,
id: u32, id: u32,
} }
...@@ -284,18 +245,6 @@ impl Drop for ValidationHost { ...@@ -284,18 +245,6 @@ impl Drop for ValidationHost {
} }
impl ValidationHost { impl ValidationHost {
fn create_memory() -> Result<SharedMem, InternalError> {
let mem_size = MAX_RUNTIME_MEM + MAX_CODE_MEM + MAX_VALIDATION_RESULT_HEADER_MEM;
let mem_config = SharedMemConf::default()
.set_size(mem_size)
.add_lock(shared_memory::LockType::Mutex, 0, mem_size)?
.add_event(shared_memory::EventType::Auto)? // Event::CandidateReady
.add_event(shared_memory::EventType::Auto)? // Event::ResultReady
.add_event(shared_memory::EventType::Auto)?; // Event::WorkerReady
Ok(mem_config.create()?)
}
fn start_worker(&mut self, cmd: &PathBuf, args: &[&str]) -> Result<(), InternalError> { fn start_worker(&mut self, cmd: &PathBuf, args: &[&str]) -> Result<(), InternalError> {
if let Some(ref mut worker) = self.worker { if let Some(ref mut worker) = self.worker {
// Check if still alive // Check if still alive
...@@ -305,28 +254,28 @@ impl ValidationHost { ...@@ -305,28 +254,28 @@ impl ValidationHost {
} }
} }
let memory = Self::create_memory()?; let host_handle =
workspace::create().map_err(|msg| InternalError::FailedToCreateSharedMemory(msg))?;
debug!( debug!(
target: LOG_TARGET, target: LOG_TARGET,
"Starting worker at {:?} with arguments: {:?} and {:?}", "Starting worker at {:?} with arguments: {:?} and {:?}",
cmd, cmd,
args, args,
memory.get_os_path(), host_handle.id(),
); );
let worker = process::Command::new(cmd) let worker = process::Command::new(cmd)
.args(args) .args(args)
.arg(memory.get_os_path()) .arg(host_handle.id())
.stdin(process::Stdio::piped()) .stdin(process::Stdio::piped())
.spawn()?; .spawn()?;
self.id = worker.id(); self.id = worker.id();
self.worker = Some(worker); self.worker = Some(worker);
memory.wait( host_handle
Event::WorkerReady as usize, .wait_until_ready(EXECUTION_TIMEOUT_SEC)
shared_memory::Timeout::Sec(EXECUTION_TIMEOUT_SEC as usize), .map_err(|e| InternalError::WorkerStartTimeout(format!("{:?}", e)))?;
)?; self.host_handle = Some(host_handle);
self.memory = Some(ValidationHostMemory(memory));
Ok(()) Ok(())
} }
...@@ -340,76 +289,68 @@ impl ValidationHost { ...@@ -340,76 +289,68 @@ impl ValidationHost {
binary: &PathBuf, binary: &PathBuf,
args: &[&str], args: &[&str],
) -> Result<ValidationResult, ValidationError> { ) -> Result<ValidationResult, ValidationError> {
if validation_code.len() > MAX_CODE_MEM {
return Err(ValidationError::InvalidCandidate(InvalidCandidate::CodeTooLarge(validation_code.len())));
}
// First, check if need to spawn the child process // First, check if need to spawn the child process
self.start_worker(binary, args)?; self.start_worker(binary, args)?;
let memory = self.memory.as_mut()
.expect("memory is always `Some` after `start_worker` completes successfully");
{
// Put data in shared mem
let data: &mut[u8] = &mut **memory.wlock_as_slice(0)
.map_err(|e|ValidationError::Internal(e.into()))?;
let (mut header_buf, rest) = data.split_at_mut(1024);
let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
let (code, _) = code.split_at_mut(validation_code.len());
let (call_data, _) = rest.split_at_mut(MAX_RUNTIME_MEM);
code[..validation_code.len()].copy_from_slice(validation_code);
let encoded_params = params.encode();
if encoded_params.len() >= MAX_RUNTIME_MEM {
return Err(ValidationError::InvalidCandidate(InvalidCandidate::ParamsTooLarge(MAX_RUNTIME_MEM)));
}
call_data[..encoded_params.len()].copy_from_slice(&encoded_params);
let header = ValidationHeader {
code_size: validation_code.len() as u64,
params_size: encoded_params.len() as u64,
};
header.encode_to(&mut header_buf); let host_handle = self
} .host_handle
.as_mut()
.expect("host_handle is always `Some` after `start_worker` completes successfully");
debug!(target: LOG_TARGET, "{} Signaling candidate", self.id); debug!(target: LOG_TARGET, "{} Signaling candidate", self.id);
memory.set(Event::CandidateReady as usize, EventState::Signaled) match host_handle.request_validation(validation_code, params) {
.map_err(|e| ValidationError::Internal(e.into()))?; Ok(()) => {}
Err(workspace::RequestValidationErr::CodeTooLarge { actual, max }) => {
return Err(ValidationError::InvalidCandidate(
InvalidCandidate::CodeTooLarge(actual, max),
));
}
Err(workspace::RequestValidationErr::ParamsTooLarge { actual, max }) => {
return Err(ValidationError::InvalidCandidate(
InvalidCandidate::ParamsTooLarge(actual, max),
));
}
Err(workspace::RequestValidationErr::Signal(msg)) => {
return Err(ValidationError::Internal(InternalError::FailedToSignal(msg)));
}
Err(workspace::RequestValidationErr::WriteData(msg)) => {
return Err(ValidationError::Internal(InternalError::FailedToWriteData(msg)));
}
}
debug!(target: LOG_TARGET, "{} Waiting for results", self.id); debug!(target: LOG_TARGET, "{} Waiting for results", self.id);
match memory.wait(Event::ResultReady as usize, shared_memory::Timeout::Sec(EXECUTION_TIMEOUT_SEC as usize)) { let result_header = match host_handle.wait_for_result(EXECUTION_TIMEOUT_SEC) {
Err(e) => { Ok(inner_result) => inner_result,
debug!(target: LOG_TARGET, "Worker timeout: {:?}", e); Err(assumed_timeout) => {
debug!(target: LOG_TARGET