// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
#![cfg(not(any(target_os = "android", target_os = "unknown")))]
18

19
20
21
22
use std::{process, env, sync::Arc, sync::atomic};
use codec::{Decode, Encode};
use crate::primitives::{ValidationParams, ValidationResult};
use super::{validate_candidate_internal, Error};
23
24
25
26
27
28
29
30
31
32
use super::{MAX_CODE_MEM, MAX_RUNTIME_MEM};
use shared_memory::{SharedMem, SharedMemConf, EventState, WriteLockable, EventWait, EventSet};
use parking_lot::Mutex;
use log::{debug, trace};

/// CLI Argument to start in validation worker mode.
const WORKER_ARG: &str = "validation-worker";
/// Arguments passed to the spawned worker process in normal mode.
const WORKER_ARGS: &[&str] = &[WORKER_ARG];
/// Arguments passed to the spawned worker process when running in test mode.
const WORKER_ARGS_TEST: &[&str] = &["--nocapture", "validation_worker"];

33
/// Execution timeout in seconds.
///
/// Debug builds get a longer timeout. The host uses this value both when
/// waiting for a freshly spawned worker to become ready and when waiting
/// for a validation result.
#[cfg(debug_assertions)]
pub const EXECUTION_TIMEOUT_SEC: u64 = 30;

/// Execution timeout in seconds.
///
/// The host uses this value both when waiting for a freshly spawned worker
/// to become ready and when waiting for a validation result.
#[cfg(not(debug_assertions))]
pub const EXECUTION_TIMEOUT_SEC: u64 = 5;

40
41
42
43
44
45
/// Indices of the events registered in the shared memory region.
///
/// The discriminants are used as event indices (`Event::X as usize`) and follow
/// the order in which `ValidationHost::create_memory` adds the events.
enum Event {
	/// Set by the host after a candidate has been written; the worker waits on it.
	CandidateReady = 0,
	/// Set by the worker after a result has been written; the host waits on it.
	ResultReady = 1,
	/// Set by the worker once on startup; the host waits on it after spawning.
	WorkerReady = 2,
}

46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/// A pool of hosts.
///
/// The hosts are kept behind an `Arc`, so clones of the pool share the same
/// set of hosts.
#[derive(Clone)]
pub struct ValidationPool {
	// Every host has its own mutex so that a free one can be picked with `try_lock`.
	hosts: Arc<Vec<Mutex<ValidationHost>>>,
}

/// Number of hosts created by `ValidationPool::new`.
const DEFAULT_NUM_HOSTS: usize = 8;

impl ValidationPool {
	/// Creates a validation pool with the default configuration.
	pub fn new() -> ValidationPool {
		ValidationPool {
			hosts: Arc::new((0..DEFAULT_NUM_HOSTS).map(|_| Default::default()).collect()),
		}
	}

	/// Validate a candidate under the given validation code using the next
	/// free validation host.
	///
	/// This will fail if the validation code is not a proper parachain validation module.
66
	pub fn validate_candidate(
67
68
69
70
71
72
73
		&self,
		validation_code: &[u8],
		params: ValidationParams,
		test_mode: bool,
	) -> Result<ValidationResult, Error> {
		for host in self.hosts.iter() {
			if let Some(mut host) = host.try_lock() {
74
				return host.validate_candidate(validation_code, params, test_mode);
75
76
77
78
			}
		}

		// all workers are busy, just wait for the first one
79
		self.hosts[0].lock().validate_candidate(validation_code, params, test_mode)
80
	}
81
82
}

郭光华's avatar
郭光华 committed
83
/// Validation worker process entry point. Runs a loop waiting for candidates to validate
84
85
86
87
88
/// and sends back results via shared memory.
pub fn run_worker(mem_id: &str) -> Result<(), String> {
	let mut memory = match SharedMem::open(mem_id) {
		Ok(memory) => memory,
		Err(e) => {
89
			debug!("{} Error opening shared memory: {:?}", process::id(), e);
90
91
92
			return Err(format!("Error opening shared memory: {:?}", e));
		}
	};
93

94
95
96
97
98
99
	let exit = Arc::new(atomic::AtomicBool::new(false));
	// spawn parent monitor thread
	let watch_exit = exit.clone();
	std::thread::spawn(move || {
		use std::io::Read;
		let mut in_data = Vec::new();
100
		// pipe terminates when parent process exits
101
		std::io::stdin().read_to_end(&mut in_data).ok();
102
		debug!("{} Parent process is dead. Exiting", process::id());
103
104
105
106
		exit.store(true, atomic::Ordering::Relaxed);
	});

	memory.set(Event::WorkerReady as usize, EventState::Signaled)
107
		.map_err(|e| format!("{} Error setting shared event: {:?}", process::id(), e))?;
108
109
110
111
112
113

	loop {
		if watch_exit.load(atomic::Ordering::Relaxed) {
			break;
		}

114
115
		debug!("{} Waiting for candidate", process::id());
		match memory.wait(Event::CandidateReady as usize, shared_memory::Timeout::Sec(3)) {
116
117
			Err(e) => {
				// Timeout
118
				trace!("{} Timeout waiting for candidate: {:?}", process::id(), e);
119
120
121
122
123
124
				continue;
			}
			Ok(()) => {}
		}

		{
125
			debug!("{} Processing candidate", process::id());
126
127
128
129
130
131
132
133
134
			// we have candidate data
			let mut slice = memory.wlock_as_slice(0)
				.map_err(|e| format!("Error locking shared memory: {:?}", e))?;

			let result = {
				let data: &mut[u8] = &mut **slice;
				let (header_buf, rest) = data.split_at_mut(1024);
				let mut header_buf: &[u8] = header_buf;
				let header = ValidationHeader::decode(&mut header_buf)
135
					.map_err(|_| format!("Error decoding validation request."))?;
136
				debug!("{} Candidate header: {:?}", process::id(), header);
137
138
				let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
				let (code, _) = code.split_at_mut(header.code_size as usize);
Ashley's avatar
Ashley committed
139
				let (call_data, _) = rest.split_at_mut(MAX_RUNTIME_MEM);
140
141
				let (call_data, _) = call_data.split_at_mut(header.params_size as usize);

142
				let result = validate_candidate_internal(code, call_data);
143
				debug!("{} Candidate validated: {:?}", process::id(), result);
144
145

				match result {
146
					Ok(r) => ValidationResultHeader::Ok(r),
147
148
149
150
151
152
					Err(e) => ValidationResultHeader::Error(e.to_string()),
				}
			};
			let mut data: &mut[u8] = &mut **slice;
			result.encode_to(&mut data);
		}
153
		debug!("{} Signaling result", process::id());
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
		memory.set(Event::ResultReady as usize, EventState::Signaled)
			.map_err(|e| format!("Error setting shared event: {:?}", e))?;
	}
	Ok(())
}

/// Params header in shared memory. All offsets should be aligned to WASM page size.
///
/// Written by the host at the start of the shared memory region and decoded by
/// the worker before it reads the code and call data.
#[derive(Encode, Decode, Debug)]
struct ValidationHeader {
	/// Length in bytes of the validation code placed in the code area.
	code_size: u64,
	/// Length in bytes of the encoded validation params placed in the call data area.
	params_size: u64,
}

#[derive(Encode, Decode, Debug)]
pub enum ValidationResultHeader {
169
	Ok(ValidationResult),
170
171
172
173
174
	Error(String),
}

// SAFETY: NOTE(review): presumably required because `SharedMem` is not `Send` by
// itself. Within this file a host is only reached through the pool's per-host
// `Mutex`; confirm that moving `SharedMem` between threads is actually sound.
unsafe impl Send for ValidationHost {}

175
176
#[derive(Default)]
struct ValidationHost {
177
178
	worker: Option<process::Child>,
	memory: Option<SharedMem>,
179
	id: u32,
180
181
182
183
184
185
186
187
188
189
190
191
}

impl Drop for ValidationHost {
	fn drop(&mut self) {
		if let Some(ref mut worker) = &mut self.worker {
			worker.kill().ok();
		}
	}
}

impl ValidationHost {
	fn create_memory() -> Result<SharedMem, Error> {
192
		let mem_size = MAX_RUNTIME_MEM + MAX_CODE_MEM + 1024;
193
		let mem_config = SharedMemConf::default()
194
195
196
197
			.set_size(mem_size)
			.add_lock(shared_memory::LockType::Mutex, 0, mem_size)?
			.add_event(shared_memory::EventType::Auto)?  // Event::CandidateReady
			.add_event(shared_memory::EventType::Auto)?  // Event::ResultReady
198
			.add_event(shared_memory::EventType::Auto)?; // Event::WorkerReady
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219

		Ok(mem_config.create()?)
	}

	fn start_worker(&mut self, test_mode: bool) -> Result<(), Error> {
		if let Some(ref mut worker) = self.worker {
			// Check if still alive
			if let Ok(None) = worker.try_wait() {
				// Still running
				return Ok(());
			}
		}
		let memory = Self::create_memory()?;
		let self_path = env::current_exe()?;
		debug!("Starting worker at {:?}", self_path);
		let mut args = if test_mode { WORKER_ARGS_TEST.to_vec() } else { WORKER_ARGS.to_vec() };
		args.push(memory.get_os_path());
		let worker = process::Command::new(self_path)
			.args(args)
			.stdin(process::Stdio::piped())
			.spawn()?;
220
		self.id = worker.id();
221
222
		self.worker = Some(worker);

223
224
225
226
		memory.wait(
			Event::WorkerReady as usize,
			shared_memory::Timeout::Sec(EXECUTION_TIMEOUT_SEC as usize),
		)?;
227
228
229
230
231
232
233
		self.memory = Some(memory);
		Ok(())
	}

	/// Validate a candidate under the given validation code.
	///
	/// This will fail if the validation code is not a proper parachain validation module.
234
	pub fn validate_candidate(
235
236
237
238
		&mut self,
		validation_code: &[u8],
		params: ValidationParams,
		test_mode: bool,
239
	) -> Result<ValidationResult, Error> {
240
241
242
243
244
		if validation_code.len() > MAX_CODE_MEM {
			return Err(Error::CodeTooLarge(validation_code.len()));
		}
		// First, check if need to spawn the child process
		self.start_worker(test_mode)?;
245
246
		let memory = self.memory.as_mut()
			.expect("memory is always `Some` after `start_worker` completes successfully");
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
		{
			// Put data in shared mem
			let data: &mut[u8] = &mut **memory.wlock_as_slice(0)?;
			let (mut header_buf, rest) = data.split_at_mut(1024);
			let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
			let (code, _) = code.split_at_mut(validation_code.len());
			let (call_data, _) = rest.split_at_mut(MAX_RUNTIME_MEM);
			code[..validation_code.len()].copy_from_slice(validation_code);
			let encoded_params = params.encode();
			if encoded_params.len() >= MAX_RUNTIME_MEM {
				return Err(Error::ParamsTooLarge(MAX_RUNTIME_MEM));
			}
			call_data[..encoded_params.len()].copy_from_slice(&encoded_params);

			let header = ValidationHeader {
				code_size: validation_code.len() as u64,
				params_size: encoded_params.len() as u64,
			};

			header.encode_to(&mut header_buf);
		}

269
		debug!("{} Signaling candidate", self.id);
270
271
		memory.set(Event::CandidateReady as usize, EventState::Signaled)?;

272
273
		debug!("{} Waiting for results", self.id);
		match memory.wait(Event::ResultReady as usize, shared_memory::Timeout::Sec(EXECUTION_TIMEOUT_SEC as usize)) {
274
275
276
277
278
279
280
281
282
283
284
			Err(e) => {
				debug!("Worker timeout: {:?}", e);
				if let Some(mut worker) = self.worker.take() {
					worker.kill().ok();
				}
				return Err(Error::Timeout.into());
			}
			Ok(()) => {}
		}

		{
285
			debug!("{} Reading results", self.id);
286
			let data: &[u8] = &**memory.wlock_as_slice(0)?;
287
			let (header_buf, _) = data.split_at(1024);
288
289
290
			let mut header_buf: &[u8] = header_buf;
			let header = ValidationResultHeader::decode(&mut header_buf).unwrap();
			match header {
291
				ValidationResultHeader::Ok(result) => Ok(result),
292
				ValidationResultHeader::Error(message) => {
293
					debug!("{} Validation error: {}", self.id, message);
294
295
296
297
298
299
					Err(Error::External(message).into())
				}
			}
		}
	}
}