// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! General PVF host integration tests checking the functionality of the PVF host itself.

use assert_matches::assert_matches;
use parity_scale_codec::Encode as _;
#[cfg(all(feature = "ci-only-tests", target_os = "linux"))]
use polkadot_node_core_pvf::SecurityStatus;
use polkadot_node_core_pvf::{
	start, testing::build_workers_and_get_paths, Config, InvalidCandidate, Metrics,
	PossiblyInvalidError, PrepareError, PrepareJobKind, PvfPrepData, ValidationError,
	ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
};
use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult};
use polkadot_primitives::{ExecutorParam, ExecutorParams, PvfExecKind, PvfPrepKind};
use std::{io::Write, time::Duration};
use tokio::sync::Mutex;
#[cfg(target_os = "linux")]
mod process;
mod worker_common;

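// Timeouts are kept short so the timeout tests finish quickly. A timed-out job is expected to be
// terminated within `timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR` of wall-clock time; the duration
// assertions below rely on that bound.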
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(6);
const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(6);

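/// Test helper that starts a real validation host with freshly built worker binaries and a
/// temporary artifact cache directory.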
struct TestHost {
	// Keep a reference to the tempdir as it gets deleted on drop.
	cache_dir: tempfile::TempDir,
	host: Mutex<ValidationHost>,
}

impl TestHost {
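	/// Starts a host with the default test configuration.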
	async fn new() -> Self {
		Self::new_with_config(|_| ()).await
	}

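	/// Starts a host, letting the caller adjust the `Config` before startup.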
	async fn new_with_config<F>(f: F) -> Self
	where
		F: FnOnce(&mut Config),
	{
		let (prepare_worker_path, execute_worker_path) = build_workers_and_get_paths();
		let cache_dir = tempfile::tempdir().unwrap();
		let mut config = Config::new(
			cache_dir.path().to_owned(),
			None,
			prepare_worker_path,
			execute_worker_path,
		);
		f(&mut config);
		let (host, task) = start(config, Metrics::default()).await.unwrap();
		let _ = tokio::task::spawn(task);
		Self { cache_dir, host: Mutex::new(host) }
	}

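	/// Submits the PVF to the host for prechecking (a prepare-only job) and waits for the result.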
	async fn precheck_pvf(
		&self,
		code: &[u8],
		executor_params: ExecutorParams,
	) -> Result<(), PrepareError> {
		let (result_tx, result_rx) = futures::channel::oneshot::channel();

		let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024)
			.expect("Compression works");

		self.host
			.lock()
			.await
			.precheck_pvf(
				PvfPrepData::from_code(
					code.into(),
					executor_params,
					TEST_PREPARATION_TIMEOUT,
					PrepareJobKind::Prechecking,
				),
				result_tx,
			)
			.await
			.unwrap();
		result_rx.await.unwrap()
	}

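	/// Prepares the PVF (if not already cached) and executes it with the given parameters,
	/// waiting for the validation result.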
	async fn validate_candidate(
		&self,
		code: &[u8],
		params: ValidationParams,
		executor_params: ExecutorParams,
	) -> Result<ValidationResult, ValidationError> {
		let (result_tx, result_rx) = futures::channel::oneshot::channel();
		let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024)
			.expect("Compression works");
		self.host
			.lock()
			.await
			.execute_pvf(
				PvfPrepData::from_code(
					code.into(),
					executor_params,
					TEST_PREPARATION_TIMEOUT,
					PrepareJobKind::Compilation,
				),
				params.encode(),
				polkadot_node_core_pvf::Priority::Normal,
				result_tx,
			)
			.await
			.unwrap();
		result_rx.await.unwrap()
	}

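	/// Returns the security features the host detected on this machine.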
	#[cfg(all(feature = "ci-only-tests", target_os = "linux"))]
	async fn security_status(&self) -> SecurityStatus {
		self.host.lock().await.security_status.clone()
	}
}

#[tokio::test]
async fn prepare_job_terminates_on_timeout() {
	let host = TestHost::new().await;

	let start = std::time::Instant::now();
	let result = host
		.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default())
		.await;

	match result {
		Err(PrepareError::TimedOut) => {},
		r => panic!("{:?}", r),
	}

	let duration = std::time::Instant::now().duration_since(start);
	assert!(duration >= TEST_PREPARATION_TIMEOUT);
	assert!(duration < TEST_PREPARATION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR);
}

#[tokio::test]
async fn execute_job_terminates_on_timeout() {
	let host = TestHost::new().await;
	let start = std::time::Instant::now();
	let result = host
		.validate_candidate(
			halt::wasm_binary_unwrap(),
			ValidationParams {
				block_data: BlockData(Vec::new()),
				parent_head: Default::default(),
				relay_parent_number: 1,
				relay_parent_storage_root: Default::default(),
			},
			Default::default(),
		)
		.await;

	match result {
		Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)) => {},
		r => panic!("{:?}", r),
	}

	let duration = std::time::Instant::now().duration_since(start);
	assert!(duration >= TEST_EXECUTION_TIMEOUT);
	assert!(duration < TEST_EXECUTION_TIMEOUT * JOB_TIMEOUT_WALL_CLOCK_FACTOR);
}

#[cfg(feature = "ci-only-tests")]
#[tokio::test]
async fn ensure_parallel_execution() {
	// Run some jobs that do not complete, thus timing out.
	let host = TestHost::new().await;
	let execute_pvf_future_1 = host.validate_candidate(
		halt::wasm_binary_unwrap(),
		ValidationParams {
			block_data: BlockData(Vec::new()),
			parent_head: Default::default(),
			relay_parent_number: 1,
			relay_parent_storage_root: Default::default(),
		},
		Default::default(),
	);
	let execute_pvf_future_2 = host.validate_candidate(
		halt::wasm_binary_unwrap(),
		ValidationParams {
			block_data: BlockData(Vec::new()),
			parent_head: Default::default(),
			relay_parent_number: 1,
			relay_parent_storage_root: Default::default(),
		},
		Default::default(),
	);

	let start = std::time::Instant::now();
	let (res1, res2) = futures::join!(execute_pvf_future_1, execute_pvf_future_2);
	assert_matches!(
		(res1, res2),
		(
			Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)),
			Err(ValidationError::Invalid(InvalidCandidate::HardTimeout))
		)
	);

	// Total time should be < 2 x TEST_EXECUTION_TIMEOUT (two workers run in parallel).
	let duration = std::time::Instant::now().duration_since(start);
	let max_duration = 2 * TEST_EXECUTION_TIMEOUT;
	assert!(
		duration < max_duration,
		"Expected duration {}ms to be less than {}ms",
		duration.as_millis(),
		max_duration.as_millis()
	);
}

#[cfg(feature = "ci-only-tests")]
#[tokio::test]
async fn execute_queue_doesnt_stall_if_workers_died() {
	let host = TestHost::new_with_config(|cfg| {
		cfg.execute_workers_max_num = 5;
	})
	.await;

	// Here we spawn 9 validation jobs for the `halt` PVF and share those between 5 workers. The
	// first five jobs should time out and their workers be killed. For the remaining 4 jobs a new
	// batch of workers should be spun up.
	let start = std::time::Instant::now();
	futures::future::join_all((0u8..=8).map(|_| {
		host.validate_candidate(
			halt::wasm_binary_unwrap(),
			ValidationParams {
				block_data: BlockData(Vec::new()),
				parent_head: Default::default(),
				relay_parent_number: 1,
				relay_parent_storage_root: Default::default(),
			},
			Default::default(),
		)
	}))
	.await;

	// Total time should be >= 2 x TEST_EXECUTION_TIMEOUT (two separate sets of workers that should
	// both timeout).
	let duration = std::time::Instant::now().duration_since(start);
	let min_duration = 2 * TEST_EXECUTION_TIMEOUT;
	assert!(
		duration >= min_duration,
		"Expected duration {}ms to be greater than or equal to {}ms",
		duration.as_millis(),
		min_duration.as_millis()
	);
}

#[cfg(feature = "ci-only-tests")]
#[tokio::test]
async fn execute_queue_doesnt_stall_with_varying_executor_params() {
	let host = TestHost::new_with_config(|cfg| {
		cfg.execute_workers_max_num = 2;
	})
	.await;

	let executor_params_1 = ExecutorParams::default();
	let executor_params_2 = ExecutorParams::from(&[ExecutorParam::StackLogicalMax(1024)][..]);

	// Here we spawn 6 validation jobs for the `halt` PVF and share those between 2 workers. Every
	// 3rd job has a different set of executor parameters. All the workers should be killed, and
	// in this case the queue should respawn new workers with the needed executor environment
	// without waiting. The jobs are executed in 3 batches, each running two jobs in parallel, so
	// the total execution time should be roughly 3 * TEST_EXECUTION_TIMEOUT.
	let start = std::time::Instant::now();
	futures::future::join_all((0u8..6).map(|i| {
		host.validate_candidate(
			halt::wasm_binary_unwrap(),
			ValidationParams {
				block_data: BlockData(Vec::new()),
				parent_head: Default::default(),
				relay_parent_number: 1,
				relay_parent_storage_root: Default::default(),
			},
			match i % 3 {
				0 => executor_params_1.clone(),
				_ => executor_params_2.clone(),
			},
		)
	}))
	.await;

	let duration = std::time::Instant::now().duration_since(start);
	let min_duration = 3 * TEST_EXECUTION_TIMEOUT;
	let max_duration = 4 * TEST_EXECUTION_TIMEOUT;
	assert!(
		duration >= min_duration,
		"Expected duration {}ms to be greater than or equal to {}ms",
		duration.as_millis(),
		min_duration.as_millis()
	);
	assert!(
		duration <= max_duration,
		"Expected duration {}ms to be less than or equal to {}ms",
		duration.as_millis(),
		max_duration.as_millis()
	);
}

// Test that deleting a prepared artifact does not lead to a dispute when we try to execute it.
#[tokio::test]
async fn deleting_prepared_artifact_does_not_dispute() {
	let host = TestHost::new().await;
	let cache_dir = host.cache_dir.path();
	let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), Default::default()).await.unwrap();
	// Manually delete the prepared artifact from disk. The in-memory artifacts table won't change.
	{
		// Get the artifact path (asserting it exists).
		let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();
		// Should contain the artifact and the worker dir.
		assert_eq!(cache_dir.len(), 2);
		let mut artifact_path = cache_dir.pop().unwrap().unwrap();
		if artifact_path.path().is_dir() {
			artifact_path = cache_dir.pop().unwrap().unwrap();
		}

		// Delete the artifact.
		std::fs::remove_file(artifact_path.path()).unwrap();
	}

	// Try to validate, artifact should get recreated.
	let result = host
		.validate_candidate(
			halt::wasm_binary_unwrap(),
			ValidationParams {
				block_data: BlockData(Vec::new()),
				parent_head: Default::default(),
				relay_parent_number: 1,
				relay_parent_storage_root: Default::default(),
			},
			Default::default(),
		)
		.await;

	assert_matches!(result, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)));
}

// Test that corruption of a prepared artifact does not lead to a dispute when we try to execute it.
#[tokio::test]
async fn corrupted_prepared_artifact_does_not_dispute() {
	let host = TestHost::new().await;
	let cache_dir = host.cache_dir.path();

	let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), Default::default()).await.unwrap();

	// Manually corrupting the prepared artifact from disk. The in-memory artifacts table won't
	// change.
	let artifact_path = {
		// Get the artifact path (asserting it exists).
		let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();
		// Should contain the artifact and the worker dir.
		assert_eq!(cache_dir.len(), 2);
		let mut artifact_path = cache_dir.pop().unwrap().unwrap();
		if artifact_path.path().is_dir() {
			artifact_path = cache_dir.pop().unwrap().unwrap();
		}

		// Corrupt the artifact.
		let mut f = std::fs::OpenOptions::new()
			.write(true)
			.truncate(true)
			.open(artifact_path.path())
			.unwrap();
		f.write_all(b"corrupted wasm").unwrap();
		f.flush().unwrap();
		artifact_path
	};

	assert!(artifact_path.path().exists());

	// Try to validate, artifact should get removed because of the corruption.
	let result = host
		.validate_candidate(
			halt::wasm_binary_unwrap(),
			ValidationParams {
				block_data: BlockData(Vec::new()),
				parent_head: Default::default(),
				relay_parent_number: 1,
				relay_parent_storage_root: Default::default(),
			},
			Default::default(),
		)
		.await;

	assert_matches!(
		result,
		Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(_)))
	);

	// Because of the RuntimeConstruction error, we may retry.
	host.precheck_pvf(halt::wasm_binary_unwrap(), Default::default()).await.unwrap();

	// The actual artifact removal is done concurrently with sending the execution result, which
	// is not a problem for the subsequent re-preparation as artifact filenames are random.
	for _ in 1..5 {
		if !artifact_path.path().exists() {
			break;
		}
		tokio::time::sleep(Duration::from_secs(1)).await;
	}

	assert!(
		!artifact_path.path().exists(),
		"the corrupted artifact ({}) should be deleted by the host",
		artifact_path.path().display()
	);
}

#[tokio::test]
async fn cache_cleared_on_startup() {
	// Don't drop this host; it owns the `TempDir`, which gets deleted on drop.
	let host = TestHost::new().await;

	let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), Default::default()).await.unwrap();

	// The cache dir should contain one artifact and one worker dir.
	let cache_dir = host.cache_dir.path().to_owned();
	assert_eq!(std::fs::read_dir(&cache_dir).unwrap().count(), 2);

	// Start a new host, previous artifact should be cleared.
	let _host = TestHost::new_with_config(|cfg| {
		cfg.cache_path = cache_dir.clone();
	})
	.await;
	assert_eq!(std::fs::read_dir(&cache_dir).unwrap().count(), 0);
}

// This test checks if the adder parachain runtime can be prepared with a 10 MiB preparation
// memory limit enforced. At the moment of writing, the limit is far enough to prepare the PVF.
// If it starts failing, either the Wasmtime version has changed, or the PVF code itself has
// changed, and more memory is required now. Multi-threaded preparation, if ever enabled, may
// also affect memory consumption.
#[tokio::test]
async fn prechecking_within_memory_limits() {
	let host = TestHost::new().await;
	let result = host
		.precheck_pvf(
			::adder::wasm_binary_unwrap(),
			ExecutorParams::from(&[ExecutorParam::PrecheckingMaxMemory(10 * 1024 * 1024)][..]),
		)
		.await;

	assert_matches!(result, Ok(_));
}

// This test checks if the adder parachain runtime can be prepared with a 512 KiB preparation
// memory limit enforced. At the moment of writing, the limit is not enough to prepare the PVF,
// and the preparation is supposed to generate an error. If the test starts failing, either the
// Wasmtime version has changed, or the PVF code itself has changed, and less memory is required
// now.
#[tokio::test]
async fn prechecking_out_of_memory() {
	use polkadot_node_core_pvf::PrepareError;

	let host = TestHost::new().await;
	let result = host
		.precheck_pvf(
			::adder::wasm_binary_unwrap(),
			ExecutorParams::from(&[ExecutorParam::PrecheckingMaxMemory(512 * 1024)][..]),
		)
		.await;

	assert_matches!(result, Err(PrepareError::OutOfMemory));
}

// With one worker, run multiple preparation jobs serially. They should not conflict.
#[tokio::test]
async fn prepare_can_run_serially() {
	let host = TestHost::new_with_config(|cfg| {
		cfg.prepare_workers_hard_max_num = 1;
	})
	.await;

	let _stats = host
		.precheck_pvf(::adder::wasm_binary_unwrap(), Default::default())
		.await
		.unwrap();

	// Prepare a different wasm blob to prevent skipping work.
	let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), Default::default()).await.unwrap();
}

// CI machines should be able to enable all the security features.
#[cfg(all(feature = "ci-only-tests", target_os = "linux"))]
#[tokio::test]
async fn all_security_features_work() {
	// Landlock is only available starting Linux 5.13, and we may be testing on an old kernel.
	let can_enable_landlock = {
		let sysinfo = sc_sysinfo::gather_sysinfo();
		// The version will look something like "5.15.0-87-generic".
		let version = sysinfo.linux_kernel.unwrap();
		let version_split: Vec<&str> = version.split(".").collect();
		let major: u32 = version_split[0].parse().unwrap();
		let minor: u32 = version_split[1].parse().unwrap();
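		// E.g. "5.15.0-87-generic" parses to major 5, minor 15, so Landlock (5.13+) is available.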
		if major >= 6 {
			true
		} else if major == 5 {
			minor >= 13
		} else {
			false
		}
	};

	let host = TestHost::new().await;

	assert_eq!(
		host.security_status().await,
		SecurityStatus {
			// Disabled in tests to not enforce the presence of security features. This CI-only test
			// is the only one that tests them.
			secure_validator_mode: false,
			can_enable_landlock,
			can_enable_seccomp: true,
			can_unshare_user_namespace_and_change_root: true,
			can_do_secure_clone: true,
		}
	);
}

// Regression test to make sure the unshare-pivot-root capability does not depend on the PVF
// artifacts cache existing.
#[cfg(all(feature = "ci-only-tests", target_os = "linux"))]
#[tokio::test]
async fn nonexistent_cache_dir() {
	let host = TestHost::new_with_config(|cfg| {
		cfg.cache_path = cfg.cache_path.join("nonexistent_cache_dir");
	})
	.await;

	assert!(host.security_status().await.can_unshare_user_namespace_and_change_root);

	let _stats = host
		.precheck_pvf(::adder::wasm_binary_unwrap(), Default::default())
		.await
		.unwrap();
}

// Checks that the artifact is not re-prepared when the executor environment parameters change
// in a way that does not affect the preparation.
#[tokio::test]
async fn artifact_does_not_reprepare_on_non_meaningful_exec_parameter_change() {
	let host = TestHost::new_with_config(|cfg| {
		cfg.prepare_workers_hard_max_num = 1;
	})
	.await;
	let cache_dir = host.cache_dir.path();

	let set1 = ExecutorParams::default();
	let set2 =
		ExecutorParams::from(&[ExecutorParam::PvfExecTimeout(PvfExecKind::Backing, 2500)][..]);

	let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), set1).await.unwrap();

	let md1 = {
		let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();
		assert_eq!(cache_dir.len(), 2);
		let mut artifact_path = cache_dir.pop().unwrap().unwrap();
		if artifact_path.path().is_dir() {
			artifact_path = cache_dir.pop().unwrap().unwrap();
		}
		std::fs::metadata(artifact_path.path()).unwrap()
	};

	// FS times are not monotonic, so we wait 2 secs here to be sure that the creation time of the
	// second artifact will be different.
	tokio::time::sleep(Duration::from_secs(2)).await;

	let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), set2).await.unwrap();

	let md2 = {
		let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();
		assert_eq!(cache_dir.len(), 2);
		let mut artifact_path = cache_dir.pop().unwrap().unwrap();
		if artifact_path.path().is_dir() {
			artifact_path = cache_dir.pop().unwrap().unwrap();
		}
		std::fs::metadata(artifact_path.path()).unwrap()
	};

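	// Identical creation times mean the artifact from the first precheck was reused, not
	// re-prepared.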
	assert_eq!(md1.created().unwrap(), md2.created().unwrap());
}

// Checks if the artifact is re-prepared if the re-preparation is needed by the nature of
// the execution environment parameters change
#[tokio::test]
async fn artifact_does_reprepare_on_meaningful_exec_parameter_change() {
	let host = TestHost::new_with_config(|cfg| {
		cfg.prepare_workers_hard_max_num = 1;
	})
	.await;
	let cache_dir = host.cache_dir.path();

	let set1 = ExecutorParams::default();
	let set2 =
		ExecutorParams::from(&[ExecutorParam::PvfPrepTimeout(PvfPrepKind::Prepare, 60000)][..]);

	let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), set1).await.unwrap();
	let cache_dir_contents: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();

	assert_eq!(cache_dir_contents.len(), 2);

	let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), set2).await.unwrap();
	let cache_dir_contents: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();

	assert_eq!(cache_dir_contents.len(), 3); // new artifact has been added
}