Skip to content
Snippets Groups Projects
Unverified Commit 8bf5a1c0 authored by Marcin S.'s avatar Marcin S. Committed by GitHub
Browse files

PVF: ensure job processes are cleaned up, add tests (#2643)

Fixes a potential memory leak.

`PR_SET_PDEATHSIG` is used to terminate children when the parent dies.
Note that this is subject to a race. There seems to be a raceless
alternative [here](https://stackoverflow.com/a/42498370/6085242), but
the concern is small enough that a bit more complexity doesn't seem
worth it. Left a bit more info in the code comment.
parent 45f4d9a2
Branches
No related merge requests found
Pipeline #430203 failed with stages
in 1 hour, 24 minutes, and 3 seconds
......@@ -96,4 +96,6 @@ pub enum JobError {
CouldNotSpawnThread(String),
#[error("An error occurred in the CPU time monitor thread: {0}")]
CpuTimeMonitorThread(String),
#[error("Could not set pdeathsig: {0}")]
CouldNotSetPdeathsig(String),
}
......@@ -277,6 +277,15 @@ fn handle_child_process(
params: Vec<u8>,
execution_timeout: Duration,
) -> ! {
// Terminate if the parent thread dies. Parent thread == worker process (it is single-threaded).
//
// RACE: the worker may die before we install the death signal. In practice this is unlikely,
// and most of the time the job process should terminate on its own when it completes.
#[cfg(target_os = "linux")]
nix::sys::prctl::set_pdeathsig(nix::sys::signal::Signal::SIGTERM).unwrap_or_else(|err| {
send_child_response(&mut pipe_write, Err(JobError::CouldNotSetPdeathsig(err.to_string())))
});
gum::debug!(
target: LOG_TARGET,
worker_job_pid = %process::id(),
......
......@@ -334,6 +334,15 @@ fn handle_child_process(
prepare_job_kind: PrepareJobKind,
executor_params: Arc<ExecutorParams>,
) -> ! {
// Terminate if the parent thread dies. Parent thread == worker process (it is single-threaded).
//
// RACE: the worker may die before we install the death signal. In practice this is unlikely,
// and most of the time the job process should terminate on its own when it completes.
#[cfg(target_os = "linux")]
nix::sys::prctl::set_pdeathsig(nix::sys::signal::Signal::SIGTERM).unwrap_or_else(|err| {
send_child_response(&mut pipe_write, Err(PrepareError::IoErr(err.to_string())))
});
let worker_job_pid = process::id();
gum::debug!(
target: LOG_TARGET,
......
......@@ -105,9 +105,9 @@ pub async fn spawn_with_program_path(
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path_clone,
?extra_args_clone,
?worker_dir_clone,
program_path = ?program_path_clone,
extra_args = ?extra_args_clone,
worker_dir = ?worker_dir_clone,
"error spawning worker: {}",
err,
);
......
......@@ -18,14 +18,18 @@
//! spawned by the host) and job processes (spawned by the workers to securely perform PVF jobs).
use super::TestHost;
use adder::{hash_state, BlockData, HeadData};
use assert_matches::assert_matches;
use parity_scale_codec::Encode;
use polkadot_node_core_pvf::{
InvalidCandidate, PossiblyInvalidError, PrepareError, ValidationError,
};
use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams};
use polkadot_parachain_primitives::primitives::{
BlockData as GenericBlockData, HeadData as GenericHeadData, ValidationParams,
};
use procfs::process;
use rusty_fork::rusty_fork_test;
use std::time::Duration;
use std::{future::Future, sync::Arc, time::Duration};
const PREPARE_PROCESS_NAME: &'static str = "polkadot-prepare-worker";
const EXECUTE_PROCESS_NAME: &'static str = "polkadot-execute-worker";
......@@ -39,11 +43,13 @@ fn send_signal_by_sid_and_name(
is_direct_child: bool,
signal: i32,
) {
let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child);
let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child)
.expect("Should have found the expected process");
assert_eq!(unsafe { libc::kill(process.pid(), signal) }, 0);
}
/// Returns the thread count of the process that matches the given session ID and whose executable
/// path contains `exe_name`, as reported by that process's stat entry.
///
/// `is_direct_child` is forwarded unchanged to `find_process_by_sid_and_name`.
///
/// # Panics
///
/// Panics if no matching process is found, or if the process's stat cannot be read.
fn get_num_threads_by_sid_and_name(sid: i32, exe_name: &'static str, is_direct_child: bool) -> i64 {
// NOTE(review): this binding is immediately shadowed by the next one. It looks like the
// pre-change line of the diff hunk shown next to its replacement -- confirm before compiling.
let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child);
let process = find_process_by_sid_and_name(sid, exe_name, is_direct_child)
.expect("Should have found the expected process");
process.stat().unwrap().num_threads
}
......@@ -51,7 +57,7 @@ fn find_process_by_sid_and_name(
sid: i32,
exe_name: &'static str,
is_direct_child: bool,
) -> process::Process {
) -> Option<process::Process> {
let all_processes: Vec<process::Process> = process::all_processes()
.expect("Can't read /proc")
.filter_map(|p| match p {
......@@ -68,7 +74,7 @@ fn find_process_by_sid_and_name(
let mut found = None;
for process in all_processes {
let stat = process.stat().unwrap();
let stat = process.stat().expect("/proc existed above. Potential race occurred");
if stat.session != sid || !process.exe().unwrap().to_str().unwrap().contains(exe_name) {
continue
......@@ -85,24 +91,68 @@ fn find_process_by_sid_and_name(
}
found = Some(process);
}
found.expect("Should have found the expected process")
found
}
/// Sets up the test and makes sure everything gets cleaned up after.
///
/// We run the runtime manually because `#[tokio::test]` doesn't work in `rusty_fork_test!`.
fn test_wrapper<F, Fut>(f: F)
where
F: FnOnce(Arc<TestHost>, i32) -> Fut,
Fut: Future<Output = ()>,
{
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = Arc::new(TestHost::new().await);
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
// Pass a clone of the host so that it does not get dropped after.
f(host.clone(), sid).await;
// Sleep to give processes a chance to get cleaned up, preventing races in the next step.
tokio::time::sleep(Duration::from_millis(500)).await;
// Make sure job processes got cleaned up. Pass `is_direct_child: false` to target the
// job processes.
assert!(find_process_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false).is_none());
assert!(find_process_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false).is_none());
});
}
// Run these tests in their own processes with rusty-fork. They work by each creating a new session,
// then doing something with the child process that matches the session ID and expected process
// name.
// then finding the child process that matches the session ID and expected process name and doing
// something with that child.
rusty_fork_test! {
// Happy path: everything succeeds. All subprocesses created for jobs should get cleaned up, to
// avoid memory leaks (the cleanup check itself is performed by `test_wrapper`).
#[test]
fn successful_prepare_and_validate() {
    test_wrapper(|host, _sid| async move {
        let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
        let block_data = BlockData { state: 0, add: 512 };

        // Same evaluation order as passing these inline: code first, then the encoded params.
        let code = adder::wasm_binary_unwrap();
        let params = ValidationParams {
            parent_head: GenericHeadData(parent_head.encode()),
            block_data: GenericBlockData(block_data.encode()),
            relay_parent_number: 1,
            relay_parent_storage_root: Default::default(),
        };

        let outcome = host.validate_candidate(code, params, Default::default()).await;
        outcome.unwrap();
    })
}
// What happens when the prepare worker (not the job) times out?
#[test]
fn prepare_worker_timeout() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
test_wrapper(|host, sid| async move {
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
......@@ -120,14 +170,7 @@ rusty_fork_test! {
// What happens when the execute worker (not the job) times out?
#[test]
fn execute_worker_timeout() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
test_wrapper(|host, sid| async move {
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
......@@ -137,7 +180,7 @@ rusty_fork_test! {
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
block_data: GenericBlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
......@@ -161,14 +204,7 @@ rusty_fork_test! {
// What happens when the prepare worker dies in the middle of a job?
#[test]
fn prepare_worker_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
test_wrapper(|host, sid| async move {
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
......@@ -186,14 +222,7 @@ rusty_fork_test! {
// What happens when the execute worker dies in the middle of a job?
#[test]
fn execute_worker_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
test_wrapper(|host, sid| async move {
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
......@@ -203,7 +232,7 @@ rusty_fork_test! {
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
block_data: GenericBlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
......@@ -227,14 +256,7 @@ rusty_fork_test! {
// What happens when the forked prepare job dies in the middle of its job?
#[test]
fn forked_prepare_job_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
test_wrapper(|host, sid| async move {
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
......@@ -256,14 +278,7 @@ rusty_fork_test! {
// What happens when the forked execute job dies in the middle of its job?
#[test]
fn forked_execute_job_killed_during_job() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
test_wrapper(|host, sid| async move {
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
......@@ -273,7 +288,7 @@ rusty_fork_test! {
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
block_data: GenericBlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
......@@ -301,14 +316,7 @@ rusty_fork_test! {
// See `run_worker` for why we need this invariant.
#[test]
fn ensure_prepare_processes_have_correct_num_threads() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
test_wrapper(|host, sid| async move {
let _ = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
......@@ -338,14 +346,7 @@ rusty_fork_test! {
// See `run_worker` for why we need this invariant.
#[test]
fn ensure_execute_processes_have_correct_num_threads() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
test_wrapper(|host, sid| async move {
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
......@@ -355,7 +356,7 @@ rusty_fork_test! {
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
block_data: GenericBlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment