Commit e99340b5 authored by Arkadiy Paronyan's avatar Arkadiy Paronyan Committed by Bastian Köcher
Browse files

Remote execution with additional logging (#767)

parent 468585c2
Pipeline #74697 passed with stages
in 18 minutes and 53 seconds
...@@ -36,6 +36,10 @@ const WORKER_ARGS: &[&'static str] = &[WORKER_ARG]; ...@@ -36,6 +36,10 @@ const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
const NUM_HOSTS: usize = 8; const NUM_HOSTS: usize = 8;
/// Execution timeout in seconds; /// Execution timeout in seconds;
#[cfg(debug_assertions)]
pub const EXECUTION_TIMEOUT_SEC: u64 = 30;
#[cfg(not(debug_assertions))]
pub const EXECUTION_TIMEOUT_SEC: u64 = 5; pub const EXECUTION_TIMEOUT_SEC: u64 = 5;
#[derive(Default)] #[derive(Default)]
...@@ -85,7 +89,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> { ...@@ -85,7 +89,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> {
let mut memory = match SharedMem::open(mem_id) { let mut memory = match SharedMem::open(mem_id) {
Ok(memory) => memory, Ok(memory) => memory,
Err(e) => { Err(e) => {
debug!("Error opening shared memory: {:?}", e); debug!("{} Error opening shared memory: {:?}", process::id(), e);
return Err(format!("Error opening shared memory: {:?}", e)); return Err(format!("Error opening shared memory: {:?}", e));
} }
}; };
...@@ -98,32 +102,32 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> { ...@@ -98,32 +102,32 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> {
std::thread::spawn(move || { std::thread::spawn(move || {
use std::io::Read; use std::io::Read;
let mut in_data = Vec::new(); let mut in_data = Vec::new();
// pipe terminates when parent process exits // pipe terminates when parent process exits
std::io::stdin().read_to_end(&mut in_data).ok(); std::io::stdin().read_to_end(&mut in_data).ok();
debug!("Parent process is dead. Exiting"); debug!("{} Parent process is dead. Exiting", process::id());
exit.store(true, atomic::Ordering::Relaxed); exit.store(true, atomic::Ordering::Relaxed);
}); });
memory.set(Event::WorkerReady as usize, EventState::Signaled) memory.set(Event::WorkerReady as usize, EventState::Signaled)
.map_err(|e| format!("Error setting shared event: {:?}", e))?; .map_err(|e| format!("{} Error setting shared event: {:?}", process::id(), e))?;
loop { loop {
if watch_exit.load(atomic::Ordering::Relaxed) { if watch_exit.load(atomic::Ordering::Relaxed) {
break; break;
} }
debug!("Waiting for candidate"); debug!("{} Waiting for candidate", process::id());
match memory.wait(Event::CandidateReady as usize, shared_memory::Timeout::Sec(1)) { match memory.wait(Event::CandidateReady as usize, shared_memory::Timeout::Sec(3)) {
Err(e) => { Err(e) => {
// Timeout // Timeout
trace!("Timeout waiting for candidate: {:?}", e); trace!("{} Timeout waiting for candidate: {:?}", process::id(), e);
continue; continue;
} }
Ok(()) => {} Ok(()) => {}
} }
{ {
debug!("Processing candidate"); debug!("{} Processing candidate", process::id());
// we have candidate data // we have candidate data
let mut slice = memory.wlock_as_slice(0) let mut slice = memory.wlock_as_slice(0)
.map_err(|e| format!("Error locking shared memory: {:?}", e))?; .map_err(|e| format!("Error locking shared memory: {:?}", e))?;
...@@ -134,7 +138,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> { ...@@ -134,7 +138,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> {
let mut header_buf: &[u8] = header_buf; let mut header_buf: &[u8] = header_buf;
let header = ValidationHeader::decode(&mut header_buf) let header = ValidationHeader::decode(&mut header_buf)
.map_err(|_| format!("Error decoding validation request."))?; .map_err(|_| format!("Error decoding validation request."))?;
debug!("Candidate header: {:?}", header); debug!("{} Candidate header: {:?}", process::id(), header);
let (code, rest) = rest.split_at_mut(MAX_CODE_MEM); let (code, rest) = rest.split_at_mut(MAX_CODE_MEM);
let (code, _) = code.split_at_mut(header.code_size as usize); let (code, _) = code.split_at_mut(header.code_size as usize);
let (call_data, rest) = rest.split_at_mut(MAX_RUNTIME_MEM); let (call_data, rest) = rest.split_at_mut(MAX_RUNTIME_MEM);
...@@ -142,7 +146,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> { ...@@ -142,7 +146,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> {
let message_data = rest; let message_data = rest;
let result = validate_candidate_internal(code, call_data, worker_ext.clone()); let result = validate_candidate_internal(code, call_data, worker_ext.clone());
debug!("Candidate validated: {:?}", result); debug!("{} Candidate validated: {:?}", process::id(), result);
match result { match result {
Ok(r) => { Ok(r) => {
...@@ -168,7 +172,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> { ...@@ -168,7 +172,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> {
let mut data: &mut[u8] = &mut **slice; let mut data: &mut[u8] = &mut **slice;
result.encode_to(&mut data); result.encode_to(&mut data);
} }
debug!("Signaling result"); debug!("{} Signaling result", process::id());
memory.set(Event::ResultReady as usize, EventState::Signaled) memory.set(Event::ResultReady as usize, EventState::Signaled)
.map_err(|e| format!("Error setting shared event: {:?}", e))?; .map_err(|e| format!("Error setting shared event: {:?}", e))?;
} }
...@@ -194,6 +198,7 @@ unsafe impl Send for ValidationHost {} ...@@ -194,6 +198,7 @@ unsafe impl Send for ValidationHost {}
struct ValidationHost { struct ValidationHost {
worker: Option<process::Child>, worker: Option<process::Child>,
memory: Option<SharedMem>, memory: Option<SharedMem>,
id: u32,
} }
/// Validate a candidate under the given validation code. /// Validate a candidate under the given validation code.
...@@ -253,6 +258,7 @@ impl ValidationHost { ...@@ -253,6 +258,7 @@ impl ValidationHost {
.args(args) .args(args)
.stdin(process::Stdio::piped()) .stdin(process::Stdio::piped())
.spawn()?; .spawn()?;
self.id = worker.id();
self.worker = Some(worker); self.worker = Some(worker);
memory.wait( memory.wait(
...@@ -302,11 +308,11 @@ impl ValidationHost { ...@@ -302,11 +308,11 @@ impl ValidationHost {
header.encode_to(&mut header_buf); header.encode_to(&mut header_buf);
} }
debug!("Signaling candidate"); debug!("{} Signaling candidate", self.id);
memory.set(Event::CandidateReady as usize, EventState::Signaled)?; memory.set(Event::CandidateReady as usize, EventState::Signaled)?;
debug!("Waiting for results"); debug!("{} Waiting for results", self.id);
match memory.wait(Event::ResultReady as usize, shared_memory::Timeout::Sec(5)) { match memory.wait(Event::ResultReady as usize, shared_memory::Timeout::Sec(EXECUTION_TIMEOUT_SEC as usize)) {
Err(e) => { Err(e) => {
debug!("Worker timeout: {:?}", e); debug!("Worker timeout: {:?}", e);
if let Some(mut worker) = self.worker.take() { if let Some(mut worker) = self.worker.take() {
...@@ -318,6 +324,7 @@ impl ValidationHost { ...@@ -318,6 +324,7 @@ impl ValidationHost {
} }
{ {
debug!("{} Reading results", self.id);
let data: &[u8] = &**memory.wlock_as_slice(0)?; let data: &[u8] = &**memory.wlock_as_slice(0)?;
let (header_buf, rest) = data.split_at(1024); let (header_buf, rest) = data.split_at(1024);
let (_, rest) = rest.split_at(MAX_CODE_MEM); let (_, rest) = rest.split_at(MAX_CODE_MEM);
...@@ -346,6 +353,7 @@ impl ValidationHost { ...@@ -346,6 +353,7 @@ impl ValidationHost {
Ok(result) Ok(result)
} }
ValidationResultHeader::Error(message) => { ValidationResultHeader::Error(message) => {
debug!("{} Validation error: {}", self.id, message);
Err(Error::External(message).into()) Err(Error::External(message).into())
} }
} }
......
...@@ -461,7 +461,7 @@ fn do_validation<P>( ...@@ -461,7 +461,7 @@ fn do_validation<P>(
&validation_code, &validation_code,
params, params,
ext.clone(), ext.clone(),
ExecutionMode::Local, ExecutionMode::Remote,
) { ) {
Ok(result) => { Ok(result) => {
if result.head_data == head_data.0 { if result.head_data == head_data.0 {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment