...@@ -136,6 +136,9 @@ pub enum InternalValidationError { ...@@ -136,6 +136,9 @@ pub enum InternalValidationError {
/// Could not find or open compiled artifact file. /// Could not find or open compiled artifact file.
#[error("validation: could not find or open compiled artifact file: {0}")] #[error("validation: could not find or open compiled artifact file: {0}")]
CouldNotOpenFile(String), CouldNotOpenFile(String),
/// Could not create a pipe between the worker and a child process.
#[error("validation: could not create pipe: {0}")]
CouldNotCreatePipe(String),
/// Host could not clear the worker cache after a job. /// Host could not clear the worker cache after a job.
#[error("validation: host could not clear the worker cache ({path:?}) after a job: {err}")] #[error("validation: host could not clear the worker cache ({path:?}) after a job: {err}")]
CouldNotClearWorkerDir { CouldNotClearWorkerDir {
......
...@@ -30,35 +30,36 @@ pub struct Handshake { ...@@ -30,35 +30,36 @@ pub struct Handshake {
/// The response from the execution worker. /// The response from the execution worker.
#[derive(Debug, Encode, Decode)] #[derive(Debug, Encode, Decode)]
pub enum WorkerResponse { pub struct WorkerResponse {
/// The job completed successfully. /// The response from the execute job process.
Ok { pub job_response: JobResponse,
/// The result of parachain validation. /// The amount of CPU time taken by the job.
result_descriptor: ValidationResult, pub duration: Duration,
/// The amount of CPU time taken by the job. }
duration: Duration,
}, /// An error occurred in the worker process.
/// The candidate is invalid. #[derive(thiserror::Error, Debug, Clone, Encode, Decode)]
InvalidCandidate(String), pub enum WorkerError {
/// Instantiation of the WASM module instance failed during an execution.
/// Possibly related to local issues or dirty node update. May be retried with re-preparation.
RuntimeConstruction(String),
/// The job timed out. /// The job timed out.
#[error("The job timed out")]
JobTimedOut, JobTimedOut,
/// The job process has died. We must kill the worker just in case. /// The job process has died. We must kill the worker just in case.
/// ///
/// We cannot treat this as an internal error because malicious code may have killed the job. /// We cannot treat this as an internal error because malicious code may have killed the job.
/// We still retry it, because in the non-malicious case it is likely spurious. /// We still retry it, because in the non-malicious case it is likely spurious.
#[error("The job process (pid {job_pid}) has died: {err}")]
JobDied { err: String, job_pid: i32 }, JobDied { err: String, job_pid: i32 },
/// An unexpected error occurred in the job process, e.g. failing to spawn a thread, panic, /// An unexpected error occurred in the job process, e.g. failing to spawn a thread, panic,
/// etc. /// etc.
/// ///
/// Because malicious code can cause a job error, we must not treat it as an internal error. We /// Because malicious code can cause a job error, we must not treat it as an internal error. We
/// still retry it, because in the non-malicious case it is likely spurious. /// still retry it, because in the non-malicious case it is likely spurious.
JobError(String), #[error("An unexpected error occurred in the job process: {0}")]
JobError(#[from] JobError),
/// Some internal error occurred. /// Some internal error occurred.
InternalError(InternalValidationError), #[error("An internal error occurred: {0}")]
InternalError(#[from] InternalValidationError),
} }
/// The result of a job on the execution worker. /// The result of a job on the execution worker.
...@@ -101,7 +102,7 @@ impl JobResponse { ...@@ -101,7 +102,7 @@ impl JobResponse {
/// An unexpected error occurred in the execution job process. Because this comes from the job, /// An unexpected error occurred in the execution job process. Because this comes from the job,
/// which executes untrusted code, this error must likewise be treated as untrusted. That is, we /// which executes untrusted code, this error must likewise be treated as untrusted. That is, we
/// cannot raise an internal error based on this. /// cannot raise an internal error based on this.
#[derive(thiserror::Error, Debug, Encode, Decode)] #[derive(thiserror::Error, Clone, Debug, Encode, Decode)]
pub enum JobError { pub enum JobError {
#[error("The job timed out")] #[error("The job timed out")]
TimedOut, TimedOut,
...@@ -114,4 +115,7 @@ pub enum JobError { ...@@ -114,4 +115,7 @@ pub enum JobError {
CouldNotSpawnThread(String), CouldNotSpawnThread(String),
#[error("An error occurred in the CPU time monitor thread: {0}")] #[error("An error occurred in the CPU time monitor thread: {0}")]
CpuTimeMonitorThread(String), CpuTimeMonitorThread(String),
/// Since the job can return any exit status it wants, we have to treat this as untrusted.
#[error("Unexpected exit status: {0}")]
UnexpectedExitStatus(i32),
} }
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Contains functionality related to PVFs that is shared by the PVF host and the PVF workers. //! Contains functionality related to PVFs that is shared by the PVF host and the PVF workers.
#![deny(unused_crate_dependencies)]
pub mod error; pub mod error;
pub mod execute; pub mod execute;
......
...@@ -18,12 +18,7 @@ use crate::prepare::PrepareJobKind; ...@@ -18,12 +18,7 @@ use crate::prepare::PrepareJobKind;
use parity_scale_codec::{Decode, Encode}; use parity_scale_codec::{Decode, Encode};
use polkadot_parachain_primitives::primitives::ValidationCodeHash; use polkadot_parachain_primitives::primitives::ValidationCodeHash;
use polkadot_primitives::ExecutorParams; use polkadot_primitives::ExecutorParams;
use std::{ use std::{fmt, sync::Arc, time::Duration};
cmp::{Eq, PartialEq},
fmt,
sync::Arc,
time::Duration,
};
/// A struct that carries the exhaustive set of data to prepare an artifact out of plain /// A struct that carries the exhaustive set of data to prepare an artifact out of plain
/// Wasm binary /// Wasm binary
......
...@@ -18,10 +18,13 @@ ...@@ -18,10 +18,13 @@
pub mod security; pub mod security;
use crate::{framed_recv_blocking, SecurityStatus, WorkerHandshake, LOG_TARGET}; use crate::{
framed_recv_blocking, framed_send_blocking, SecurityStatus, WorkerHandshake, LOG_TARGET,
};
use cpu_time::ProcessTime; use cpu_time::ProcessTime;
use futures::never::Never; use futures::never::Never;
use parity_scale_codec::Decode; use nix::{errno::Errno, sys::resource::Usage};
use parity_scale_codec::{Decode, Encode};
use std::{ use std::{
any::Any, any::Any,
fmt::{self}, fmt::{self},
...@@ -58,8 +61,6 @@ macro_rules! decl_worker_main { ...@@ -58,8 +61,6 @@ macro_rules! decl_worker_main {
$crate::sp_tracing::try_init_simple(); $crate::sp_tracing::try_init_simple();
let worker_pid = std::process::id();
let args = std::env::args().collect::<Vec<_>>(); let args = std::env::args().collect::<Vec<_>>();
if args.len() == 1 { if args.len() == 1 {
print_help($expected_command); print_help($expected_command);
...@@ -548,6 +549,81 @@ fn recv_worker_handshake(stream: &mut UnixStream) -> io::Result<WorkerHandshake> ...@@ -548,6 +549,81 @@ fn recv_worker_handshake(stream: &mut UnixStream) -> io::Result<WorkerHandshake>
Ok(worker_handshake) Ok(worker_handshake)
} }
/// Calculate the total CPU time from the given `usage` structure, returned from
/// [`nix::sys::resource::getrusage`], and calculates the total CPU time spent, including both user
/// and system time.
///
/// # Arguments
///
/// - `rusage`: Contains resource usage information.
///
/// # Returns
///
/// Returns a `Duration` representing the total CPU time.
pub fn get_total_cpu_usage(rusage: Usage) -> Duration {
let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) +
(rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64;
return Duration::from_micros(micros)
}
/// Get a job response.
pub fn recv_child_response<T>(
received_data: &mut io::BufReader<&[u8]>,
context: &'static str,
) -> io::Result<T>
where
T: Decode,
{
let response_bytes = framed_recv_blocking(received_data)?;
T::decode(&mut response_bytes.as_slice()).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("{} pvf recv_child_response: decode error: {}", context, e),
)
})
}
pub fn send_result<T, E>(
stream: &mut UnixStream,
result: Result<T, E>,
worker_info: &WorkerInfo,
) -> io::Result<()>
where
T: std::fmt::Debug,
E: std::fmt::Debug + std::fmt::Display,
Result<T, E>: Encode,
{
if let Err(ref err) = result {
gum::warn!(
target: LOG_TARGET,
?worker_info,
"worker: error occurred: {}",
err
);
}
gum::trace!(
target: LOG_TARGET,
?worker_info,
"worker: sending result to host: {:?}",
result
);
framed_send_blocking(stream, &result.encode()).map_err(|err| {
gum::warn!(
target: LOG_TARGET,
?worker_info,
"worker: error occurred sending result to host: {}",
err
);
err
})
}
pub fn stringify_errno(context: &'static str, errno: Errno) -> String {
format!("{}: {}: {}", context, errno, io::Error::last_os_error())
}
/// Functionality related to threads spawned by the workers. /// Functionality related to threads spawned by the workers.
/// ///
/// The motivation for this module is to coordinate worker threads without using async Rust. /// The motivation for this module is to coordinate worker threads without using async Rust.
......
...@@ -16,6 +16,9 @@ ...@@ -16,6 +16,9 @@
//! Contains the logic for executing PVFs. Used by the polkadot-execute-worker binary. //! Contains the logic for executing PVFs. Used by the polkadot-execute-worker binary.
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]
pub use polkadot_node_core_pvf_common::{ pub use polkadot_node_core_pvf_common::{
error::ExecuteError, executor_interface::execute_artifact, error::ExecuteError, executor_interface::execute_artifact,
}; };
...@@ -36,11 +39,12 @@ use nix::{ ...@@ -36,11 +39,12 @@ use nix::{
use parity_scale_codec::{Decode, Encode}; use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{ use polkadot_node_core_pvf_common::{
error::InternalValidationError, error::InternalValidationError,
execute::{Handshake, JobError, JobResponse, JobResult, WorkerResponse}, execute::{Handshake, JobError, JobResponse, JobResult, WorkerError, WorkerResponse},
executor_interface::params_to_wasmtime_semantics, executor_interface::params_to_wasmtime_semantics,
framed_recv_blocking, framed_send_blocking, framed_recv_blocking, framed_send_blocking,
worker::{ worker::{
cpu_time_monitor_loop, pipe2_cloexec, run_worker, stringify_panic_payload, cpu_time_monitor_loop, get_total_cpu_usage, pipe2_cloexec, recv_child_response, run_worker,
send_result, stringify_errno, stringify_panic_payload,
thread::{self, WaitOutcome}, thread::{self, WaitOutcome},
PipeFd, WorkerInfo, WorkerKind, PipeFd, WorkerInfo, WorkerKind,
}, },
...@@ -93,8 +97,14 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, Duration)> { ...@@ -93,8 +97,14 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, Duration)> {
Ok((params, execution_timeout)) Ok((params, execution_timeout))
} }
fn send_response(stream: &mut UnixStream, response: WorkerResponse) -> io::Result<()> { /// Sends an error to the host and returns the original error wrapped in `io::Error`.
framed_send_blocking(stream, &response.encode()) macro_rules! map_and_send_err {
($error:expr, $err_constructor:expr, $stream:expr, $worker_info:expr) => {{
let err: WorkerError = $err_constructor($error.to_string()).into();
let io_err = io::Error::new(io::ErrorKind::Other, err.to_string());
let _ = send_result::<WorkerResponse, WorkerError>($stream, Err(err), $worker_info);
io_err
}};
} }
/// The entrypoint that the spawned execute worker should start with. /// The entrypoint that the spawned execute worker should start with.
...@@ -110,8 +120,6 @@ fn send_response(stream: &mut UnixStream, response: WorkerResponse) -> io::Resul ...@@ -110,8 +120,6 @@ fn send_response(stream: &mut UnixStream, response: WorkerResponse) -> io::Resul
/// check is not necessary. /// check is not necessary.
/// ///
/// - `worker_version`: see above /// - `worker_version`: see above
///
/// - `security_status`: contains the detected status of security features.
pub fn worker_entrypoint( pub fn worker_entrypoint(
socket_path: PathBuf, socket_path: PathBuf,
worker_dir_path: PathBuf, worker_dir_path: PathBuf,
...@@ -127,13 +135,28 @@ pub fn worker_entrypoint( ...@@ -127,13 +135,28 @@ pub fn worker_entrypoint(
|mut stream, worker_info, security_status| { |mut stream, worker_info, security_status| {
let artifact_path = worker_dir::execute_artifact(&worker_info.worker_dir_path); let artifact_path = worker_dir::execute_artifact(&worker_info.worker_dir_path);
let Handshake { executor_params } = recv_execute_handshake(&mut stream)?; let Handshake { executor_params } =
recv_execute_handshake(&mut stream).map_err(|e| {
map_and_send_err!(
e,
InternalValidationError::HostCommunication,
&mut stream,
worker_info
)
})?;
let executor_params: Arc<ExecutorParams> = Arc::new(executor_params); let executor_params: Arc<ExecutorParams> = Arc::new(executor_params);
let execute_thread_stack_size = max_stack_size(&executor_params); let execute_thread_stack_size = max_stack_size(&executor_params);
loop { loop {
let (params, execution_timeout) = recv_request(&mut stream)?; let (params, execution_timeout) = recv_request(&mut stream).map_err(|e| {
map_and_send_err!(
e,
InternalValidationError::HostCommunication,
&mut stream,
worker_info
)
})?;
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
?worker_info, ?worker_info,
...@@ -143,27 +166,34 @@ pub fn worker_entrypoint( ...@@ -143,27 +166,34 @@ pub fn worker_entrypoint(
); );
// Get the artifact bytes. // Get the artifact bytes.
let compiled_artifact_blob = match std::fs::read(&artifact_path) { let compiled_artifact_blob = std::fs::read(&artifact_path).map_err(|e| {
Ok(bytes) => bytes, map_and_send_err!(
Err(err) => { e,
let response = WorkerResponse::InternalError( InternalValidationError::CouldNotOpenFile,
InternalValidationError::CouldNotOpenFile(err.to_string()), &mut stream,
); worker_info
send_response(&mut stream, response)?; )
continue })?;
},
}; let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec().map_err(|e| {
map_and_send_err!(
let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec()?; e,
InternalValidationError::CouldNotCreatePipe,
let usage_before = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) { &mut stream,
Ok(usage) => usage, worker_info
Err(errno) => { )
let response = internal_error_from_errno("getrusage before", errno); })?;
send_response(&mut stream, response)?;
continue let usage_before = nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN)
}, .map_err(|errno| {
}; let e = stringify_errno("getrusage before", errno);
map_and_send_err!(
e,
InternalValidationError::Kernel,
&mut stream,
worker_info
)
})?;
let stream_fd = stream.as_raw_fd(); let stream_fd = stream.as_raw_fd();
let compiled_artifact_blob = Arc::new(compiled_artifact_blob); let compiled_artifact_blob = Arc::new(compiled_artifact_blob);
...@@ -222,7 +252,7 @@ pub fn worker_entrypoint( ...@@ -222,7 +252,7 @@ pub fn worker_entrypoint(
"worker: sending result to host: {:?}", "worker: sending result to host: {:?}",
result result
); );
send_response(&mut stream, result)?; send_result(&mut stream, result, worker_info)?;
} }
}, },
); );
...@@ -270,7 +300,7 @@ fn handle_clone( ...@@ -270,7 +300,7 @@ fn handle_clone(
worker_info: &WorkerInfo, worker_info: &WorkerInfo,
have_unshare_newuser: bool, have_unshare_newuser: bool,
usage_before: Usage, usage_before: Usage,
) -> io::Result<WorkerResponse> { ) -> io::Result<Result<WorkerResponse, WorkerError>> {
use polkadot_node_core_pvf_common::worker::security; use polkadot_node_core_pvf_common::worker::security;
// SAFETY: new process is spawned within a single threaded process. This invariant // SAFETY: new process is spawned within a single threaded process. This invariant
...@@ -301,7 +331,8 @@ fn handle_clone( ...@@ -301,7 +331,8 @@ fn handle_clone(
usage_before, usage_before,
execution_timeout, execution_timeout,
), ),
Err(security::clone::Error::Clone(errno)) => Ok(internal_error_from_errno("clone", errno)), Err(security::clone::Error::Clone(errno)) =>
Ok(Err(internal_error_from_errno("clone", errno))),
} }
} }
...@@ -316,7 +347,7 @@ fn handle_fork( ...@@ -316,7 +347,7 @@ fn handle_fork(
execute_worker_stack_size: usize, execute_worker_stack_size: usize,
worker_info: &WorkerInfo, worker_info: &WorkerInfo,
usage_before: Usage, usage_before: Usage,
) -> io::Result<WorkerResponse> { ) -> io::Result<Result<WorkerResponse, WorkerError>> {
// SAFETY: new process is spawned within a single threaded process. This invariant // SAFETY: new process is spawned within a single threaded process. This invariant
// is enforced by tests. // is enforced by tests.
match unsafe { nix::unistd::fork() } { match unsafe { nix::unistd::fork() } {
...@@ -338,7 +369,7 @@ fn handle_fork( ...@@ -338,7 +369,7 @@ fn handle_fork(
usage_before, usage_before,
execution_timeout, execution_timeout,
), ),
Err(errno) => Ok(internal_error_from_errno("fork", errno)), Err(errno) => Ok(Err(internal_error_from_errno("fork", errno))),
} }
} }
...@@ -483,11 +514,11 @@ fn handle_parent_process( ...@@ -483,11 +514,11 @@ fn handle_parent_process(
job_pid: Pid, job_pid: Pid,
usage_before: Usage, usage_before: Usage,
timeout: Duration, timeout: Duration,
) -> io::Result<WorkerResponse> { ) -> io::Result<Result<WorkerResponse, WorkerError>> {
// the read end will wait until all write ends have been closed, // the read end will wait until all write ends have been closed,
// this drop is necessary to avoid deadlock // this drop is necessary to avoid deadlock
if let Err(errno) = nix::unistd::close(pipe_write_fd) { if let Err(errno) = nix::unistd::close(pipe_write_fd) {
return Ok(internal_error_from_errno("closing pipe write fd", errno)); return Ok(Err(internal_error_from_errno("closing pipe write fd", errno)));
}; };
// SAFETY: pipe_read_fd is an open and owned file descriptor at this point. // SAFETY: pipe_read_fd is an open and owned file descriptor at this point.
...@@ -512,7 +543,7 @@ fn handle_parent_process( ...@@ -512,7 +543,7 @@ fn handle_parent_process(
let usage_after = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) { let usage_after = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
Ok(usage) => usage, Ok(usage) => usage,
Err(errno) => return Ok(internal_error_from_errno("getrusage after", errno)), Err(errno) => return Ok(Err(internal_error_from_errno("getrusage after", errno))),
}; };
// Using `getrusage` is needed to check whether child has timedout since we cannot rely on // Using `getrusage` is needed to check whether child has timedout since we cannot rely on
...@@ -530,32 +561,25 @@ fn handle_parent_process( ...@@ -530,32 +561,25 @@ fn handle_parent_process(
cpu_tv.as_millis(), cpu_tv.as_millis(),
timeout.as_millis(), timeout.as_millis(),
); );
return Ok(WorkerResponse::JobTimedOut) return Ok(Err(WorkerError::JobTimedOut))
} }
match status { match status {
Ok(WaitStatus::Exited(_, exit_status)) => { Ok(WaitStatus::Exited(_, exit_status)) => {
let mut reader = io::BufReader::new(received_data.as_slice()); let mut reader = io::BufReader::new(received_data.as_slice());
let result = match recv_child_response(&mut reader) { let result = recv_child_response(&mut reader, "execute")?;
Ok(result) => result,
Err(err) => return Ok(WorkerResponse::JobError(err.to_string())),
};
match result { match result {
Ok(JobResponse::Ok { result_descriptor }) => { Ok(job_response) => {
// The exit status should have been zero if no error occurred. // The exit status should have been zero if no error occurred.
if exit_status != 0 { if exit_status != 0 {
return Ok(WorkerResponse::JobError(format!( return Ok(Err(WorkerError::JobError(JobError::UnexpectedExitStatus(
"unexpected exit status: {}", exit_status,
exit_status ))));
)))
} }
Ok(WorkerResponse::Ok { result_descriptor, duration: cpu_tv }) Ok(Ok(WorkerResponse { job_response, duration: cpu_tv }))
}, },
Ok(JobResponse::InvalidCandidate(err)) => Ok(WorkerResponse::InvalidCandidate(err)),
Ok(JobResponse::RuntimeConstruction(err)) =>
Ok(WorkerResponse::RuntimeConstruction(err)),
Err(job_error) => { Err(job_error) => {
gum::warn!( gum::warn!(
target: LOG_TARGET, target: LOG_TARGET,
...@@ -565,9 +589,9 @@ fn handle_parent_process( ...@@ -565,9 +589,9 @@ fn handle_parent_process(
job_error, job_error,
); );
if matches!(job_error, JobError::TimedOut) { if matches!(job_error, JobError::TimedOut) {
Ok(WorkerResponse::JobTimedOut) Ok(Err(WorkerError::JobTimedOut))
} else { } else {
Ok(WorkerResponse::JobError(job_error.to_string())) Ok(Err(WorkerError::JobError(job_error.into())))
} }
}, },
} }
...@@ -576,50 +600,21 @@ fn handle_parent_process( ...@@ -576,50 +600,21 @@ fn handle_parent_process(
// //
// The job gets SIGSYS on seccomp violations, but this signal may have been sent for some // The job gets SIGSYS on seccomp violations, but this signal may have been sent for some
// other reason, so we still need to check for seccomp violations elsewhere. // other reason, so we still need to check for seccomp violations elsewhere.
Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) => Ok(WorkerResponse::JobDied { Ok(WaitStatus::Signaled(_pid, signal, _core_dump)) => Ok(Err(WorkerError::JobDied {
err: format!("received signal: {signal:?}"), err: format!("received signal: {signal:?}"),
job_pid: job_pid.as_raw(), job_pid: job_pid.as_raw(),
}), })),
Err(errno) => Ok(internal_error_from_errno("waitpid", errno)), Err(errno) => Ok(Err(internal_error_from_errno("waitpid", errno))),
// It is within an attacker's power to send an unexpected exit status. So we cannot treat // It is within an attacker's power to send an unexpected exit status. So we cannot treat
// this as an internal error (which would make us abstain), but must vote against. // this as an internal error (which would make us abstain), but must vote against.
Ok(unexpected_wait_status) => Ok(WorkerResponse::JobDied { Ok(unexpected_wait_status) => Ok(Err(WorkerError::JobDied {
err: format!("unexpected status from wait: {unexpected_wait_status:?}"), err: format!("unexpected status from wait: {unexpected_wait_status:?}"),
job_pid: job_pid.as_raw(), job_pid: job_pid.as_raw(),
}), })),
} }
} }
/// Calculate the total CPU time from the given `usage` structure, returned from
/// [`nix::sys::resource::getrusage`], and calculates the total CPU time spent, including both user
/// and system time.
///
/// # Arguments
///
/// - `rusage`: Contains resource usage information.
///
/// # Returns
///
/// Returns a `Duration` representing the total CPU time.
fn get_total_cpu_usage(rusage: Usage) -> Duration {
let micros = (((rusage.user_time().tv_sec() + rusage.system_time().tv_sec()) * 1_000_000) +
(rusage.system_time().tv_usec() + rusage.user_time().tv_usec()) as i64) as u64;
return Duration::from_micros(micros)
}
/// Get a job response.
fn recv_child_response(received_data: &mut io::BufReader<&[u8]>) -> io::Result<JobResult> {
let response_bytes = framed_recv_blocking(received_data)?;
JobResult::decode(&mut response_bytes.as_slice()).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("execute pvf recv_child_response: decode error: {}", e),
)
})
}
/// Write a job response to the pipe and exit process after. /// Write a job response to the pipe and exit process after.
/// ///
/// # Arguments /// # Arguments
...@@ -638,15 +633,10 @@ fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! { ...@@ -638,15 +633,10 @@ fn send_child_response(pipe_write: &mut PipeFd, response: JobResult) -> ! {
} }
} }
fn internal_error_from_errno(context: &'static str, errno: Errno) -> WorkerResponse { fn internal_error_from_errno(context: &'static str, errno: Errno) -> WorkerError {
WorkerResponse::InternalError(InternalValidationError::Kernel(format!( WorkerError::InternalError(InternalValidationError::Kernel(stringify_errno(context, errno)))
"{}: {}: {}",
context,
errno,
io::Error::last_os_error()
)))
} }
fn job_error_from_errno(context: &'static str, errno: Errno) -> JobResult { fn job_error_from_errno(context: &'static str, errno: Errno) -> JobResult {
Err(JobError::Kernel(format!("{}: {}: {}", context, errno, io::Error::last_os_error()))) Err(JobError::Kernel(stringify_errno(context, errno)))
} }
...@@ -197,7 +197,7 @@ impl Config { ...@@ -197,7 +197,7 @@ impl Config {
prepare_worker_program_path, prepare_worker_program_path,
prepare_worker_spawn_timeout: Duration::from_secs(3), prepare_worker_spawn_timeout: Duration::from_secs(3),
prepare_workers_soft_max_num: 1, prepare_workers_soft_max_num: 1,
prepare_workers_hard_max_num: 1, prepare_workers_hard_max_num: 2,
execute_worker_program_path, execute_worker_program_path,
execute_worker_spawn_timeout: Duration::from_secs(3), execute_worker_spawn_timeout: Duration::from_secs(3),
...@@ -959,10 +959,7 @@ pub(crate) mod tests { ...@@ -959,10 +959,7 @@ pub(crate) mod tests {
use crate::{artifacts::generate_artifact_path, PossiblyInvalidError}; use crate::{artifacts::generate_artifact_path, PossiblyInvalidError};
use assert_matches::assert_matches; use assert_matches::assert_matches;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use polkadot_node_core_pvf_common::{ use polkadot_node_core_pvf_common::prepare::PrepareStats;
error::PrepareError,
prepare::{PrepareStats, PrepareSuccess},
};
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30); pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30);
......
This diff is collapsed.
...@@ -148,6 +148,7 @@ enum ApprovalEntryError { ...@@ -148,6 +148,7 @@ enum ApprovalEntryError {
InvalidCandidateIndex, InvalidCandidateIndex,
DuplicateApproval, DuplicateApproval,
UnknownAssignment, UnknownAssignment,
#[allow(dead_code)]
AssignmentsFollowedDifferentPaths(RequiredRouting, RequiredRouting), AssignmentsFollowedDifferentPaths(RequiredRouting, RequiredRouting),
} }
......
...@@ -953,6 +953,7 @@ enum AdvertisementError { ...@@ -953,6 +953,7 @@ enum AdvertisementError {
/// parent. /// parent.
ProtocolMisuse, ProtocolMisuse,
/// Advertisement is invalid. /// Advertisement is invalid.
#[allow(dead_code)]
Invalid(InsertAdvertisementError), Invalid(InsertAdvertisementError),
} }
......