// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The validation host is the primary interface of this crate. It allows clients to enqueue
//! jobs for PVF execution or preparation.
//!
//! The validation host is represented by a future/task that runs an event-loop and by a handle,
//! [`ValidationHost`], that allows communication with that event-loop.
use crate::{
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
execute,
metrics::Metrics,
prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET,
};
use always_assert::never;
use async_std::path::{Path, PathBuf};
use futures::{
channel::{mpsc, oneshot},
Future, FutureExt, SinkExt, StreamExt,
};
use polkadot_parachain::primitives::ValidationResult;
use std::{
collections::HashMap,
time::{Duration, SystemTime},
};
/// An alias to not spell the type for the oneshot sender for the PVF execution result.
pub(crate) type ResultSender = oneshot::Sender>;
/// Transmission end used for sending the PVF preparation result.
pub(crate) type PrepareResultSender = oneshot::Sender;
/// A handle to the async process serving the validation host requests.
#[derive(Clone)]
pub struct ValidationHost {
to_host_tx: mpsc::Sender,
}
impl ValidationHost {
/// Precheck PVF with the given code, i.e. verify that it compiles within a reasonable time limit.
/// The result of execution will be sent to the provided result sender.
///
/// This is async to accommodate the fact a possibility of back-pressure. In the vast majority of
/// situations this function should return immediately.
///
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
pub async fn precheck_pvf(
&mut self,
pvf: Pvf,
result_tx: PrepareResultSender,
) -> Result<(), String> {
self.to_host_tx
.send(ToHost::PrecheckPvf { pvf, result_tx })
.await
.map_err(|_| "the inner loop hung up".to_string())
}
/// Execute PVF with the given code, execution timeout, parameters and priority.
/// The result of execution will be sent to the provided result sender.
///
/// This is async to accommodate the fact a possibility of back-pressure. In the vast majority of
/// situations this function should return immediately.
///
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
pub async fn execute_pvf(
&mut self,
pvf: Pvf,
execution_timeout: Duration,
params: Vec,
priority: Priority,
result_tx: ResultSender,
) -> Result<(), String> {
self.to_host_tx
.send(ToHost::ExecutePvf { pvf, execution_timeout, params, priority, result_tx })
.await
.map_err(|_| "the inner loop hung up".to_string())
}
/// Sends a signal to the validation host requesting to prepare a list of the given PVFs.
///
/// This is async to accommodate the fact a possibility of back-pressure. In the vast majority of
/// situations this function should return immediately.
///
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
pub async fn heads_up(&mut self, active_pvfs: Vec) -> Result<(), String> {
self.to_host_tx
.send(ToHost::HeadsUp { active_pvfs })
.await
.map_err(|_| "the inner loop hung up".to_string())
}
}
enum ToHost {
PrecheckPvf {
pvf: Pvf,
result_tx: PrepareResultSender,
},
ExecutePvf {
pvf: Pvf,
execution_timeout: Duration,
params: Vec,
priority: Priority,
result_tx: ResultSender,
},
HeadsUp {
active_pvfs: Vec,
},
}
/// Settings that control how the validation host is set up.
pub struct Config {
    /// Root directory in which prepared artifacts can be stored.
    pub cache_path: PathBuf,

    /// Path to the program used for spawning prepare workers.
    pub prepare_worker_program_path: PathBuf,
    /// How long a prepare worker is given to spawn and report to the host.
    pub prepare_worker_spawn_timeout: Duration,
    /// Upper bound on prepare-pool workers spawned for tasks whose priority is below critical.
    pub prepare_workers_soft_max_num: usize,
    /// Absolute upper bound on workers spawned in the prepare pool.
    pub prepare_workers_hard_max_num: usize,

    /// Path to the program used for spawning execute workers.
    pub execute_worker_program_path: PathBuf,
    /// How long an execute worker is given to spawn and report to the host.
    pub execute_worker_spawn_timeout: Duration,
    /// Upper bound on execute workers running at the same time.
    pub execute_workers_max_num: usize,
}
impl Config {
    /// Builds a configuration from the artifact cache directory and the worker program path.
    ///
    /// The same `program_path` is used for both the prepare and the execute workers; worker
    /// limits and spawn timeouts are set to fixed defaults.
    pub fn new(cache_path: std::path::PathBuf, program_path: std::path::PathBuf) -> Self {
        // The signature takes `std` paths so that `async_std` types do not leak into the rest
        // of the codebase; the conversion happens here.
        let worker_program: PathBuf = program_path.into();
        let spawn_timeout = Duration::from_secs(3);

        Self {
            cache_path: cache_path.into(),
            prepare_worker_program_path: worker_program.clone(),
            prepare_worker_spawn_timeout: spawn_timeout,
            prepare_workers_soft_max_num: 1,
            prepare_workers_hard_max_num: 1,
            execute_worker_program_path: worker_program,
            execute_worker_spawn_timeout: spawn_timeout,
            execute_workers_max_num: 2,
        }
    }
}
/// Start the validation host.
///
/// Returns a [handle][`ValidationHost`] to the started validation host and the future. The future
/// must be polled in order for validation host to function.
///
/// The future should not return normally but if it does then that indicates an unrecoverable error.
/// In that case all pending requests will be canceled, dropping the result senders and new ones
/// will be rejected.
pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future