Unverified Commit 3ffb048f authored by Sergey Pepyakin's avatar Sergey Pepyakin Committed by GitHub
Browse files

Put WIP artifacts next to ready ones (#3057)



* Put WIP artifacts next to ready ones

Fixes #3044

* Apply suggestions from code review

Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>

Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>
parent 9a49dd26
Pipeline #138906 failed with stages
in 32 minutes and 40 seconds
......@@ -155,7 +155,8 @@ pub fn start(config: Config) -> (ValidationHost, impl Future<Output = ()>) {
let validation_host = ValidationHost { to_host_tx };
let (to_prepare_pool, from_prepare_pool, run_prepare_pool) = prepare::start_pool(
config.prepare_worker_program_path.to_owned(),
config.prepare_worker_program_path.clone(),
config.cache_path.clone(),
config.prepare_worker_spawn_timeout,
);
......
......@@ -104,6 +104,7 @@ type Mux = FuturesUnordered<BoxFuture<'static, PoolEvent>>;
struct Pool {
program_path: PathBuf,
cache_path: PathBuf,
spawn_timeout: Duration,
to_pool: mpsc::Receiver<ToPool>,
from_pool: mpsc::UnboundedSender<FromPool>,
......@@ -117,6 +118,7 @@ struct Fatal;
async fn run(
Pool {
program_path,
cache_path,
spawn_timeout,
to_pool,
mut from_pool,
......@@ -141,6 +143,7 @@ async fn run(
let to_pool = break_if_fatal!(to_pool.ok_or(Fatal));
handle_to_pool(
&program_path,
&cache_path,
spawn_timeout,
&mut spawned,
&mut mux,
......@@ -181,6 +184,7 @@ async fn purge_dead(
fn handle_to_pool(
program_path: &Path,
cache_path: &Path,
spawn_timeout: Duration,
spawned: &mut HopSlotMap<Worker, WorkerData>,
mux: &mut Mux,
......@@ -199,7 +203,14 @@ fn handle_to_pool(
if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() {
mux.push(
start_work_task(worker, idle, code, artifact_path, background_priority)
start_work_task(
worker,
idle,
code,
cache_path.to_owned(),
artifact_path,
background_priority
)
.boxed(),
);
} else {
......@@ -251,10 +262,12 @@ async fn start_work_task(
worker: Worker,
idle: IdleWorker,
code: Arc<Vec<u8>>,
cache_path: PathBuf,
artifact_path: PathBuf,
background_priority: bool,
) -> PoolEvent {
let outcome = worker::start_work(idle, code, artifact_path, background_priority).await;
let outcome =
worker::start_work(idle, code, &cache_path, artifact_path, background_priority).await;
PoolEvent::StartWork(worker, outcome)
}
......@@ -314,6 +327,7 @@ fn reply(from_pool: &mut mpsc::UnboundedSender<FromPool>, m: FromPool) -> Result
/// Spins up the pool and returns the future that should be polled to make the pool functional.
pub fn start(
program_path: PathBuf,
cache_path: PathBuf,
spawn_timeout: Duration,
) -> (
mpsc::Sender<ToPool>,
......@@ -325,6 +339,7 @@ pub fn start(
let run = run(Pool {
program_path,
cache_path,
spawn_timeout,
to_pool: to_pool_rx,
from_pool: from_pool_tx,
......
......@@ -19,7 +19,7 @@ use crate::{
artifacts::Artifact,
worker_common::{
IdleWorker, SpawnErr, WorkerHandle, bytes_to_path, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, tmpfile, worker_event_loop,
spawn_with_program_path, tmpfile_in, worker_event_loop,
},
};
use async_std::{
......@@ -70,6 +70,7 @@ pub enum Outcome {
pub async fn start_work(
worker: IdleWorker,
code: Arc<Vec<u8>>,
cache_path: &Path,
artifact_path: PathBuf,
background_priority: bool,
) -> Outcome {
......@@ -87,8 +88,14 @@ pub async fn start_work(
renice(pid, NICENESS_BACKGROUND);
}
if let Err(err) = send_request(&mut stream, code).await {
tracing::warn!("failed to send a prepare request to pid={}: {:?}", pid, err);
with_tmp_file(pid, cache_path, |tmp_file| async move {
if let Err(err) = send_request(&mut stream, code, &tmp_file).await {
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to send a prepare request: {:?}",
err,
);
return Outcome::DidntMakeIt;
}
......@@ -106,19 +113,53 @@ pub async fn start_work(
}
let selected = futures::select! {
artifact_path_bytes = framed_recv(&mut stream).fuse() => {
match artifact_path_bytes {
Ok(bytes) => {
if let Some(tmp_path) = bytes_to_path(&bytes) {
async_std::fs::rename(tmp_path, &artifact_path)
res = framed_recv(&mut stream).fuse() => {
match res {
Ok(x) if x == &[1u8] => {
tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"promoting WIP artifact {} to {}",
tmp_file.display(),
artifact_path.display(),
);
async_std::fs::rename(&tmp_file, &artifact_path)
.await
.map(|_| Selected::Done)
.unwrap_or(Selected::IoErr)
} else {
.unwrap_or_else(|err| {
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to rename the artifact from {} to {}: {:?}",
tmp_file.display(),
artifact_path.display(),
err,
);
Selected::IoErr
})
}
Ok(response_bytes) => {
use sp_core::hexdisplay::HexDisplay;
let bound_bytes =
&response_bytes[..response_bytes.len().min(4)];
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"received unexpected response from the prepare worker: {}",
HexDisplay::from(&bound_bytes),
);
Selected::IoErr
},
Err(_) => Selected::IoErr,
Err(err) => {
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to recv a prepare response: {:?}",
err,
);
Selected::IoErr
}
}
},
_ = Delay::new(COMPILATION_TIMEOUT).fuse() => Selected::Deadline,
......@@ -136,14 +177,75 @@ pub async fn start_work(
Outcome::DidntMakeIt
}
}
})
.await
}
async fn send_request(stream: &mut UnixStream, code: Arc<Vec<u8>>) -> io::Result<()> {
framed_send(stream, &*code).await
/// Create a temporary file for an artifact at the given cache path and execute the given
/// future/closure passing the file path in.
///
/// The function will try best effort to not leave behind the temporary file.
async fn with_tmp_file<F, Fut>(pid: u32, cache_path: &Path, f: F) -> Outcome
where
Fut: futures::Future<Output = Outcome>,
F: FnOnce(PathBuf) -> Fut,
{
let tmp_file = match tmpfile_in("prepare-artifact-", cache_path).await {
Ok(f) => f,
Err(err) => {
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to create a temp file for the artifact: {:?}",
err,
);
return Outcome::DidntMakeIt;
}
};
let outcome = f(tmp_file.clone()).await;
// The function called above is expected to move `tmp_file` to a new location upon success. However,
// the function may as well fail and in that case we should remove the tmp file here.
//
// In any case, we try to remove the file here so that there are no leftovers. We only report
// errors that are different from the `NotFound`.
match async_std::fs::remove_file(tmp_file).await {
Ok(()) => (),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => (),
Err(err) => {
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to remove the tmp file: {:?}",
err,
);
}
}
outcome
}
async fn recv_request(stream: &mut UnixStream) -> io::Result<Vec<u8>> {
framed_recv(stream).await
async fn send_request(
stream: &mut UnixStream,
code: Arc<Vec<u8>>,
tmp_file: &Path,
) -> io::Result<()> {
framed_send(stream, &*code).await?;
framed_send(stream, path_to_bytes(tmp_file)).await?;
Ok(())
}
async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf)> {
let code = framed_recv(stream).await?;
let tmp_file = framed_recv(stream).await?;
let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"prepare pvf recv_request: non utf-8 artifact path".to_string(),
)
})?;
Ok((code, tmp_file))
}
pub fn bump_priority(handle: &WorkerHandle) {
......@@ -173,7 +275,7 @@ fn renice(pid: u32, niceness: i32) {
pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("prepare", socket_path, |mut stream| async move {
loop {
let code = recv_request(&mut stream).await?;
let (code, dest) = recv_request(&mut stream).await?;
tracing::debug!(
target: LOG_TARGET,
......@@ -183,7 +285,6 @@ pub fn worker_entrypoint(socket_path: &str) {
let artifact_bytes = prepare_artifact(&code).serialize();
// Write the serialized artifact into into a temp file.
let dest = tmpfile("prepare-artifact-").await?;
tracing::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
......@@ -192,8 +293,8 @@ pub fn worker_entrypoint(socket_path: &str) {
);
async_std::fs::write(&dest, &artifact_bytes).await?;
// Communicate the results back to the host.
framed_send(&mut stream, &path_to_bytes(&dest)).await?;
// Return back a byte that signals finishing the work.
framed_send(&mut stream, &[1u8]).await?;
}
});
}
......
......@@ -85,13 +85,12 @@ where
result
}
/// Returns a path under the location for temporary files. The file name will start with the given
/// prefix.
/// Returns a path under the given `dir`. The file name will start with the given prefix.
///
/// There is only a certain number of retries. If exceeded this function will give up and return an
/// error.
pub async fn tmpfile(prefix: &str) -> io::Result<PathBuf> {
fn tmppath(prefix: &str) -> PathBuf {
pub async fn tmpfile_in(prefix: &str, dir: &Path) -> io::Result<PathBuf> {
fn tmppath(prefix: &str, dir: &Path) -> PathBuf {
use rand::distributions::Alphanumeric;
const DESCRIMINATOR_LEN: usize = 10;
......@@ -107,15 +106,15 @@ pub async fn tmpfile(prefix: &str) -> io::Result<PathBuf> {
let s = std::str::from_utf8(&buf)
.expect("the string is collected from a valid utf-8 sequence; qed");
let mut temp_dir = PathBuf::from(std::env::temp_dir());
temp_dir.push(s);
temp_dir
let mut file = dir.to_owned();
file.push(s);
file
}
const NUM_RETRIES: usize = 50;
for _ in 0..NUM_RETRIES {
let candidate_path = tmppath(prefix);
let candidate_path = tmppath(prefix, dir);
if !candidate_path.exists().await {
return Ok(candidate_path)
}
......@@ -126,6 +125,12 @@ pub async fn tmpfile(prefix: &str) -> io::Result<PathBuf> {
)
}
/// The same as [`tmpfile_in`], but uses [`std::env::temp_dir`] as the directory.
pub async fn tmpfile(prefix: &str) -> io::Result<PathBuf> {
let temp_dir = PathBuf::from(std::env::temp_dir());
tmpfile_in(prefix, &temp_dir).await
}
pub fn worker_event_loop<F, Fut>(debug_id: &'static str, socket_path: &str, mut event_loop: F)
where
F: FnMut(UnixStream) -> Fut,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment