// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! General PVF host integration tests checking the functionality of the PVF host itself.
use assert_matches::assert_matches;
#[cfg(all(feature = "ci-only-tests", target_os = "linux"))]
use polkadot_node_core_pvf::SecurityStatus;
start, testing::build_workers_and_get_paths, Config, InvalidCandidate, Metrics,
PossiblyInvalidError, PrepareError, PrepareJobKind, PvfPrepData, ValidationError,
ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult};
use polkadot_primitives::{ExecutorParam, ExecutorParams, PvfExecKind, PvfPrepKind};
use std::{io::Write, time::Duration};
use tokio::sync::Mutex;
#[cfg(target_os = "linux")]
mod process;
// Execution timeout that `TestHost::validate_candidate` passes to `execute_pvf`.
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(6);
// Preparation timeout that the `TestHost` helpers pass to `PvfPrepData::from_code`.
const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(6);
// Keep a reference to the tempdir as it gets deleted on drop.
host: Mutex<ValidationHost>,
}
impl TestHost {
async fn new() -> Self {
Self::new_with_config(|_| ()).await
async fn new_with_config<F>(f: F) -> Self
let (prepare_worker_path, execute_worker_path) = build_workers_and_get_paths();
let cache_dir = tempfile::tempdir().unwrap();
let mut config = Config::new(
cache_dir.path().to_owned(),
None,
prepare_worker_path,
execute_worker_path,
Alexandru Gheorghe
committed
2,
1,
2,
let (host, task) = start(config, Metrics::default()).await.unwrap();
let _ = tokio::task::spawn(task);
Self { cache_dir, host: Mutex::new(host) }
/// Submits `code` to the host for prechecking and waits for the reported outcome.
///
/// The blob is run through `sp_maybe_compressed_blob::decompress` (with a 16 MiB limit argument)
/// before being wrapped into a `PvfPrepData` of kind `PrepareJobKind::Prechecking`.
///
/// Panics if decompression fails, if the host rejects the request, or if the result channel is
/// closed before a result arrives.
async fn precheck_pvf(
    &self,
    code: &[u8],
    executor_params: ExecutorParams,
) -> Result<(), PrepareError> {
    const BOMB_LIMIT: usize = 16 * 1024 * 1024;

    let raw_code =
        sp_maybe_compressed_blob::decompress(code, BOMB_LIMIT).expect("Compression works");
    let pvf = PvfPrepData::from_code(
        raw_code.into(),
        executor_params,
        TEST_PREPARATION_TIMEOUT,
        PrepareJobKind::Prechecking,
    );

    let (outcome_tx, outcome_rx) = futures::channel::oneshot::channel();

    // The lock guard is a temporary of this statement only, so the host is free again while we
    // wait for the outcome below.
    self.host.lock().await.precheck_pvf(pvf, outcome_tx).await.unwrap();

    outcome_rx.await.unwrap()
}
/// Asks the host to execute `code` against `params` and waits for the validation outcome.
///
/// The blob is run through `sp_maybe_compressed_blob::decompress` (with a 16 MiB limit argument),
/// wrapped into a `PvfPrepData` of kind `PrepareJobKind::Compilation`, and queued with normal
/// priority and `TEST_EXECUTION_TIMEOUT` as the execution timeout.
///
/// Panics if decompression fails, if the host rejects the request, or if the result channel is
/// closed before a result arrives.
async fn validate_candidate(
    &self,
    code: &[u8],
    params: ValidationParams,
    executor_params: ExecutorParams,
) -> Result<ValidationResult, ValidationError> {
    const BOMB_LIMIT: usize = 16 * 1024 * 1024;

    let raw_code =
        sp_maybe_compressed_blob::decompress(code, BOMB_LIMIT).expect("Compression works");
    let pvf = PvfPrepData::from_code(
        raw_code.into(),
        executor_params,
        TEST_PREPARATION_TIMEOUT,
        PrepareJobKind::Compilation,
    );
    let encoded_params = params.encode();

    let (outcome_tx, outcome_rx) = futures::channel::oneshot::channel();

    // The lock guard is a temporary of this statement only, so several callers can have jobs in
    // flight at the same time while each waits on its own channel.
    self.host
        .lock()
        .await
        .execute_pvf(
            pvf,
            TEST_EXECUTION_TIMEOUT,
            encoded_params,
            polkadot_node_core_pvf::Priority::Normal,
            outcome_tx,
        )
        .await
        .unwrap();

    outcome_rx.await.unwrap()
}
/// Returns a copy of the `security_status` field of the wrapped host.
#[cfg(all(feature = "ci-only-tests", target_os = "linux"))]
async fn security_status(&self) -> SecurityStatus {
    let host = self.host.lock().await;
    host.security_status.clone()
}
async fn prepare_job_terminates_on_timeout() {
let host = TestHost::new().await;
let start = std::time::Instant::now();
let result = host
.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default())
.await;
match result {
Err(PrepareError::TimedOut) => {},
r => panic!("{:?}", r),
}
let duration = std::time::Instant::now().duration_since(start);
assert!(duration >= TEST_PREPARATION_TIMEOUT);
assert!(duration < TEST_PREPARATION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR);
}
#[tokio::test]
async fn execute_job_terminates_on_timeout() {
let host = TestHost::new().await;
let start = std::time::Instant::now();
let result = host
.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)) => {},
let duration = std::time::Instant::now().duration_since(start);
assert!(duration >= TEST_EXECUTION_TIMEOUT);
assert!(duration < TEST_EXECUTION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR);
#[cfg(feature = "ci-only-tests")]
#[tokio::test]
async fn ensure_parallel_execution() {
// Run some jobs that do not complete, thus timing out.
let host = TestHost::new().await;
let execute_pvf_future_1 = host.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
);
let execute_pvf_future_2 = host.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
);
let start = std::time::Instant::now();
let (res1, res2) = futures::join!(execute_pvf_future_1, execute_pvf_future_2);
assert_matches!(
(res1, res2),
(
Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)),
Err(ValidationError::Invalid(InvalidCandidate::HardTimeout))
// Total time should be < 2 x TEST_EXECUTION_TIMEOUT (two workers run in parallel).
let duration = std::time::Instant::now().duration_since(start);
let max_duration = 2 * TEST_EXECUTION_TIMEOUT;
duration < max_duration,
"Expected duration {}ms to be less than {}ms",
duration.as_millis(),
max_duration.as_millis()
async fn execute_queue_doesnt_stall_if_workers_died() {
let host = TestHost::new_with_config(|cfg| {
cfg.execute_workers_max_num = 5;
})
.await;
// Here we spawn 8 validation jobs for the `halt` PVF and share those between 5 workers. The
// first five jobs should timeout and the workers killed. For the next 3 jobs a new batch of
// workers should be spun up.
let start = std::time::Instant::now();
futures::future::join_all((0u8..=8).map(|_| {
host.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
// Total time should be >= 2 x TEST_EXECUTION_TIMEOUT (two separate sets of workers that should
// both timeout).
let duration = std::time::Instant::now().duration_since(start);
let max_duration = 2 * TEST_EXECUTION_TIMEOUT;
assert!(
duration >= max_duration,
"Expected duration {}ms to be greater than or equal to {}ms",
duration.as_millis(),
max_duration.as_millis()
);
#[cfg(feature = "ci-only-tests")]
#[tokio::test]
async fn execute_queue_doesnt_stall_with_varying_executor_params() {
    let host = TestHost::new_with_config(|cfg| {
        cfg.execute_workers_max_num = 2;
    })
    .await;

    let executor_params_1 = ExecutorParams::default();
    let executor_params_2 = ExecutorParams::from(&[ExecutorParam::StackLogicalMax(1024)][..]);

    // Here we spawn 6 validation jobs for the `halt` PVF and share those between 2 workers. Every
    // 3rd job will have different set of executor parameters. All the workers should be killed
    // and in this case the queue should respawn new workers with needed executor environment
    // without waiting. The jobs will be executed in 3 batches, each running two jobs in parallel,
    // and execution time would be roughly 3 * TEST_EXECUTION_TIMEOUT
    let start = std::time::Instant::now();
    futures::future::join_all((0u8..6).map(|i| {
        host.validate_candidate(
            halt::wasm_binary_unwrap(),
            ValidationParams {
                block_data: BlockData(Vec::new()),
                parent_head: Default::default(),
                relay_parent_number: 1,
                relay_parent_storage_root: Default::default(),
            },
            // Jobs 0 and 3 use the default parameters, the other four use the second set.
            match i % 3 {
                0 => executor_params_1.clone(),
                _ => executor_params_2.clone(),
            },
        )
    }))
    .await;

    let duration = start.elapsed();
    let min_duration = 3 * TEST_EXECUTION_TIMEOUT;
    let max_duration = 4 * TEST_EXECUTION_TIMEOUT;
    assert!(
        duration >= min_duration,
        "Expected duration {}ms to be greater than or equal to {}ms",
        duration.as_millis(),
        min_duration.as_millis()
    );
    assert!(
        duration <= max_duration,
        "Expected duration {}ms to be less than or equal to {}ms",
        duration.as_millis(),
        max_duration.as_millis()
    );
}
// Test that deleting a prepared artifact does not lead to a dispute when we try to execute it.
#[tokio::test]
async fn deleting_prepared_artifact_does_not_dispute() {
let host = TestHost::new().await;
let cache_dir = host.cache_dir.path();
let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), Default::default()).await.unwrap();
// Manually delete the prepared artifact from disk. The in-memory artifacts table won't change.
{
// Get the artifact path (asserting it exists).
let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();
// Should contain the artifact and the worker dir.
assert_eq!(cache_dir.len(), 2);
let mut artifact_path = cache_dir.pop().unwrap().unwrap();
if artifact_path.path().is_dir() {
artifact_path = cache_dir.pop().unwrap().unwrap();
}
// Delete the artifact.
std::fs::remove_file(artifact_path.path()).unwrap();
}
// Try to validate, artifact should get recreated.
let result = host
.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
Default::default(),
)
.await;
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
assert_matches!(result, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)));
}
// Test that corruption of a prepared artifact does not lead to a dispute when we try to execute it.
#[tokio::test]
async fn corrupted_prepared_artifact_does_not_dispute() {
let host = TestHost::new().await;
let cache_dir = host.cache_dir.path();
let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), Default::default()).await.unwrap();
// Manually corrupting the prepared artifact from disk. The in-memory artifacts table won't
// change.
let artifact_path = {
// Get the artifact path (asserting it exists).
let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();
// Should contain the artifact and the worker dir.
assert_eq!(cache_dir.len(), 2);
let mut artifact_path = cache_dir.pop().unwrap().unwrap();
if artifact_path.path().is_dir() {
artifact_path = cache_dir.pop().unwrap().unwrap();
}
// Corrupt the artifact.
let mut f = std::fs::OpenOptions::new()
.write(true)
.truncate(true)
.open(artifact_path.path())
.unwrap();
f.write_all(b"corrupted wasm").unwrap();
f.flush().unwrap();
artifact_path
};
assert!(artifact_path.path().exists());
// Try to validate, artifact should get removed because of the corruption.
let result = host
.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
Default::default(),
)
.await;
assert_matches!(
result,
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(_)))
);
// because of RuntimeConstruction we may retry
host.precheck_pvf(halt::wasm_binary_unwrap(), Default::default()).await.unwrap();
// The actual artifact removal is done concurrently
// with sending of the result of the execution
// it is not a problem for further re-preparation as
// artifact filenames are random
for _ in 1..5 {
if !artifact_path.path().exists() {
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(
!artifact_path.path().exists(),
"the corrupted artifact ({}) should be deleted by the host",
artifact_path.path().display()
);
#[tokio::test]
async fn cache_cleared_on_startup() {
    // Number of entries directly inside `dir`.
    let entry_count = |dir: &std::path::Path| std::fs::read_dir(dir).unwrap().count();

    // This host must stay alive until the end of the test: it owns the `TempDir`, which is
    // removed from disk when dropped.
    let first_host = TestHost::new().await;
    let _stats = first_host
        .precheck_pvf(halt::wasm_binary_unwrap(), Default::default())
        .await
        .unwrap();

    // After prechecking we expect exactly one artifact plus one worker dir.
    let cache_path = first_host.cache_dir.path().to_owned();
    assert_eq!(entry_count(cache_path.as_path()), 2);

    // A second host pointed at the same directory should wipe what the first one left behind.
    let _second_host = TestHost::new_with_config(|cfg| {
        cfg.cache_path = cache_path.clone();
    })
    .await;

    assert_eq!(entry_count(cache_path.as_path()), 0);
}
// This test checks if the adder parachain runtime can be prepared with 10Mb preparation memory
// limit enforced. At the moment of writing, the limit is high enough to prepare the PVF. If it
// starts failing, either Wasmtime version has changed, or the PVF code itself has changed, and
// more memory is required now. Multi-threaded preparation, if ever enabled, may also affect
// memory consumption.
#[tokio::test]
async fn prechecking_within_memory_limits() {
    // Cap the prechecking memory at 10 MiB.
    let limits = [ExecutorParam::PrecheckingMaxMemory(10 * 1024 * 1024)];
    let executor_params = ExecutorParams::from(&limits[..]);

    let host = TestHost::new().await;
    let result = host.precheck_pvf(::adder::wasm_binary_unwrap(), executor_params).await;

    assert_matches!(result, Ok(()));
}
// This test checks if the adder parachain runtime can be prepared with 512Kb preparation memory
// limit enforced. At the moment of writing, the limit is not enough to prepare the PVF, and the
// preparation is supposed to generate an error. If the test starts failing, either Wasmtime
// version has changed, or the PVF code itself has changed, and less memory is required now.
#[tokio::test]
async fn prechecking_out_of_memory() {
use polkadot_node_core_pvf::PrepareError;
let host = TestHost::new().await;
let result = host
.precheck_pvf(
::adder::wasm_binary_unwrap(),
ExecutorParams::from(&[ExecutorParam::PrecheckingMaxMemory(512 * 1024)][..]),
)
.await;
assert_matches!(result, Err(PrepareError::OutOfMemory));
}
// With one worker, run multiple preparation jobs serially. They should not conflict.
#[tokio::test]
async fn prepare_can_run_serially() {
    let host = TestHost::new_with_config(|cfg| {
        cfg.prepare_workers_hard_max_num = 1;
    })
    .await;

    // Two distinct wasm blobs, prepared one after the other, so the second job cannot be skipped
    // as already done.
    for blob in [::adder::wasm_binary_unwrap(), halt::wasm_binary_unwrap()] {
        host.precheck_pvf(blob, Default::default()).await.unwrap();
    }
}
// CI machines should be able to enable all the security features.
#[cfg(all(feature = "ci-only-tests", target_os = "linux"))]
#[tokio::test]
async fn all_security_features_work() {
// Landlock is only available starting Linux 5.13, and we may be testing on an old kernel.
let can_enable_landlock = {
let sysinfo = sc_sysinfo::gather_sysinfo();
// The version will look something like "5.15.0-87-generic".
let version = sysinfo.linux_kernel.unwrap();
let version_split: Vec<&str> = version.split(".").collect();
let major: u32 = version_split[0].parse().unwrap();
let minor: u32 = version_split[1].parse().unwrap();
if major >= 6 {
true
} else if major == 5 {
minor >= 13
} else {
false
}
};
let host = TestHost::new().await;
assert_eq!(
host.security_status().await,
SecurityStatus {
// Disabled in tests to not enforce the presence of security features. This CI-only test
// is the only one that tests them.
can_enable_landlock,
can_enable_seccomp: true,
can_unshare_user_namespace_and_change_root: true,
// Regression test to make sure the unshare-pivot-root capability does not depend on the PVF
// artifacts cache existing.
#[cfg(all(feature = "ci-only-tests", target_os = "linux"))]
#[tokio::test]
async fn nonexistent_cache_dir() {
    // Point the host at a sub-directory of the temp dir that has not been created.
    let host = TestHost::new_with_config(|cfg| {
        let missing_dir = cfg.cache_path.join("nonexistent_cache_dir");
        cfg.cache_path = missing_dir;
    })
    .await;

    assert!(host.security_status().await.can_unshare_user_namespace_and_change_root);

    // Prechecking must still succeed with that cache path.
    host.precheck_pvf(::adder::wasm_binary_unwrap(), Default::default())
        .await
        .unwrap();
}
// Checks that the artifact is not re-prepared when the executor environment parameters change
// in a way not affecting the preparation
#[tokio::test]
async fn artifact_does_not_reprepare_on_non_meaningful_exec_parameter_change() {
    let host = TestHost::new_with_config(|cfg| {
        cfg.prepare_workers_hard_max_num = 1;
    })
    .await;
    let cache_dir = host.cache_dir.path();

    // Reads the metadata of the single artifact in the cache dir. The dir must hold exactly two
    // entries; if the last one listed is a directory, the other one is taken as the artifact.
    let artifact_metadata = || {
        let mut entries: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();
        assert_eq!(entries.len(), 2);
        let last = entries.pop().unwrap().unwrap();
        let artifact = if last.path().is_dir() { entries.pop().unwrap().unwrap() } else { last };
        std::fs::metadata(artifact.path()).unwrap()
    };

    let default_params = ExecutorParams::default();
    let exec_timeout_params =
        ExecutorParams::from(&[ExecutorParam::PvfExecTimeout(PvfExecKind::Backing, 2500)][..]);

    let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), default_params).await.unwrap();
    let before = artifact_metadata();

    // Wait 2 seconds so that a freshly written artifact would carry a different creation time.
    tokio::time::sleep(Duration::from_secs(2)).await;

    let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), exec_timeout_params).await.unwrap();
    let after = artifact_metadata();

    // Same creation time means the artifact on disk was not replaced.
    assert_eq!(before.created().unwrap(), after.created().unwrap());
}
// Checks that the artifact is re-prepared when the executor environment parameters change in a
// way that requires re-preparation
#[tokio::test]
async fn artifact_does_reprepare_on_meaningful_exec_parameter_change() {
    let host = TestHost::new_with_config(|cfg| {
        cfg.prepare_workers_hard_max_num = 1;
    })
    .await;
    let cache_dir = host.cache_dir.path();

    // Number of entries currently in the cache dir.
    let entry_count = || std::fs::read_dir(cache_dir).unwrap().count();

    let default_params = ExecutorParams::default();
    let prep_timeout_params =
        ExecutorParams::from(&[ExecutorParam::PvfPrepTimeout(PvfPrepKind::Prepare, 60000)][..]);

    let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), default_params).await.unwrap();
    assert_eq!(entry_count(), 2);

    let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), prep_timeout_params).await.unwrap();
    assert_eq!(entry_count(), 3); // new artifact has been added
}