Unverified Commit efec9463 authored by Sergey Pepyakin's avatar Sergey Pepyakin Committed by GitHub
Browse files

Add logging to PVF and other related parts (#3596)


Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>
parent 1cab7d3a
Pipeline #151862 passed with stages
in 43 minutes and 45 seconds
......@@ -2107,13 +2107,9 @@ async fn launch_approval(
}
let candidate_hash = candidate.hash();
let para_id = candidate.descriptor.para_id;
tracing::trace!(
target: LOG_TARGET,
?candidate_hash,
para_id = ?candidate.descriptor.para_id,
"Recovering data.",
);
tracing::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data.");
let timer = metrics.time_recover_and_approve();
ctx.send_message(AvailabilityRecoveryMessage::RecoverAvailableData(
......@@ -2149,6 +2145,8 @@ async fn launch_approval(
&RecoveryError::Unavailable => {
tracing::warn!(
target: LOG_TARGET,
?para_id,
?candidate_hash,
"Data unavailable for candidate {:?}",
(candidate_hash, candidate.descriptor.para_id),
);
......@@ -2158,6 +2156,8 @@ async fn launch_approval(
&RecoveryError::Invalid => {
tracing::warn!(
target: LOG_TARGET,
?para_id,
?candidate_hash,
"Data recovery invalid for candidate {:?}",
(candidate_hash, candidate.descriptor.para_id),
);
......@@ -2201,8 +2201,6 @@ async fn launch_approval(
let (val_tx, val_rx) = oneshot::channel();
let para_id = candidate.descriptor.para_id;
sender
.send_message(
CandidateValidationMessage::ValidateFromExhaustive(
......@@ -2274,6 +2272,8 @@ async fn launch_approval(
tracing::error!(
target: LOG_TARGET,
err = ?e,
?candidate_hash,
?para_id,
"Failed to validate candidate due to internal error",
);
metrics_guard.take().on_approval_error();
......
......@@ -41,7 +41,7 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_parachain::primitives::{ValidationParams, ValidationResult as WasmValidationResult};
use polkadot_primitives::v1::{
CandidateCommitments, CandidateDescriptor, Hash, OccupiedCoreAssumption,
PersistedValidationData, ValidationCode,
PersistedValidationData, ValidationCode, ValidationCodeHash,
};
use parity_scale_codec::Encode;
......@@ -164,6 +164,7 @@ where
match res {
Ok(x) => {
metrics.on_validation_event(&x);
if let Err(_e) = response_sender.send(x) {
tracing::warn!(
target: LOG_TARGET,
......@@ -349,11 +350,19 @@ async fn validate_candidate_exhaustive(
) -> SubsystemResult<Result<ValidationResult, ValidationFailed>> {
let _timer = metrics.time_validate_candidate_exhaustive();
let validation_code_hash = validation_code.hash();
tracing::debug!(
target: LOG_TARGET,
?validation_code_hash,
para_id = ?descriptor.para_id,
"About to validate a candidate.",
);
if let Err(e) = perform_basic_checks(
&descriptor,
persisted_validation_data.max_pov_size,
&*pov,
&validation_code,
&validation_code_hash,
) {
return Ok(Ok(ValidationResult::Invalid(e)))
}
......@@ -478,10 +487,9 @@ fn perform_basic_checks(
candidate: &CandidateDescriptor,
max_pov_size: u32,
pov: &PoV,
validation_code: &ValidationCode,
validation_code_hash: &ValidationCodeHash,
) -> Result<(), InvalidCandidate> {
let pov_hash = pov.hash();
let validation_code_hash = validation_code.hash();
let encoded_pov_size = pov.encoded_size();
if encoded_pov_size > max_pov_size as usize {
......@@ -492,7 +500,7 @@ fn perform_basic_checks(
return Err(InvalidCandidate::PoVHashMismatch)
}
if validation_code_hash != candidate.validation_code_hash {
if *validation_code_hash != candidate.validation_code_hash {
return Err(InvalidCandidate::CodeHashMismatch)
}
......
......@@ -340,8 +340,12 @@ fn candidate_validation_ok_is_ok() {
descriptor.validation_code_hash = validation_code.hash();
collator_sign(&mut descriptor, Sr25519Keyring::Alice);
let check =
perform_basic_checks(&descriptor, validation_data.max_pov_size, &pov, &validation_code);
let check = perform_basic_checks(
&descriptor,
validation_data.max_pov_size,
&pov,
&validation_code.hash(),
);
assert!(check.is_ok());
let validation_result = WasmValidationResult {
......@@ -386,8 +390,12 @@ fn candidate_validation_bad_return_is_invalid() {
descriptor.validation_code_hash = validation_code.hash();
collator_sign(&mut descriptor, Sr25519Keyring::Alice);
let check =
perform_basic_checks(&descriptor, validation_data.max_pov_size, &pov, &validation_code);
let check = perform_basic_checks(
&descriptor,
validation_data.max_pov_size,
&pov,
&validation_code.hash(),
);
assert!(check.is_ok());
let v = executor::block_on(validate_candidate_exhaustive(
......@@ -418,8 +426,12 @@ fn candidate_validation_timeout_is_internal_error() {
descriptor.validation_code_hash = validation_code.hash();
collator_sign(&mut descriptor, Sr25519Keyring::Alice);
let check =
perform_basic_checks(&descriptor, validation_data.max_pov_size, &pov, &validation_code);
let check = perform_basic_checks(
&descriptor,
validation_data.max_pov_size,
&pov,
&validation_code.hash(),
);
assert!(check.is_ok());
let v = executor::block_on(validate_candidate_exhaustive(
......@@ -449,8 +461,12 @@ fn candidate_validation_code_mismatch_is_invalid() {
descriptor.validation_code_hash = ValidationCode(vec![1; 16]).hash();
collator_sign(&mut descriptor, Sr25519Keyring::Alice);
let check =
perform_basic_checks(&descriptor, validation_data.max_pov_size, &pov, &validation_code);
let check = perform_basic_checks(
&descriptor,
validation_data.max_pov_size,
&pov,
&validation_code.hash(),
);
assert_matches!(check, Err(InvalidCandidate::CodeHashMismatch));
let v = executor::block_on(validate_candidate_exhaustive(
......
......@@ -54,7 +54,7 @@ impl Artifact {
/// multiple engine implementations the artifact ID should include the engine type as well.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ArtifactId {
code_hash: ValidationCodeHash,
pub(crate) code_hash: ValidationCodeHash,
}
impl ArtifactId {
......@@ -84,6 +84,25 @@ impl ArtifactId {
}
}
/// A bundle of the artifact ID and the path.
///
/// Rationale for having this is two-fold:
///
/// - While we can derive the artifact path from the artifact id, it makes sense to carry it around
/// sometimes to avoid extra work.
/// - At the same time, carrying only path limiting the ability for logging.
#[derive(Debug, Clone)]
pub struct ArtifactPathId {
pub(crate) id: ArtifactId,
pub(crate) path: PathBuf,
}
impl ArtifactPathId {
pub(crate) fn new(artifact_id: ArtifactId, cache_path: &Path) -> Self {
Self { path: artifact_id.path(cache_path), id: artifact_id }
}
}
pub enum ArtifactState {
/// The artifact is ready to be used by the executor.
///
......
......@@ -18,6 +18,7 @@
use super::worker::Outcome;
use crate::{
artifacts::{ArtifactId, ArtifactPathId},
host::ResultSender,
worker_common::{IdleWorker, WorkerHandle},
InvalidCandidate, ValidationError, LOG_TARGET,
......@@ -36,11 +37,11 @@ slotmap::new_key_type! { struct Worker; }
#[derive(Debug)]
pub enum ToQueue {
Enqueue { artifact_path: PathBuf, params: Vec<u8>, result_tx: ResultSender },
Enqueue { artifact: ArtifactPathId, params: Vec<u8>, result_tx: ResultSender },
}
struct ExecuteJob {
artifact_path: PathBuf,
artifact: ArtifactPathId,
params: Vec<u8>,
result_tx: ResultSender,
}
......@@ -87,8 +88,8 @@ impl Workers {
}
enum QueueEvent {
Spawn((IdleWorker, WorkerHandle)),
StartWork(Worker, Outcome, ResultSender),
Spawn(IdleWorker, WorkerHandle),
StartWork(Worker, Outcome, ArtifactId, ResultSender),
}
type Mux = FuturesUnordered<BoxFuture<'static, QueueEvent>>;
......@@ -159,9 +160,13 @@ async fn purge_dead(workers: &mut Workers) {
}
fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
let ToQueue::Enqueue { artifact_path, params, result_tx } = to_queue;
let job = ExecuteJob { artifact_path, params, result_tx };
let ToQueue::Enqueue { artifact, params, result_tx } = to_queue;
tracing::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact.id.code_hash,
"enqueueing an artifact for execution",
);
let job = ExecuteJob { artifact, params, result_tx };
if let Some(available) = queue.workers.find_available() {
assign(queue, available, job);
......@@ -175,24 +180,35 @@ fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
match event {
QueueEvent::Spawn((idle, handle)) => {
queue.workers.spawn_inflight -= 1;
let worker = queue.workers.running.insert(WorkerData { idle: Some(idle), handle });
if let Some(job) = queue.queue.pop_front() {
assign(queue, worker, job);
}
QueueEvent::Spawn(idle, handle) => {
handle_worker_spawned(queue, idle, handle);
},
QueueEvent::StartWork(worker, outcome, result_tx) => {
handle_job_finish(queue, worker, outcome, result_tx);
QueueEvent::StartWork(worker, outcome, artifact_id, result_tx) => {
handle_job_finish(queue, worker, outcome, artifact_id, result_tx);
},
}
}
fn handle_worker_spawned(queue: &mut Queue, idle: IdleWorker, handle: WorkerHandle) {
queue.workers.spawn_inflight -= 1;
let worker = queue.workers.running.insert(WorkerData { idle: Some(idle), handle });
tracing::debug!(target: LOG_TARGET, ?worker, "execute worker spawned");
if let Some(job) = queue.queue.pop_front() {
assign(queue, worker, job);
}
}
/// If there are pending jobs in the queue, schedules the next of them onto the just freed up
/// worker. Otherwise, puts back into the available workers list.
fn handle_job_finish(queue: &mut Queue, worker: Worker, outcome: Outcome, result_tx: ResultSender) {
fn handle_job_finish(
queue: &mut Queue,
worker: Worker,
outcome: Outcome,
artifact_id: ArtifactId,
result_tx: ResultSender,
) {
let (idle_worker, result) = match outcome {
Outcome::Ok { result_descriptor, duration_ms, idle_worker } => {
// TODO: propagate the soft timeout
......@@ -212,6 +228,14 @@ fn handle_job_finish(queue: &mut Queue, worker: Worker, outcome: Outcome, result
(None, Err(ValidationError::InvalidCandidate(InvalidCandidate::AmbigiousWorkerDeath))),
};
tracing::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact_id.code_hash,
worker_rip = idle_worker.is_none(),
?result,
"job finished.",
);
// First we send the result. It may fail due the other end of the channel being dropped, that's
// legitimate and we don't treat that as an error.
let _ = result_tx.send(result);
......@@ -245,6 +269,8 @@ fn handle_job_finish(queue: &mut Queue, worker: Worker, outcome: Outcome, result
}
fn spawn_extra_worker(queue: &mut Queue) {
tracing::debug!(target: LOG_TARGET, "spawning an extra worker");
queue
.mux
.push(spawn_worker_task(queue.program_path.clone(), queue.spawn_timeout).boxed());
......@@ -256,7 +282,7 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Qu
loop {
match super::worker::spawn(&program_path, spawn_timeout).await {
Ok((idle, handle)) => break QueueEvent::Spawn((idle, handle)),
Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle),
Err(err) => {
tracing::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err);
......@@ -271,6 +297,13 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Qu
///
/// The worker must be running and idle.
fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
tracing::debug!(
target: LOG_TARGET,
validation_code_hash = ?job.artifact.id,
?worker,
"assigning the execute worker",
);
let idle = queue.workers.claim_idle(worker).expect(
"this caller must supply a worker which is idle and running;
thus claim_idle cannot return None;
......@@ -278,8 +311,8 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
);
queue.mux.push(
async move {
let outcome = super::worker::start_work(idle, job.artifact_path, job.params).await;
QueueEvent::StartWork(worker, outcome, job.result_tx)
let outcome = super::worker::start_work(idle, job.artifact.clone(), job.params).await;
QueueEvent::StartWork(worker, outcome, job.artifact.id, job.result_tx)
}
.boxed(),
);
......
......@@ -15,7 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{
artifacts::Artifact,
artifacts::{Artifact, ArtifactPathId},
executor_intf::TaskExecutor,
worker_common::{
bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path,
......@@ -68,7 +68,7 @@ pub enum Outcome {
/// returns the outcome.
pub async fn start_work(
worker: IdleWorker,
artifact_path: PathBuf,
artifact: ArtifactPathId,
validation_params: Vec<u8>,
) -> Outcome {
let IdleWorker { mut stream, pid } = worker;
......@@ -76,22 +76,47 @@ pub async fn start_work(
tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
"starting execute for {}",
artifact_path.display(),
artifact.path.display(),
);
if send_request(&mut stream, &artifact_path, &validation_params).await.is_err() {
if let Err(error) = send_request(&mut stream, &artifact.path, &validation_params).await {
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
?error,
"failed to send an execute request",
);
return Outcome::IoErr
}
let response = futures::select! {
response = recv_response(&mut stream).fuse() => {
match response {
Err(_err) => return Outcome::IoErr,
Err(error) => {
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
?error,
"failed to recv an execute response",
);
return Outcome::IoErr
},
Ok(response) => response,
}
},
_ = Delay::new(EXECUTION_TIMEOUT).fuse() => return Outcome::HardTimeout,
_ = Delay::new(EXECUTION_TIMEOUT).fuse() => {
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
"execution worker exceeded alloted time for execution",
);
return Outcome::HardTimeout;
},
};
match response {
......
......@@ -21,8 +21,8 @@
//! [`ValidationHost`], that allows communication with that event-loop.
use crate::{
artifacts::{ArtifactId, ArtifactState, Artifacts},
execute, prepare, Priority, Pvf, ValidationError,
artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
execute, prepare, Priority, Pvf, ValidationError, LOG_TARGET,
};
use always_assert::never;
use async_std::path::{Path, PathBuf};
......@@ -398,7 +398,7 @@ async fn handle_execute_pvf(
send_execute(
execute_queue,
execute::ToQueue::Enqueue {
artifact_path: artifact_id.path(cache_path),
artifact: ArtifactPathId::new(artifact_id, cache_path),
params,
result_tx,
},
......@@ -493,7 +493,6 @@ async fn handle_prepare_done(
// It's finally time to dispatch all the execution requests that were waiting for this artifact
// to be prepared.
let artifact_path = artifact_id.path(&cache_path);
let pending_requests = awaiting_prepare.take(&artifact_id);
for PendingExecutionRequest { params, result_tx } in pending_requests {
if result_tx.is_canceled() {
......@@ -504,7 +503,11 @@ async fn handle_prepare_done(
send_execute(
execute_queue,
execute::ToQueue::Enqueue { artifact_path: artifact_path.clone(), params, result_tx },
execute::ToQueue::Enqueue {
artifact: ArtifactPathId::new(artifact_id.clone(), cache_path),
params,
result_tx,
},
)
.await?;
}
......@@ -536,7 +539,17 @@ async fn handle_cleanup_pulse(
artifact_ttl: Duration,
) -> Result<(), Fatal> {
let to_remove = artifacts.prune(artifact_ttl);
tracing::info!(
target: LOG_TARGET,
"PVF pruning: {} artifacts reached their end of life",
to_remove.len(),
);
for artifact_id in to_remove {
tracing::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact_id.code_hash,
"pruning artifact",
);
let artifact_path = artifact_id.path(cache_path);
sweeper_tx.send(artifact_path).await.map_err(|_| Fatal)?;
}
......@@ -550,7 +563,13 @@ async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) {
match sweeper_rx.next().await {
None => break,
Some(condemned) => {
let _ = async_std::fs::remove_file(condemned).await;
let result = async_std::fs::remove_file(&condemned).await;
tracing::trace!(
target: LOG_TARGET,
?result,
"Sweeping the artifact file {}",
condemned.display(),
);
},
}
}
......
......@@ -194,6 +194,7 @@ fn handle_to_pool(
) {
match to_pool {
ToPool::Spawn => {
tracing::debug!(target: LOG_TARGET, "spawning a new prepare worker");
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
},
ToPool::StartWork { worker, code, artifact_path, background_priority } => {
......@@ -224,6 +225,7 @@ fn handle_to_pool(
}
},
ToPool::Kill(worker) => {
tracing::debug!(target: LOG_TARGET, ?worker, "killing prepare worker");
// It may be absent if it were previously already removed by `purge_dead`.
let _ = spawned.remove(worker);
},
......
......@@ -212,6 +212,13 @@ async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fat
}
async fn handle_enqueue(queue: &mut Queue, priority: Priority, pvf: Pvf) -> Result<(), Fatal> {
tracing::debug!(
target: LOG_TARGET,
validation_code_hash = ?pvf.code_hash,
?priority,
"PVF is enqueued for preparation.",
);
let artifact_id = pvf.as_artifact_id();
if never!(
queue.artifact_id_to_job.contains_key(&artifact_id),
......@@ -254,8 +261,14 @@ async fn handle_amend(
artifact_id: ArtifactId,
) -> Result<(), Fatal> {
if let Some(&job) = queue.artifact_id_to_job.get(&artifact_id) {
let mut job_data: &mut JobData = &mut queue.jobs[job];
tracing::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact_id.code_hash,
?priority,
"amending preparation priority.",
);
let mut job_data: &mut JobData = &mut queue.jobs[job];
if job_data.priority < priority {
// The new priority is higher. We should do two things:
// - if the worker was already spawned with the background prio and the new one is not
......@@ -349,6 +362,14 @@ async fn handle_worker_concluded(
queue.artifact_id_to_job.remove(&artifact_id);
tracing::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact_id.code_hash,
?worker,
?rip,
"prepare worker concluded",
);
reply(&mut queue.from_queue_tx, FromQueue::Prepared(artifact_id))?;
// Figure out what to do with the worker.
......@@ -380,8 +401,9 @@ async fn handle_worker_concluded(
}
async fn handle_worker_rip(queue: &mut Queue, worker: Worker) -> Result<(), Fatal> {
let worker_data = queue.workers.remove(worker);
tracing::debug!(target: LOG_TARGET, ?worker, "prepare worker ripped");
let worker_data = queue.workers.remove(worker);
if let Some(WorkerData { job: Some(job), .. }) = worker_data {
// This is an edge case where the worker ripped after we sent assignment but before it
// was received by the pool.
......
......@@ -103,6 +103,7 @@ pub async fn start_work(
// We may potentially overwrite the artifact in rare cases where the worker didn't make
// it to report back the result.
#[derive(Debug)]
enum Selected {
Done,
IoErr,
......@@ -170,7 +171,15 @@ pub async fn start_work(
Selected::IoErr | Selected::Deadline => {
let bytes = Artifact::DidntMakeIt.serialize();
// best effort: there is nothing we can do here if the write fails.
let _ = async_std::fs::write(&artifact_path, &bytes).await;
if let Err(err) = async_std::fs::write(&artifact_path, &bytes).await {
tracing::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"preparation didn't make it, because of `{:?}`: {:?}",
selected,
err,
);
}
Outcome::DidntMakeIt
},
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment