// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! A queue that handles requests for PVF execution.
use super::worker_intf::Outcome;
use crate::{
artifacts::{ArtifactId, ArtifactPathId},
host::ResultSender,
metrics::Metrics,
worker_intf::{IdleWorker, WorkerHandle},
InvalidCandidate, ValidationError, LOG_TARGET,
};
use futures::{
channel::mpsc,
future::BoxFuture,
stream::{FuturesUnordered, StreamExt as _},
Future, FutureExt,
};
use polkadot_primitives::{ExecutorParams, ExecutorParamsHash};
use slotmap::HopSlotMap;
use std::{
collections::VecDeque,
fmt,
path::PathBuf,
time::{Duration, Instant},
};

/// The amount of time a job for which the queue does not have a compatible worker may wait in the
/// queue. After that time passes, the queue will kill the first worker which becomes idle in
/// order to re-spawn a new worker to execute the job immediately.
/// To make any sense and not to break things, the value should be greater than the minimal
/// execution timeout in use and less than the block time.
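/// (Illustrative figures, not normative: with a 2-second minimal execution timeout and a
/// 6-second block time, the 4-second value below sits between the two.)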
const MAX_KEEP_WAITING: Duration = Duration::from_secs(4);
slotmap::new_key_type! { struct Worker; }
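
/// A request to the execution queue.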
#[derive(Debug)]
pub enum ToQueue {
Enqueue { artifact: ArtifactPathId, pending_execution_request: PendingExecutionRequest },
}
/// An execution request that should execute the PVF (which is already known in the context) and
/// send the results to the given result sender.
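///
/// A minimal construction sketch (illustrative: `candidate_params` and `result_tx` are assumed
/// to be obtained from the validation host):
/// ```ignore
/// let request = PendingExecutionRequest {
///     exec_timeout: Duration::from_secs(2),
///     params: candidate_params, // SCALE-encoded validation parameters (hypothetical binding)
///     executor_params: ExecutorParams::default(),
///     result_tx,
/// };
/// ```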
#[derive(Debug)]
pub struct PendingExecutionRequest {
pub exec_timeout: Duration,
pub params: Vec<u8>,
pub executor_params: ExecutorParams,
pub result_tx: ResultSender,
}
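
/// A job waiting to be executed, together with the moment it entered the queue.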
struct ExecuteJob {
artifact: ArtifactPathId,
exec_timeout: Duration,
params: Vec<u8>,
executor_params: ExecutorParams,
result_tx: ResultSender,
waiting_since: Instant,
}
struct WorkerData {
idle: Option<IdleWorker>,
handle: WorkerHandle,
executor_params_hash: ExecutorParamsHash,
}
impl fmt::Debug for WorkerData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WorkerData(pid={})", self.handle.id())
}
}
struct Workers {
/// The registry of running workers.
running: HopSlotMap<Worker, WorkerData>,
/// The number of spawning but not yet spawned workers.
spawn_inflight: usize,
/// The maximum number of workers the queue can have at once.
capacity: usize,
}
impl Workers {
fn can_afford_one_more(&self) -> bool {
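// Both running workers and in-flight spawns count against the capacity. E.g., with
// `capacity == 4`, two running workers and one spawn in flight still allow one more spawn.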
self.spawn_inflight + self.running.len() < self.capacity
}
fn find_available(&self, executor_params_hash: ExecutorParamsHash) -> Option<Worker> {
self.running.iter().find_map(|d| {
if d.1.idle.is_some() && d.1.executor_params_hash == executor_params_hash {
Some(d.0)
} else {
None
}
})
}
fn find_idle(&self) -> Option<Worker> {
self.running
.iter()
.find_map(|d| if d.1.idle.is_some() { Some(d.0) } else { None })
}
/// Find the associated data by the worker token and extract its [`IdleWorker`] token.
///
/// Returns `None` if the worker is not recognized or the idle token is absent.
fn claim_idle(&mut self, worker: Worker) -> Option<IdleWorker> {
self.running.get_mut(worker)?.idle.take()
}
}
enum QueueEvent {
Spawn(IdleWorker, WorkerHandle, ExecuteJob),
StartWork(Worker, Outcome, ArtifactId, ResultSender),
}
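
/// A multiplexer over in-flight worker futures: both pending spawns and jobs being executed
/// resolve into [`QueueEvent`]s that are processed by the queue's main loop.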
type Mux = FuturesUnordered<BoxFuture<'static, QueueEvent>>;
struct Queue {
metrics: Metrics,
/// The receiver that receives messages sent to the queue.
to_queue_rx: mpsc::Receiver<ToQueue>,
program_path: PathBuf,
spawn_timeout: Duration,
/// The queue of jobs that are waiting for a worker to pick up.
queue: VecDeque<ExecuteJob>,
workers: Workers,
mux: Mux,
}
impl Queue {
fn new(
metrics: Metrics,
program_path: PathBuf,
worker_capacity: usize,
spawn_timeout: Duration,
to_queue_rx: mpsc::Receiver<ToQueue>,
) -> Self {
Self {
metrics,
program_path,
spawn_timeout,
to_queue_rx,
queue: VecDeque::new(),
mux: Mux::new(),
workers: Workers {
running: HopSlotMap::with_capacity_and_key(10),
spawn_inflight: 0,
capacity: worker_capacity,
},
}
}
async fn run(mut self) {
loop {
futures::select! {
to_queue = self.to_queue_rx.next() => {
if let Some(to_queue) = to_queue {
handle_to_queue(&mut self, to_queue);
} else {
break;
}
}
ev = self.mux.select_next_some() => handle_mux(&mut self, ev).await,
}
purge_dead(&self.metrics, &mut self.workers).await;
}
}
/// Tries to assign a job in the queue to a worker. If an idle worker is provided, it does its
/// best to find a job with a compatible execution environment for it, unless jobs have been
/// waiting in the queue for too long. In that case, it kills an existing idle worker and
/// re-spawns a new one to execute the job immediately. It may also spawn an additional worker
/// if that is affordable.
/// If all the workers are busy or the queue is empty, it does nothing.
/// Should be called every time a new job arrives in the queue or a job finishes.
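///
/// Illustrative scenario: a worker built for executor params `A` has just finished while the
/// eldest job needs params `B`. If the eldest job has waited less than `MAX_KEEP_WAITING`, the
/// queue prefers to hand the `A` worker another queued `A`-compatible job; once the eldest has
/// waited longer than that, it is forced through, retiring an idle worker if needed to free a
/// slot for a fresh `B` worker.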
fn try_assign_next_job(&mut self, finished_worker: Option<Worker>) {
// New jobs are always pushed to the tail of the queue; the one at its head is always
// the eldest one.
let eldest = if let Some(eldest) = self.queue.get(0) { eldest } else { return };
// By default, we're going to execute the eldest job on any worker slot available, even if
// we have to kill and re-spawn a worker
let mut worker = None;
let mut job_index = 0;
// But if we're not pressed for time, we can try to find a better job-worker pair not
// requiring the expensive kill-spawn operation
if eldest.waiting_since.elapsed() < MAX_KEEP_WAITING {
if let Some(finished_worker) = finished_worker {
if let Some(worker_data) = self.workers.running.get(finished_worker) {
for (i, job) in self.queue.iter().enumerate() {
if worker_data.executor_params_hash == job.executor_params.hash() {
(worker, job_index) = (Some(finished_worker), i);
break
}
}
}
}
}
if worker.is_none() {
// Try to obtain a worker for the job
worker = self.workers.find_available(self.queue[job_index].executor_params.hash());
}
if worker.is_none() {
if let Some(idle) = self.workers.find_idle() {
// No available workers of required type but there are some idle ones of other
// types, have to kill one and re-spawn with the correct type
if self.workers.running.remove(idle).is_some() {
self.metrics.execute_worker().on_retired();
}
}
}
if worker.is_none() && !self.workers.can_afford_one_more() {
// Bad luck, no worker slot can be used to execute the job
return
}
let job = self.queue.remove(job_index).expect("Job is just checked to be in queue; qed");
if let Some(worker) = worker {
assign(self, worker, job);
} else {
spawn_extra_worker(self, job);
}
}
}
async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
let mut to_remove = vec![];
for (worker, data) in workers.running.iter_mut() {
if futures::poll!(&mut data.handle).is_ready() {
// A resolved future means that the worker has terminated. Weed it out.
to_remove.push(worker);
}
}
for w in to_remove {
if workers.running.remove(w).is_some() {
metrics.execute_worker().on_retired();
}
}
}
fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
let ToQueue::Enqueue { artifact, pending_execution_request } = to_queue;
let PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } =
pending_execution_request;
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact.id.code_hash,
"enqueueing an artifact for execution",
);
queue.metrics.execute_enqueued();
let job = ExecuteJob {
artifact,
exec_timeout,
params,
executor_params,
result_tx,
waiting_since: Instant::now(),
};
queue.queue.push_back(job);
queue.try_assign_next_job(None);
}
async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
match event {
QueueEvent::Spawn(idle, handle, job) => {
handle_worker_spawned(queue, idle, handle, job);
},
QueueEvent::StartWork(worker, outcome, artifact_id, result_tx) => {
handle_job_finish(queue, worker, outcome, artifact_id, result_tx);
},
}
}
fn handle_worker_spawned(
queue: &mut Queue,
idle: IdleWorker,
handle: WorkerHandle,
job: ExecuteJob,
) {
queue.metrics.execute_worker().on_spawned();
queue.workers.spawn_inflight -= 1;
let worker = queue.workers.running.insert(WorkerData {
idle: Some(idle),
handle,
executor_params_hash: job.executor_params.hash(),
});
gum::debug!(target: LOG_TARGET, ?worker, "execute worker spawned");
assign(queue, worker, job);
}
/// If there are pending jobs in the queue, schedules the next of them onto the just freed-up
/// worker. Otherwise, puts the worker back into the available workers list.
fn handle_job_finish(
queue: &mut Queue,
worker: Worker,
outcome: Outcome,
artifact_id: ArtifactId,
result_tx: ResultSender,
) {
let (idle_worker, result, duration) = match outcome {
Outcome::Ok { result_descriptor, duration, idle_worker } => {
// TODO: propagate the soft timeout
(Some(idle_worker), Ok(result_descriptor), Some(duration))
},
Outcome::InvalidCandidate { err, idle_worker } => (
Some(idle_worker),
Err(ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(err))),
None,
),
Outcome::InternalError { err } => (None, Err(ValidationError::InternalError(err)), None),
Outcome::HardTimeout =>
(None, Err(ValidationError::InvalidCandidate(InvalidCandidate::HardTimeout)), None),
// "Maybe invalid" errors (will retry).
Outcome::IoErr => (
None,
Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbiguousWorkerDeath)),
None,
),
Outcome::Panic { err } =>
(None, Err(ValidationError::InvalidCandidate(InvalidCandidate::Panic(err))), None),
};
queue.metrics.execute_finished();
if let Err(ref err) = result {
gum::warn!(
target: LOG_TARGET,
?artifact_id,
?worker,
worker_rip = idle_worker.is_none(),
"execution worker concluded, error occurred: {:?}",
err
);
} else {
gum::trace!(
target: LOG_TARGET,
?artifact_id,
?worker,
worker_rip = idle_worker.is_none(),
?duration,
"execute worker concluded successfully",
);
}
// First we send the result. It may fail due to the other end of the channel being dropped;
// that's legitimate and we don't treat it as an error.
let _ = result_tx.send(result);
// Then, we should deal with the worker:
//
// - if the `idle_worker` token was returned we should either schedule the next task or just put
// it back so that the next incoming job will be able to claim it
//
// - if the `idle_worker` token was consumed, all the metadata pertaining to that worker should
// be removed.
if let Some(idle_worker) = idle_worker {
if let Some(data) = queue.workers.running.get_mut(worker) {
data.idle = Some(idle_worker);
return queue.try_assign_next_job(Some(worker))
}
} else {
// Note it's possible that the worker was purged already by `purge_dead`
if queue.workers.running.remove(worker).is_some() {
queue.metrics.execute_worker().on_retired();
}
}
queue.try_assign_next_job(None);
}
fn spawn_extra_worker(queue: &mut Queue, job: ExecuteJob) {
queue.metrics.execute_worker().on_begin_spawn();
gum::debug!(target: LOG_TARGET, "spawning an extra worker");
queue
.mux
.push(spawn_worker_task(queue.program_path.clone(), job, queue.spawn_timeout).boxed());
queue.workers.spawn_inflight += 1;
}
/// Spawns a new worker to execute a pre-assigned job.
/// A worker is never spawned as idle; a job to be executed by the worker has to be determined
/// beforehand. In such a way, a race condition is avoided: while the worker is being spawned,
/// another job in the queue, with an incompatible execution environment, may become stale, and
/// the queue would have to kill a newly started worker and spawn another one.
/// Nevertheless, if the worker finishes executing the job, it becomes idle and may be used to
/// execute other jobs with a compatible execution environment.
async fn spawn_worker_task(
program_path: PathBuf,
job: ExecuteJob,
spawn_timeout: Duration,
) -> QueueEvent {
use futures_timer::Delay;
loop {
match super::worker_intf::spawn(&program_path, job.executor_params.clone(), spawn_timeout)
.await
{
Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle, job),
Err(err) => {
gum::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err);
// Assume that the failure is intermittent and retry after a delay.
Delay::new(Duration::from_secs(3)).await;
},
}
}
}
/// Ask the given worker to perform the given job.
///
/// The worker must be running and idle. The job and the worker must share the same execution
/// environment parameter set.
fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?job.artifact.id,
?worker,
"assigning the execute worker",
);
debug_assert_eq!(
queue
.workers
.running
.get(worker)
.expect("caller must provide existing worker; qed")
.executor_params_hash,
job.executor_params.hash()
);
let idle = queue.workers.claim_idle(worker).expect(
"this caller must supply a worker which is idle and running;
thus claim_idle cannot return None;
qed.",
);
let execution_timer = queue.metrics.time_execution();
queue.mux.push(
async move {
let _timer = execution_timer;
let outcome = super::worker_intf::start_work(
idle,
job.artifact.clone(),
job.exec_timeout,
job.params,
)
.await;
QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx)
}
.boxed(),
);
}
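
/// Starts the execution queue, returning a sender for [`ToQueue`] requests together with the
/// queue's run future, which must be polled to drive the queue. A usage sketch (the argument
/// values are illustrative):
/// ```ignore
/// let (to_queue_tx, run) = start(metrics, program_path, 2, Duration::from_secs(3));
/// // Poll `run` alongside the other host tasks, e.g. by spawning it onto an executor.
/// ```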
pub fn start(
metrics: Metrics,
program_path: PathBuf,
worker_capacity: usize,
spawn_timeout: Duration,
) -> (mpsc::Sender<ToQueue>, impl Future<Output = ()>) {
    let (to_queue_tx, to_queue_rx) = mpsc::channel(20);
    let run = Queue::new(metrics, program_path, worker_capacity, spawn_timeout, to_queue_rx).run();
    (to_queue_tx, run)
}