Newer
Older
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! General PVF host integration tests checking the functionality of the PVF host itself.
use assert_matches::assert_matches;
#[cfg(all(feature = "ci-only-tests", target_os = "linux"))]
use polkadot_node_core_pvf::SecurityStatus;
start, testing::build_workers_and_get_paths, Config, InvalidCandidate, Metrics, PrepareError,
PrepareJobKind, PvfPrepData, ValidationError, ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult};
use polkadot_primitives::{ExecutorParam, ExecutorParams};
use std::time::Duration;
use tokio::sync::Mutex;
#[cfg(target_os = "linux")]
mod process;
// Test-only execution deadline: jobs that run longer than this must be aborted by the host.
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(6);
// Test-only preparation (compilation/precheck) deadline for prepare jobs.
const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(6);
// Keep a reference to the tempdir as it gets deleted on drop.
// NOTE(review): the `struct TestHost {` header and the `cache_dir` field line appear to be
// missing from this chunk — the lines below are a fragment of the struct definition.
// The validation host handle is wrapped in a tokio `Mutex` so `&self` test helpers can
// serialize access across awaits.
host: Mutex<ValidationHost>,
}
impl TestHost {
// Convenience constructor: a host with an unmodified default test `Config`.
async fn new() -> Self {
Self::new_with_config(|_| ()).await
// NOTE(review): the closing brace of `new`, the `where` clause of `new_with_config`, and
// the call applying `f` to the config appear truncated in this chunk.
async fn new_with_config<F>(f: F) -> Self
// Build (or reuse) the prepare/execute worker binaries and get their on-disk paths.
let (prepare_worker_path, execute_worker_path) = build_workers_and_get_paths();
// Fresh cache dir per host; it is deleted when the `TempDir` is dropped.
let cache_dir = tempfile::tempdir().unwrap();
let mut config = Config::new(
cache_dir.path().to_owned(),
None,
prepare_worker_path,
execute_worker_path,
);
// Start the host and detach its background task — it lives for the whole test.
let (host, task) = start(config, Metrics::default()).await.unwrap();
let _ = tokio::task::spawn(task);
Self { cache_dir, host: Mutex::new(host) }
/// Submits `code` to the validation host for prechecking and waits for the outcome.
///
/// The blob is decompressed first (bounded at 16 MiB), then wrapped into a
/// [`PvfPrepData`] with the test preparation timeout and `Prechecking` job kind.
async fn precheck_pvf(
	&self,
	code: &[u8],
	executor_params: ExecutorParams,
) -> Result<(), PrepareError> {
	// Test fixtures are always valid blobs, so decompression must not fail.
	let decompressed = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024)
		.expect("Compression works");
	let pvf = PvfPrepData::from_code(
		decompressed.into(),
		executor_params,
		TEST_PREPARATION_TIMEOUT,
		PrepareJobKind::Prechecking,
	);

	let (result_tx, result_rx) = futures::channel::oneshot::channel();
	{
		// Hold the lock only while enqueueing; the result arrives on the oneshot.
		let mut host = self.host.lock().await;
		host.precheck_pvf(pvf, result_tx).await.unwrap();
	}
	result_rx.await.unwrap()
}
/// Prepares (if necessary) and executes `code` against `params`, returning the
/// candidate-validation outcome.
///
/// Uses the test execution timeout and normal priority; the blob is decompressed
/// (bounded at 16 MiB) before being handed to the host.
async fn validate_candidate(
	&self,
	code: &[u8],
	params: ValidationParams,
	executor_params: ExecutorParams,
) -> Result<ValidationResult, ValidationError> {
	// Test fixtures are always valid blobs, so decompression must not fail.
	let decompressed = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024)
		.expect("Compression works");
	let pvf = PvfPrepData::from_code(
		decompressed.into(),
		executor_params,
		TEST_PREPARATION_TIMEOUT,
		PrepareJobKind::Compilation,
	);

	let (result_tx, result_rx) = futures::channel::oneshot::channel();
	{
		// Hold the lock only while enqueueing; the result arrives on the oneshot.
		let mut host = self.host.lock().await;
		host.execute_pvf(
			pvf,
			TEST_EXECUTION_TIMEOUT,
			params.encode(),
			polkadot_node_core_pvf::Priority::Normal,
			result_tx,
		)
		.await
		.unwrap();
	}
	result_rx.await.unwrap()
}
/// Returns a snapshot of the security features the host reports as enabled.
#[cfg(all(feature = "ci-only-tests", target_os = "linux"))]
async fn security_status(&self) -> SecurityStatus {
	let guard = self.host.lock().await;
	guard.security_status.clone()
}
async fn prepare_job_terminates_on_timeout() {
let host = TestHost::new().await;
let start = std::time::Instant::now();
let result = host
.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default())
.await;
match result {
Err(PrepareError::TimedOut) => {},
r => panic!("{:?}", r),
}
let duration = std::time::Instant::now().duration_since(start);
assert!(duration >= TEST_PREPARATION_TIMEOUT);
assert!(duration < TEST_PREPARATION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR);
}
#[tokio::test]
// Test that an execution job is terminated by the execution timeout (hard timeout) rather
// than running forever: `halt` is a test PVF that never completes.
async fn execute_job_terminates_on_timeout() {
let host = TestHost::new().await;
let start = std::time::Instant::now();
let result = host
.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
// NOTE(review): the executor-params argument, the closing `)` / `.await`, and the
// `match result {` opener appear truncated between this line and the arm below.
Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)) => {},
// Elapsed time must be at least the execution timeout but under the wall-clock factor.
let duration = std::time::Instant::now().duration_since(start);
assert!(duration >= TEST_EXECUTION_TIMEOUT);
assert!(duration < TEST_EXECUTION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR);
#[cfg(feature = "ci-only-tests")]
#[tokio::test]
// Test that two execution jobs run on separate workers in parallel: both time out, but the
// combined wall time stays below two sequential timeouts.
async fn ensure_parallel_execution() {
// Run some jobs that do not complete, thus timing out.
let host = TestHost::new().await;
// Two identical `halt` jobs; the futures are created first and awaited together below.
let execute_pvf_future_1 = host.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
// NOTE(review): the executor-params argument of this call appears truncated.
);
let execute_pvf_future_2 = host.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
// NOTE(review): the executor-params argument of this call appears truncated.
);
let start = std::time::Instant::now();
// Drive both executions concurrently; each should end in a hard timeout.
let (res1, res2) = futures::join!(execute_pvf_future_1, execute_pvf_future_2);
assert_matches!(
(res1, res2),
(
Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)),
Err(ValidationError::Invalid(InvalidCandidate::HardTimeout))
// NOTE(review): the closing delimiters of `assert_matches!` appear truncated here.
// Total time should be < 2 x TEST_EXECUTION_TIMEOUT (two workers run in parallel).
let duration = std::time::Instant::now().duration_since(start);
let max_duration = 2 * TEST_EXECUTION_TIMEOUT;
// NOTE(review): the `assert!(` opener and the function's closing lines appear truncated.
duration < max_duration,
"Expected duration {}ms to be less than {}ms",
duration.as_millis(),
max_duration.as_millis()
// NOTE(review): the `#[cfg(feature = "ci-only-tests")]` / `#[tokio::test]` attributes that
// precede the sibling tests appear to be missing before this function.
async fn execute_queue_doesnt_stall_if_workers_died() {
// Cap the worker pool so jobs must be processed in two batches.
let host = TestHost::new_with_config(|cfg| {
cfg.execute_workers_max_num = 5;
})
.await;
// Here we spawn 8 validation jobs for the `halt` PVF and share those between 5 workers. The
// first five jobs should timeout and the workers killed. For the next 3 jobs a new batch of
// workers should be spun up.
// NOTE(review): `0u8..=8` yields 9 jobs, not the 8 the comment above states — confirm intent.
let start = std::time::Instant::now();
futures::future::join_all((0u8..=8).map(|_| {
host.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
// NOTE(review): the executor-params argument, the closing delimiters of `join_all`, and
// its `.await` appear truncated here.
// Total time should be >= 2 x TEST_EXECUTION_TIMEOUT (two separate sets of workers that should
// both timeout).
let duration = std::time::Instant::now().duration_since(start);
let max_duration = 2 * TEST_EXECUTION_TIMEOUT;
assert!(
duration >= max_duration,
"Expected duration {}ms to be greater than or equal to {}ms",
duration.as_millis(),
max_duration.as_millis()
);
#[cfg(feature = "ci-only-tests")]
#[tokio::test]
// Ensure the execute queue keeps making progress when consecutive jobs require different
// executor environments, so workers must be torn down and respawned between batches.
async fn execute_queue_doesnt_stall_with_varying_executor_params() {
// Only two workers, forcing the six jobs below into three sequential batches.
let host = TestHost::new_with_config(|cfg| {
cfg.execute_workers_max_num = 2;
})
.await;
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
// Two distinct executor environments; switching between them forces worker respawns.
let executor_params_1 = ExecutorParams::default();
let executor_params_2 = ExecutorParams::from(&[ExecutorParam::StackLogicalMax(1024)][..]);
// Here we spawn 6 validation jobs for the `halt` PVF and share those between 2 workers. Every
// 3rd job will have different set of executor parameters. All the workers should be killed
// and in this case the queue should respawn new workers with needed executor environment
// without waiting. The jobs will be executed in 3 batches, each running two jobs in parallel,
// and execution time would be roughly 3 * TEST_EXECUTION_TIMEOUT
let start = std::time::Instant::now();
futures::future::join_all((0u8..6).map(|i| {
host.validate_candidate(
halt::wasm_binary_unwrap(),
ValidationParams {
block_data: BlockData(Vec::new()),
parent_head: Default::default(),
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
// Jobs 0 and 3 use the default environment; the remaining four use the alternate one.
match i % 3 {
0 => executor_params_1.clone(),
_ => executor_params_2.clone(),
},
)
}))
.await;
// Each batch runs until the execution timeout, so total time is bounded below by three
// timeouts; one extra timeout of slack is allowed above for scheduling overhead.
let duration = std::time::Instant::now().duration_since(start);
let min_duration = 3 * TEST_EXECUTION_TIMEOUT;
let max_duration = 4 * TEST_EXECUTION_TIMEOUT;
assert!(
duration >= min_duration,
"Expected duration {}ms to be greater than or equal to {}ms",
duration.as_millis(),
min_duration.as_millis()
);
assert!(
duration <= max_duration,
"Expected duration {}ms to be less than or equal to {}ms",
duration.as_millis(),
max_duration.as_millis()
);
}
// Test that deleting a prepared artifact does not lead to a dispute when we try to execute it.
#[tokio::test]
async fn deleting_prepared_artifact_does_not_dispute() {
	let host = TestHost::new().await;
	let cache_dir = host.cache_dir.path();

	let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), Default::default()).await.unwrap();

	// Manually delete the prepared artifact from disk. The in-memory artifacts table won't change.
	{
		// The cache dir must hold exactly two entries: the artifact file and the worker dir.
		let mut entries: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();
		assert_eq!(entries.len(), 2);
		// Pick whichever entry is the plain file — the other one is the worker dir.
		let mut artifact = entries.pop().unwrap().unwrap();
		if artifact.path().is_dir() {
			artifact = entries.pop().unwrap().unwrap();
		}
		std::fs::remove_file(artifact.path()).unwrap();
	}

	// Try to validate, artifact should get recreated.
	let outcome = host
		.validate_candidate(
			halt::wasm_binary_unwrap(),
			ValidationParams {
				block_data: BlockData(Vec::new()),
				parent_head: Default::default(),
				relay_parent_number: 1,
				relay_parent_storage_root: Default::default(),
			},
			Default::default(),
		)
		.await;

	// `halt` never finishes, so the expected result is a hard timeout — not a
	// dispute-triggering error about a missing artifact.
	match outcome {
		Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)) => {},
		r => panic!("{:?}", r),
	}
}
// A freshly started host must wipe any artifacts left over in its cache path.
#[tokio::test]
async fn cache_cleared_on_startup() {
	// Don't drop this host, it owns the `TempDir` which gets cleared on drop.
	let host = TestHost::new().await;

	let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), Default::default()).await.unwrap();

	// After preparation the cache dir holds two entries: one artifact and one worker dir.
	let cache_dir = host.cache_dir.path().to_owned();
	let entry_count = std::fs::read_dir(&cache_dir).unwrap().count();
	assert_eq!(entry_count, 2);

	// Boot a second host on the same cache path; startup should clear the previous artifact.
	let _host = TestHost::new_with_config(|cfg| {
		cfg.cache_path = cache_dir.clone();
	})
	.await;

	assert_eq!(std::fs::read_dir(&cache_dir).unwrap().count(), 0);
}
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
// This test checks that the adder parachain runtime can be prepared with a 10 MiB preparation
// memory limit enforced. At the time of writing, the limit is far enough to prepare the PVF.
// If it starts failing, either the Wasmtime version has changed, or the PVF code itself has
// changed, and more memory is required now. Multi-threaded preparation, if ever enabled, may
// also affect memory consumption.
#[tokio::test]
async fn prechecking_within_memory_limits() {
	let host = TestHost::new().await;

	let memory_limit = ExecutorParam::PrecheckingMaxMemory(10 * 1024 * 1024);
	let result = host
		.precheck_pvf(::adder::wasm_binary_unwrap(), ExecutorParams::from(&[memory_limit][..]))
		.await;

	assert_matches!(result, Ok(_));
}
// This test checks that the adder parachain runtime cannot be prepared under a 512 KiB
// preparation memory limit: at the time of writing that is not enough, and preparation is
// supposed to produce an error. If the test starts failing, either the Wasmtime version has
// changed, or the PVF code itself has changed, and less memory is required now.
#[tokio::test]
async fn prechecking_out_of_memory() {
	use polkadot_node_core_pvf::PrepareError;

	let host = TestHost::new().await;

	let memory_limit = ExecutorParam::PrecheckingMaxMemory(512 * 1024);
	let result = host
		.precheck_pvf(::adder::wasm_binary_unwrap(), ExecutorParams::from(&[memory_limit][..]))
		.await;

	assert_matches!(result, Err(PrepareError::OutOfMemory));
}
// With one worker, run multiple preparation jobs serially. They should not conflict.
#[tokio::test]
async fn prepare_can_run_serially() {
	// A single prepare worker forces the two jobs below to run back-to-back.
	let host = TestHost::new_with_config(|cfg| {
		cfg.prepare_workers_hard_max_num = 1;
	})
	.await;

	let _stats =
		host.precheck_pvf(::adder::wasm_binary_unwrap(), Default::default()).await.unwrap();

	// Prepare a different wasm blob to prevent skipping work.
	let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), Default::default()).await.unwrap();
}
// CI machines should be able to enable all the security features.
//
// Fixes: the kernel-version parse collected the split into a `Vec` only to index the first
// two elements (clippy `needless_collect`), used a string pattern where a char pattern is
// idiomatic (`split('.')`, clippy `single_char_pattern`), and spelled a boolean predicate
// as an if/else-if chain returning `true`/`false`.
#[cfg(all(feature = "ci-only-tests", target_os = "linux"))]
#[tokio::test]
async fn all_security_features_work() {
	// Landlock is only available starting Linux 5.13, and we may be testing on an old kernel.
	let can_enable_landlock = {
		let sysinfo = sc_sysinfo::gather_sysinfo();
		// The version will look something like "5.15.0-87-generic".
		let version = sysinfo.linux_kernel.unwrap();
		let mut components = version.split('.');
		let major: u32 = components
			.next()
			.expect("split yields at least one component")
			.parse()
			.unwrap();
		let minor: u32 =
			components.next().expect("kernel version has a minor component").parse().unwrap();
		// Landlock landed in 5.13; any later major has it.
		major > 5 || (major == 5 && minor >= 13)
	};

	let host = TestHost::new().await;

	assert_eq!(
		host.security_status().await,
		SecurityStatus {
			can_enable_landlock,
			can_enable_seccomp: true,
			can_unshare_user_namespace_and_change_root: true,
		}
	);
}
// Regression test to make sure the unshare-pivot-root capability does not depend on the PVF
// artifacts cache existing.
#[cfg(all(feature = "ci-only-tests", target_os = "linux"))]
#[tokio::test]
async fn nonexistant_cache_dir() {
	// Point the host at a cache path that does not exist yet.
	let host = TestHost::new_with_config(|cfg| {
		cfg.cache_path = cfg.cache_path.join("nonexistant_cache_dir");
	})
	.await;

	assert!(host.security_status().await.can_unshare_user_namespace_and_change_root);

	// Preparation must still succeed even though the cache dir did not pre-exist.
	let _stats =
		host.precheck_pvf(::adder::wasm_binary_unwrap(), Default::default()).await.unwrap();
}