Newer
Older
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use assert_matches::assert_matches;
use parity_scale_codec::Encode as _;
use polkadot_node_core_pvf::{
start, testing::get_and_check_worker_paths, Config, InvalidCandidate, Metrics, PrepareError,
PrepareJobKind, PrepareStats, PvfPrepData, ValidationError, ValidationHost,
JOB_TIMEOUT_WALL_CLOCK_FACTOR,
use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult};
use polkadot_primitives::ExecutorParams;
#[cfg(target_os = "linux")]
use rusty_fork::rusty_fork_test;
#[cfg(feature = "ci-only-tests")]
use polkadot_primitives::ExecutorParam;
use std::time::Duration;
use tokio::sync::Mutex;
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(3);
host: Mutex<ValidationHost>,
}
impl TestHost {
async fn new() -> Self {
Self::new_with_config(|_| ()).await
async fn new_with_config<F>(f: F) -> Self
let (prepare_worker_path, execute_worker_path) = get_and_check_worker_paths();
let cache_dir = tempfile::tempdir().unwrap();
let mut config = Config::new(
cache_dir.path().to_owned(),
None,
prepare_worker_path,
execute_worker_path,
);
let (host, task) = start(config, Metrics::default()).await;
let _ = tokio::task::spawn(task);
Self { cache_dir, host: Mutex::new(host) }
async fn precheck_pvf(
&self,
code: &[u8],
executor_params: ExecutorParams,
) -> Result<PrepareStats, PrepareError> {
let (result_tx, result_rx) = futures::channel::oneshot::channel();
let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024)
.expect("Compression works");
self.host
.lock()
.await
.precheck_pvf(
PvfPrepData::from_code(
code.into(),
executor_params,
TEST_PREPARATION_TIMEOUT,
PrepareJobKind::Prechecking,
),
result_tx,
)
.await
.unwrap();
result_rx.await.unwrap()
}
async fn validate_candidate(
&self,
code: &[u8],
params: ValidationParams,
executor_params: ExecutorParams,
) -> Result<ValidationResult, ValidationError> {
let (result_tx, result_rx) = futures::channel::oneshot::channel();
let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024)
.expect("Compression works");
self.host
.lock()
.await
.execute_pvf(
PvfPrepData::from_code(
code.into(),
executor_params,
TEST_PREPARATION_TIMEOUT,
PrepareJobKind::Compilation,
),
TEST_EXECUTION_TIMEOUT,
params.encode(),
polkadot_node_core_pvf::Priority::Normal,
result_tx,
)
.await
.unwrap();
result_rx.await.unwrap()
}
}
let host = TestHost::new().await;
let start = std::time::Instant::now();
let result = host
.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)) => {},
let duration = std::time::Instant::now().duration_since(start);
assert!(duration >= TEST_EXECUTION_TIMEOUT);
assert!(duration < TEST_EXECUTION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR);
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
#[cfg(target_os = "linux")]
fn kill_by_sid_and_name(sid: i32, exe_name: &'static str) {
use procfs::process;
let all_processes: Vec<process::Process> = process::all_processes()
.expect("Can't read /proc")
.filter_map(|p| match p {
Ok(p) => Some(p), // happy path
Err(e) => match e {
// process vanished during iteration, ignore it
procfs::ProcError::NotFound(_) => None,
x => {
panic!("some unknown error: {}", x);
},
},
})
.collect();
for process in all_processes {
if process.stat().unwrap().session == sid &&
process.exe().unwrap().to_str().unwrap().contains(exe_name)
{
assert_eq!(unsafe { libc::kill(process.pid(), 9) }, 0);
}
}
}
// Run these tests in their own processes with rusty-fork. They work by each creating a new session,
// then killing the worker process that matches the session ID and expected worker name.
#[cfg(target_os = "linux")]
rusty_fork_test! {
// What happens when the prepare worker dies in the middle of a job?
#[test]
fn prepare_worker_killed_during_job() {
const PROCESS_NAME: &'static str = "polkadot-prepare-worker";
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
// Run a future that kills the job in the middle of the timeout.
async {
tokio::time::sleep(TEST_PREPARATION_TIMEOUT / 2).await;
kill_by_sid_and_name(sid, PROCESS_NAME);
}
);
assert_matches!(result, Err(PrepareError::IoErr(_)));
})
}
// What happens when the execute worker dies in the middle of a job?
#[test]
fn execute_worker_killed_during_job() {
const PROCESS_NAME: &'static str = "polkadot-execute-worker";
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let host = TestHost::new().await;
// Create a new session and get the session ID.
let sid = unsafe { libc::setsid() };
assert!(sid > 0);
// Prepare the artifact ahead of time.
let binary = halt::wasm_binary_unwrap();
host.precheck_pvf(binary, Default::default()).await.unwrap();
let (result, _) = futures::join!(
// Choose an job that would normally take the entire timeout.
host.validate_candidate(
binary,
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
Default::default(),
),
// Run a future that kills the job in the middle of the timeout.
async {
tokio::time::sleep(TEST_EXECUTION_TIMEOUT / 2).await;
kill_by_sid_and_name(sid, PROCESS_NAME);
}
);
assert_matches!(
result,
Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath))
);
})
}
}
#[cfg(feature = "ci-only-tests")]
#[tokio::test]
async fn ensure_parallel_execution() {
// Run some jobs that do not complete, thus timing out.
let host = TestHost::new().await;
let execute_pvf_future_1 = host.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
);
let execute_pvf_future_2 = host.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
);
let start = std::time::Instant::now();
let (res1, res2) = futures::join!(execute_pvf_future_1, execute_pvf_future_2);
assert_matches!(
(res1, res2),
(
Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)),
Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout))
)
);
// Total time should be < 2 x TEST_EXECUTION_TIMEOUT (two workers run in parallel).
let duration = std::time::Instant::now().duration_since(start);
let max_duration = 2 * TEST_EXECUTION_TIMEOUT;
duration < max_duration,
"Expected duration {}ms to be less than {}ms",
duration.as_millis(),
max_duration.as_millis()
async fn execute_queue_doesnt_stall_if_workers_died() {
let host = TestHost::new_with_config(|cfg| {
cfg.execute_workers_max_num = 5;
})
.await;
// Here we spawn 8 validation jobs for the `halt` PVF and share those between 5 workers. The
// first five jobs should timeout and the workers killed. For the next 3 jobs a new batch of
// workers should be spun up.
let start = std::time::Instant::now();
futures::future::join_all((0u8..=8).map(|_| {
host.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
// Total time should be >= 2 x TEST_EXECUTION_TIMEOUT (two separate sets of workers that should
// both timeout).
let duration = std::time::Instant::now().duration_since(start);
let max_duration = 2 * TEST_EXECUTION_TIMEOUT;
assert!(
duration >= max_duration,
"Expected duration {}ms to be greater than or equal to {}ms",
duration.as_millis(),
max_duration.as_millis()
);
#[cfg(feature = "ci-only-tests")]
#[tokio::test]
async fn execute_queue_doesnt_stall_with_varying_executor_params() {
let host = TestHost::new_with_config(|cfg| {
cfg.execute_workers_max_num = 2;
})
.await;
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
let executor_params_1 = ExecutorParams::default();
let executor_params_2 = ExecutorParams::from(&[ExecutorParam::StackLogicalMax(1024)][..]);
// Here we spawn 6 validation jobs for the `halt` PVF and share those between 2 workers. Every
// 3rd job will have different set of executor parameters. All the workers should be killed
// and in this case the queue should respawn new workers with needed executor environment
// without waiting. The jobs will be executed in 3 batches, each running two jobs in parallel,
// and execution time would be roughly 3 * TEST_EXECUTION_TIMEOUT
let start = std::time::Instant::now();
futures::future::join_all((0u8..6).map(|i| {
host.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
match i % 3 {
0 => executor_params_1.clone(),
_ => executor_params_2.clone(),
},
)
}))
.await;
let duration = std::time::Instant::now().duration_since(start);
let min_duration = 3 * TEST_EXECUTION_TIMEOUT;
let max_duration = 4 * TEST_EXECUTION_TIMEOUT;
assert!(
duration >= min_duration,
"Expected duration {}ms to be greater than or equal to {}ms",
duration.as_millis(),
min_duration.as_millis()
);
assert!(
duration <= max_duration,
"Expected duration {}ms to be less than or equal to {}ms",
duration.as_millis(),
max_duration.as_millis()
);
}
// Test that deleting a prepared artifact does not lead to a dispute when we try to execute it.
#[tokio::test]
async fn deleting_prepared_artifact_does_not_dispute() {
let host = TestHost::new().await;
let cache_dir = host.cache_dir.path();
let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), Default::default()).await.unwrap();
// Manually delete the prepared artifact from disk. The in-memory artifacts table won't change.
{
// Get the artifact path (asserting it exists).
let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();
// Should contain the artifact and the worker dir.
assert_eq!(cache_dir.len(), 2);
let mut artifact_path = cache_dir.pop().unwrap().unwrap();
if artifact_path.path().is_dir() {
artifact_path = cache_dir.pop().unwrap().unwrap();
}
// Delete the artifact.
std::fs::remove_file(artifact_path.path()).unwrap();
}
// Try to validate, artifact should get recreated.
let result = host
.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
Default::default(),
)
.await;
match result {
Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)) => {},
r => panic!("{:?}", r),
}
}
// With one worker, run multiple preparation jobs serially. They should not conflict.
#[tokio::test]
async fn prepare_can_run_serially() {
let host = TestHost::new_with_config(|cfg| {
cfg.prepare_workers_hard_max_num = 1;
})
.await;
let _stats = host
.precheck_pvf(::adder::wasm_binary_unwrap(), Default::default())
.await
.unwrap();
// Prepare a different wasm blob to prevent skipping work.
let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), Default::default()).await.unwrap();
}