Newer
Older
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Contains the logic for preparing PVFs. Used by the polkadot-prepare-worker binary.
mod executor_intf;
mod memory_stats;
pub use executor_intf::{prepare, prevalidate};
// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are
// separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-prepare-worker=trace`.
const LOG_TARGET: &str = "parachain::pvf-prepare-worker";
#[cfg(target_os = "linux")]
use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::{PrepareError, PrepareResult},
executor_intf::create_runtime_from_artifact_bytes,
framed_recv_blocking, framed_send_blocking,
prepare::{MemoryStats, PrepareJobKind, PrepareStats},
pvf::PvfPrepData,
worker::{
cpu_time_monitor_loop, stringify_panic_payload,
thread::{self, WaitOutcome},
worker_dir, ProcessTime, SecurityStatus,
use polkadot_primitives::ExecutorParams;
path::PathBuf,
sync::{mpsc::channel, Arc},
time::Duration,
};
/// Contains the bytes for a successfully compiled artifact.
pub struct CompiledArtifact(Vec<u8>);
impl CompiledArtifact {
/// Creates a `CompiledArtifact`.
pub fn new(code: Vec<u8>) -> Self {
Self(code)
}
}
impl AsRef<[u8]> for CompiledArtifact {
fn as_ref(&self) -> &[u8] {
self.0.as_slice()
}
}
fn recv_request(stream: &mut UnixStream) -> io::Result<PvfPrepData> {
let pvf = framed_recv_blocking(stream)?;
let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("prepare pvf recv_request: failed to decode PvfPrepData: {}", e),
)
})?;
fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
framed_send_blocking(stream, &result.encode())
/// The entrypoint that the spawned prepare worker should start with.
///
/// # Parameters
///
/// - `socket_path`: specifies the path to the socket used to communicate with the host.
///
/// - `worker_dir_path`: specifies the path to the worker-specific temporary directory.
///
/// - `node_version`: if `Some`, is checked against the `worker_version`. A mismatch results in
/// immediate worker termination. `None` is used for tests and in other situations when version
/// check is not necessary.
///
/// - `worker_version`: see above
///
/// - `security_status`: contains the detected status of security features.
///
/// # Flow
///
/// This runs the following in a loop:
///
/// 1. Get the code and parameters for preparation from the host.
///
/// 2. Start a memory tracker in a separate thread.
///
/// 3. Start the CPU time monitor loop and the actual preparation in two separate threads.
///
/// 4. Wait on the two threads created in step 3.
///
/// 5. Stop the memory tracker and get the stats.
///
/// 6. If compilation succeeded, write the compiled artifact into a temporary file.
///
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
/// send that in the `PrepareResult`.
pub fn worker_entrypoint(
socket_path: PathBuf,
node_version: Option<&str>,
worker_version: Option<&str>,
) {
worker_event_loop(
socket_path,
node_version,
worker_version,
|mut stream, worker_dir_path| {
let worker_pid = std::process::id();
let temp_artifact_dest = worker_dir::prepare_tmp_artifact(&worker_dir_path);
loop {
let pvf = recv_request(&mut stream)?;
gum::debug!(
target: LOG_TARGET,
%worker_pid,
"worker: preparing artifact",
);
let preparation_timeout = pvf.prep_timeout();
let prepare_job_kind = pvf.prep_kind();
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Conditional variable to notify us when a thread is done.
let condvar = thread::get_condvar();
// Run the memory tracker in a regular, non-worker thread.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let condvar_memory = Arc::clone(&condvar);
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory));
let cpu_time_start = ProcessTime::now();
// Spawn a new thread that runs the CPU time monitor.
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
let cpu_time_monitor_thread = thread::spawn_worker_thread(
"cpu time monitor thread",
move || {
cpu_time_monitor_loop(
cpu_time_start,
preparation_timeout,
cpu_time_monitor_rx,
)
},
Arc::clone(&condvar),
WaitOutcome::TimedOut,
)?;
// Spawn another thread for preparation.
let prepare_thread = thread::spawn_worker_thread(
"prepare thread",
move || {
#[allow(unused_mut)]
let mut result = prepare_artifact(pvf, cpu_time_start);
// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
#[cfg(target_os = "linux")]
let mut result = result
.map(|(artifact, elapsed)| (artifact, elapsed, get_max_rss_thread()));
// If we are pre-checking, check for runtime construction errors.
//
// As pre-checking is more strict than just preparation in terms of memory
// and time, it is okay to do extra checks here. This takes negligible time
// anyway.
if let PrepareJobKind::Prechecking = prepare_job_kind {
result = result.and_then(|output| {
runtime_construction_check(
output.0.as_ref(),
executor_params.as_ref(),
)?;
Ok(output)
});
}
},
Arc::clone(&condvar),
WaitOutcome::Finished,
)?;
let outcome = thread::wait_for_threads(condvar);
let result = match outcome {
WaitOutcome::Finished => {
let _ = cpu_time_monitor_tx.send(());
match prepare_thread.join().unwrap_or_else(|err| {
Err(PrepareError::Panic(stringify_panic_payload(err)))
// Serialized error will be written into the socket.
Err(err)
},
Ok(ok) => {
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
let (artifact, cpu_time_elapsed, max_rss) = ok;
} else {
let (artifact, cpu_time_elapsed) = ok;
}
}
// Stop the memory stats worker and get its observed memory stats.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, worker_pid);
let memory_stats = MemoryStats {
#[cfg(any(
target_os = "linux",
feature = "jemalloc-allocator"
))]
memory_tracker_stats,
#[cfg(target_os = "linux")]
max_rss: extract_max_rss_stat(max_rss, worker_pid),
};
// Write the serialized artifact into a temp file.
//
// PVF host only keeps artifacts statuses in its memory,
// successfully compiled code gets stored on the disk (and
// consequently deserialized by execute-workers). The prepare worker
// is only required to send `Ok` to the pool to indicate the
// success.
gum::debug!(
target: LOG_TARGET,
%worker_pid,
"worker: writing artifact to {}",
temp_artifact_dest.display(),
);
fs::write(&temp_artifact_dest, &artifact)?;
Ok(PrepareStats { cpu_time_elapsed, memory_stats })
},
}
},
// If the CPU thread is not selected, we signal it to end, the join handle is
// dropped and the thread will finish in the background.
WaitOutcome::TimedOut => {
match cpu_time_monitor_thread.join() {
Ok(Some(cpu_time_elapsed)) => {
// Log if we exceed the timeout and the other thread hasn't
// finished.
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
cpu_time_elapsed.as_millis(),
preparation_timeout.as_millis(),
Err(PrepareError::TimedOut)
},
Ok(None) => Err(PrepareError::IoErr(
"error communicating over closed channel".into(),
)),
// Errors in this thread are independent of the PVF.
Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))),
}
},
WaitOutcome::Pending => unreachable!(
"we run wait_while until the outcome is no longer pending; qed"
),
};
gum::trace!(
target: LOG_TARGET,
%worker_pid,
"worker: sending response to host: {:?}",
result
);
send_response(&mut stream, result)?;
}
},
);
fn prepare_artifact(
pvf: PvfPrepData,
cpu_time_start: ProcessTime,
) -> Result<(CompiledArtifact, Duration), PrepareError> {
let blob = match prevalidate(&pvf.code()) {
Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
Ok(b) => b,
};
match prepare(blob, &pvf.executor_params()) {
Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
.map(|artifact| (artifact, cpu_time_start.elapsed()))
/// Try constructing the runtime to catch any instantiation errors during pre-checking.
fn runtime_construction_check(
artifact_bytes: &[u8],
) -> Result<(), PrepareError> {
// SAFETY: We just compiled this artifact.
let result = unsafe { create_runtime_from_artifact_bytes(artifact_bytes, executor_params) };