main.rs 4.33 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

use async_std::sync::Mutex;
Shawn Tabrizi's avatar
Shawn Tabrizi committed
18
19
use parity_scale_codec::Encode as _;
use polkadot_node_core_pvf::{
20
	start, Config, InvalidCandidate, Metrics, Pvf, ValidationError, ValidationHost,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
21
22
};
use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult};
23
use std::time::Duration;
24
25
26
27
28

mod adder;
mod worker_common;

const PUPPET_EXE: &str = env!("CARGO_BIN_EXE_puppet_worker");
29
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

struct TestHost {
	_cache_dir: tempfile::TempDir,
	host: Mutex<ValidationHost>,
}

impl TestHost {
	fn new() -> Self {
		Self::new_with_config(|_| ())
	}

	fn new_with_config<F>(f: F) -> Self
	where
		F: FnOnce(&mut Config),
	{
		let cache_dir = tempfile::tempdir().unwrap();
		let program_path = std::path::PathBuf::from(PUPPET_EXE);
		let mut config = Config::new(cache_dir.path().to_owned(), program_path);
		f(&mut config);
49
		let (host, task) = start(config, Metrics::default());
50
		let _ = async_std::task::spawn(task);
Shawn Tabrizi's avatar
Shawn Tabrizi committed
51
		Self { _cache_dir: cache_dir, host: Mutex::new(host) }
52
53
54
55
56
57
58
59
	}

	async fn validate_candidate(
		&self,
		code: &[u8],
		params: ValidationParams,
	) -> Result<ValidationResult, ValidationError> {
		let (result_tx, result_rx) = futures::channel::oneshot::channel();
60

Shawn Tabrizi's avatar
Shawn Tabrizi committed
61
62
		let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024)
			.expect("Compression works");
63

64
65
66
67
		self.host
			.lock()
			.await
			.execute_pvf(
68
				Pvf::from_code(code.into()),
69
				TEST_EXECUTION_TIMEOUT,
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
				params.encode(),
				polkadot_node_core_pvf::Priority::Normal,
				result_tx,
			)
			.await
			.unwrap();
		result_rx.await.unwrap()
	}
}

#[async_std::test]
async fn terminates_on_timeout() {
	let host = TestHost::new();

	let result = host
		.validate_candidate(
			halt::wasm_binary_unwrap(),
			ValidationParams {
				block_data: BlockData(Vec::new()),
				parent_head: Default::default(),
				relay_parent_number: 1,
				relay_parent_storage_root: Default::default(),
			},
		)
		.await;

	match result {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
97
		Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)) => {},
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
		r => panic!("{:?}", r),
	}
}

#[async_std::test]
async fn parallel_execution() {
	let host = TestHost::new();
	let execute_pvf_future_1 = host.validate_candidate(
		halt::wasm_binary_unwrap(),
		ValidationParams {
			block_data: BlockData(Vec::new()),
			parent_head: Default::default(),
			relay_parent_number: 1,
			relay_parent_storage_root: Default::default(),
		},
	);
	let execute_pvf_future_2 = host.validate_candidate(
		halt::wasm_binary_unwrap(),
		ValidationParams {
			block_data: BlockData(Vec::new()),
			parent_head: Default::default(),
			relay_parent_number: 1,
			relay_parent_storage_root: Default::default(),
		},
	);

	let start = std::time::Instant::now();
	let (_, _) = futures::join!(execute_pvf_future_1, execute_pvf_future_2);

	// total time should be < 2 x EXECUTION_TIMEOUT_SEC
	const EXECUTION_TIMEOUT_SEC: u64 = 3;
	assert!(
Shawn Tabrizi's avatar
Shawn Tabrizi committed
130
131
		std::time::Instant::now().duration_since(start) <
			std::time::Duration::from_secs(EXECUTION_TIMEOUT_SEC * 2)
132
133
134
135
136
137
	);
}

#[async_std::test]
async fn execute_queue_doesnt_stall_if_workers_died() {
	let host = TestHost::new_with_config(|cfg| {
138
		cfg.execute_workers_max_num = 5;
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
	});

	// Here we spawn 8 validation jobs for the `halt` PVF and share those between 5 workers. The
	// first five jobs should timeout and the workers killed. For the next 3 jobs a new batch of
	// workers should be spun up.
	futures::future::join_all((0u8..=8).map(|_| {
		host.validate_candidate(
			halt::wasm_binary_unwrap(),
			ValidationParams {
				block_data: BlockData(Vec::new()),
				parent_head: Default::default(),
				relay_parent_number: 1,
				relay_parent_storage_root: Default::default(),
			},
		)
	}))
	.await;
}