From 494448b7fed02e098fbf38bad517d9245b056d1d Mon Sep 17 00:00:00 2001
From: Andrei Eres <eresav@me.com>
Date: Thu, 6 Jun 2024 21:22:22 +0200
Subject: [PATCH] Cleanup PVF artifact by cache limit and stale time (#4662)

Part of https://github.com/paritytech/polkadot-sdk/issues/4324
We don't replace the existing cleanup strategy; we extend it.
- We still don't touch artifacts that have been stale for less than 24h
- We first attempt pruning only when we hit the cache limit (10 GB)
- If, after we hit the 10 GB limit, the least recently used artifact has
been stale for less than 24h, we don't remove it.

---------

Co-authored-by: s0me0ne-unkn0wn <48632512+s0me0ne-unkn0wn@users.noreply.github.com>
Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
---
 polkadot/node/core/pvf/common/src/prepare.rs  |   2 +
 polkadot/node/core/pvf/src/artifacts.rs       | 179 ++++++++++++++++--
 polkadot/node/core/pvf/src/host.rs            |  42 ++--
 .../core/pvf/src/prepare/worker_interface.rs  |  14 ++
 polkadot/node/core/pvf/src/testing.rs         |   8 +-
 prdoc/pr_4662.prdoc                           |  17 ++
 6 files changed, 226 insertions(+), 36 deletions(-)
 create mode 100644 prdoc/pr_4662.prdoc

diff --git a/polkadot/node/core/pvf/common/src/prepare.rs b/polkadot/node/core/pvf/common/src/prepare.rs
index 64e7f3d6bcf..81e165a7b8a 100644
--- a/polkadot/node/core/pvf/common/src/prepare.rs
+++ b/polkadot/node/core/pvf/common/src/prepare.rs
@@ -31,6 +31,8 @@ pub struct PrepareWorkerSuccess {
 pub struct PrepareSuccess {
 	/// Canonical path to the compiled artifact.
 	pub path: PathBuf,
+	/// Size in bytes
+	pub size: u64,
 	/// Stats of the current preparation run.
 	pub stats: PrepareStats,
 }
diff --git a/polkadot/node/core/pvf/src/artifacts.rs b/polkadot/node/core/pvf/src/artifacts.rs
index a3a48b61acb..119af34082a 100644
--- a/polkadot/node/core/pvf/src/artifacts.rs
+++ b/polkadot/node/core/pvf/src/artifacts.rs
@@ -142,6 +142,8 @@ pub enum ArtifactState {
 		/// This is updated when we get the heads up for this artifact or when we just discover
 		/// this file.
 		last_time_needed: SystemTime,
+		/// Size in bytes
+		size: u64,
 		/// Stats produced by successful preparation.
 		prepare_stats: PrepareStats,
 	},
@@ -169,6 +171,33 @@ pub struct Artifacts {
 	inner: HashMap<ArtifactId, ArtifactState>,
 }
 
+/// Parameters we use to clean up artifacts.
+/// After we hit the cache limit we remove the least recently used artifacts,
+/// but only if they have been stale for longer than the minimum stale time.
+#[derive(Debug)]
+pub struct ArtifactsCleanupConfig {
+	// Max size in bytes. On reaching it, the least recently used artifacts are deleted
+	cache_limit: u64,
+	// Inactive time after which an artifact is allowed to be deleted
+	min_stale_time: Duration,
+}
+
+impl Default for ArtifactsCleanupConfig {
+	fn default() -> Self {
+		Self {
+			cache_limit: 10 * 1024 * 1024 * 1024,              // 10 GiB
+			min_stale_time: Duration::from_secs(24 * 60 * 60), // 24 hours
+		}
+	}
+}
+
+#[cfg(test)]
+impl ArtifactsCleanupConfig {
+	pub fn new(cache_limit: u64, min_stale_time: Duration) -> Self {
+		Self { cache_limit, min_stale_time }
+	}
+}
+
 impl Artifacts {
 	#[cfg(test)]
 	pub(crate) fn empty() -> Self {
@@ -180,6 +209,11 @@ impl Artifacts {
 		self.inner.len()
 	}
 
+	#[cfg(test)]
+	fn artifact_ids(&self) -> Vec<ArtifactId> {
+		self.inner.keys().cloned().collect()
+	}
+
 	/// Create an empty table and the cache directory on-disk if it doesn't exist.
 	pub async fn new(cache_path: &Path) -> Self {
 		// Make sure that the cache path directory and all its parents are created.
@@ -234,12 +268,16 @@ impl Artifacts {
 		artifact_id: ArtifactId,
 		path: PathBuf,
 		last_time_needed: SystemTime,
+		size: u64,
 		prepare_stats: PrepareStats,
 	) {
 		// See the precondition.
 		always!(self
 			.inner
-			.insert(artifact_id, ArtifactState::Prepared { path, last_time_needed, prepare_stats })
+			.insert(
+				artifact_id,
+				ArtifactState::Prepared { path, last_time_needed, size, prepare_stats }
+			)
 			.is_none());
 	}
 
@@ -251,25 +289,40 @@ impl Artifacts {
 		})
 	}
 
-	/// Remove artifacts older than the given TTL and return id and path of the removed ones.
-	pub fn prune(&mut self, artifact_ttl: Duration) -> Vec<(ArtifactId, PathBuf)> {
+	/// When the total artifact size exceeds the cache limit, remove the least recently used
+	/// artifacts stale longer than the minimum stale time and return id and path of the removed ones
+	pub fn prune(&mut self, cleanup_config: &ArtifactsCleanupConfig) -> Vec<(ArtifactId, PathBuf)> {
+		let mut to_remove = vec![];
 		let now = SystemTime::now();
 
-		let mut to_remove = vec![];
+		let mut total_size = 0;
+		let mut artifact_sizes = vec![];
+
 		for (k, v) in self.inner.iter() {
-			if let ArtifactState::Prepared { last_time_needed, ref path, .. } = *v {
-				if now
-					.duration_since(last_time_needed)
-					.map(|age| age > artifact_ttl)
-					.unwrap_or(false)
-				{
-					to_remove.push((k.clone(), path.clone()));
-				}
+			if let ArtifactState::Prepared { ref path, last_time_needed, size, .. } = *v {
+				total_size += size;
+				artifact_sizes.push((k.clone(), path.clone(), size, last_time_needed));
 			}
 		}
+		artifact_sizes
+			.sort_by_key(|&(_, _, _, last_time_needed)| std::cmp::Reverse(last_time_needed));
+
+		while total_size > cleanup_config.cache_limit {
+			let Some((artifact_id, path, size, last_time_needed)) = artifact_sizes.pop() else {
+				break
+			};
+
+			let used_recently = now
+				.duration_since(last_time_needed)
+				.map(|stale_time| stale_time < cleanup_config.min_stale_time)
+				.unwrap_or(true);
+			if used_recently {
+				break;
+			}
 
-		for artifact in &to_remove {
-			self.inner.remove(&artifact.0);
+			self.inner.remove(&artifact_id);
+			to_remove.push((artifact_id, path));
+			total_size -= size;
 		}
 
 		to_remove
@@ -278,6 +331,8 @@ impl Artifacts {
 
 #[cfg(test)]
 mod tests {
+	use crate::testing::artifact_id;
+
 	use super::*;
 
 	#[tokio::test]
@@ -307,4 +362,100 @@ mod tests {
 		assert!(entries.contains(&String::from("worker-prepare-test")));
 		assert_eq!(artifacts.len(), 0);
 	}
+
+	#[tokio::test]
+	async fn test_pruned_by_cache_size() {
+		let mock_now = SystemTime::now();
+		let tempdir = tempfile::tempdir().unwrap();
+		let cache_path = tempdir.path();
+
+		let path1 = generate_artifact_path(cache_path);
+		let path2 = generate_artifact_path(cache_path);
+		let path3 = generate_artifact_path(cache_path);
+		let artifact_id1 = artifact_id(1);
+		let artifact_id2 = artifact_id(2);
+		let artifact_id3 = artifact_id(3);
+
+		let mut artifacts = Artifacts::new(cache_path).await;
+		let cleanup_config = ArtifactsCleanupConfig::new(1500, Duration::from_secs(0));
+
+		artifacts.insert_prepared(
+			artifact_id1.clone(),
+			path1.clone(),
+			mock_now - Duration::from_secs(5),
+			1024,
+			PrepareStats::default(),
+		);
+		artifacts.insert_prepared(
+			artifact_id2.clone(),
+			path2.clone(),
+			mock_now - Duration::from_secs(10),
+			1024,
+			PrepareStats::default(),
+		);
+		artifacts.insert_prepared(
+			artifact_id3.clone(),
+			path3.clone(),
+			mock_now - Duration::from_secs(15),
+			1024,
+			PrepareStats::default(),
+		);
+
+		let pruned = artifacts.prune(&cleanup_config);
+
+		assert!(artifacts.artifact_ids().contains(&artifact_id1));
+		assert!(!pruned.contains(&(artifact_id1, path1)));
+		assert!(!artifacts.artifact_ids().contains(&artifact_id2));
+		assert!(pruned.contains(&(artifact_id2, path2)));
+		assert!(!artifacts.artifact_ids().contains(&artifact_id3));
+		assert!(pruned.contains(&(artifact_id3, path3)));
+	}
+
+	#[tokio::test]
+	async fn test_did_not_prune_by_cache_size_because_of_stale_time() {
+		let mock_now = SystemTime::now();
+		let tempdir = tempfile::tempdir().unwrap();
+		let cache_path = tempdir.path();
+
+		let path1 = generate_artifact_path(cache_path);
+		let path2 = generate_artifact_path(cache_path);
+		let path3 = generate_artifact_path(cache_path);
+		let artifact_id1 = artifact_id(1);
+		let artifact_id2 = artifact_id(2);
+		let artifact_id3 = artifact_id(3);
+
+		let mut artifacts = Artifacts::new(cache_path).await;
+		let cleanup_config = ArtifactsCleanupConfig::new(1500, Duration::from_secs(12));
+
+		artifacts.insert_prepared(
+			artifact_id1.clone(),
+			path1.clone(),
+			mock_now - Duration::from_secs(5),
+			1024,
+			PrepareStats::default(),
+		);
+		artifacts.insert_prepared(
+			artifact_id2.clone(),
+			path2.clone(),
+			mock_now - Duration::from_secs(10),
+			1024,
+			PrepareStats::default(),
+		);
+		artifacts.insert_prepared(
+			artifact_id3.clone(),
+			path3.clone(),
+			mock_now - Duration::from_secs(15),
+			1024,
+			PrepareStats::default(),
+		);
+
+		let pruned = artifacts.prune(&cleanup_config);
+
+		assert!(artifacts.artifact_ids().contains(&artifact_id1));
+		assert!(!pruned.contains(&(artifact_id1, path1)));
+		assert!(artifacts.artifact_ids().contains(&artifact_id2));
+		assert!(!pruned.contains(&(artifact_id2, path2)));
+		assert!(!artifacts.artifact_ids().contains(&artifact_id3));
+		assert!(pruned.contains(&(artifact_id3, path3)));
+	}
 }
diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs
index 4065598a3ac..462631d33b5 100644
--- a/polkadot/node/core/pvf/src/host.rs
+++ b/polkadot/node/core/pvf/src/host.rs
@@ -21,7 +21,7 @@
 //! [`ValidationHost`], that allows communication with that event-loop.
 
 use crate::{
-	artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts},
+	artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts, ArtifactsCleanupConfig},
 	execute::{self, PendingExecutionRequest},
 	metrics::Metrics,
 	prepare, Priority, SecurityStatus, ValidationError, LOG_TARGET,
@@ -293,7 +293,7 @@ pub async fn start(
 	let run_host = async move {
 		run(Inner {
 			cleanup_pulse_interval: Duration::from_secs(3600),
-			artifact_ttl: Duration::from_secs(3600 * 24),
+			cleanup_config: ArtifactsCleanupConfig::default(),
 			artifacts,
 			to_host_rx,
 			to_prepare_queue_tx,
@@ -337,7 +337,7 @@ impl AwaitingPrepare {
 
 struct Inner {
 	cleanup_pulse_interval: Duration,
-	artifact_ttl: Duration,
+	cleanup_config: ArtifactsCleanupConfig,
 	artifacts: Artifacts,
 
 	to_host_rx: mpsc::Receiver<ToHost>,
@@ -359,7 +359,7 @@ struct Fatal;
 async fn run(
 	Inner {
 		cleanup_pulse_interval,
-		artifact_ttl,
+		cleanup_config,
 		mut artifacts,
 		to_host_rx,
 		from_prepare_queue_rx,
@@ -415,7 +415,7 @@ async fn run(
 				break_if_fatal!(handle_cleanup_pulse(
 					&mut to_sweeper_tx,
 					&mut artifacts,
-					artifact_ttl,
+					&cleanup_config,
 				).await);
 			},
 			to_host = to_host_rx.next() => {
@@ -803,8 +803,12 @@ async fn handle_prepare_done(
 	}
 
 	*state = match result {
-		Ok(PrepareSuccess { path, stats: prepare_stats }) =>
-			ArtifactState::Prepared { path, last_time_needed: SystemTime::now(), prepare_stats },
+		Ok(PrepareSuccess { path, stats: prepare_stats, size }) => ArtifactState::Prepared {
+			path,
+			last_time_needed: SystemTime::now(),
+			size,
+			prepare_stats,
+		},
 		Err(error) => {
 			let last_time_failed = SystemTime::now();
 			let num_failures = *num_failures + 1;
@@ -859,9 +863,9 @@ async fn enqueue_prepare_for_execute(
 async fn handle_cleanup_pulse(
 	sweeper_tx: &mut mpsc::Sender<PathBuf>,
 	artifacts: &mut Artifacts,
-	artifact_ttl: Duration,
+	cleanup_config: &ArtifactsCleanupConfig,
 ) -> Result<(), Fatal> {
-	let to_remove = artifacts.prune(artifact_ttl);
+	let to_remove = artifacts.prune(cleanup_config);
 	gum::debug!(
 		target: LOG_TARGET,
 		"PVF pruning: {} artifacts reached their end of life",
@@ -959,7 +963,7 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()>
 #[cfg(test)]
 pub(crate) mod tests {
 	use super::*;
-	use crate::{artifacts::generate_artifact_path, PossiblyInvalidError};
+	use crate::{artifacts::generate_artifact_path, testing::artifact_id, PossiblyInvalidError};
 	use assert_matches::assert_matches;
 	use futures::future::BoxFuture;
 	use polkadot_node_core_pvf_common::prepare::PrepareStats;
@@ -981,14 +985,9 @@ pub(crate) mod tests {
 		}
 	}
 
-	/// Creates a new PVF which artifact id can be uniquely identified by the given number.
-	fn artifact_id(discriminator: u32) -> ArtifactId {
-		ArtifactId::from_pvf_prep_data(&PvfPrepData::from_discriminator(discriminator))
-	}
-
 	struct Builder {
 		cleanup_pulse_interval: Duration,
-		artifact_ttl: Duration,
+		cleanup_config: ArtifactsCleanupConfig,
 		artifacts: Artifacts,
 	}
 
@@ -997,8 +996,7 @@ pub(crate) mod tests {
 			Self {
 				// these are selected high to not interfere in tests in which pruning is irrelevant.
 				cleanup_pulse_interval: Duration::from_secs(3600),
-				artifact_ttl: Duration::from_secs(3600),
-
+				cleanup_config: ArtifactsCleanupConfig::default(),
 				artifacts: Artifacts::empty(),
 			}
 		}
@@ -1022,7 +1020,7 @@ pub(crate) mod tests {
 	}
 
 	impl Test {
-		fn new(Builder { cleanup_pulse_interval, artifact_ttl, artifacts }: Builder) -> Self {
+		fn new(Builder { cleanup_pulse_interval, artifacts, cleanup_config }: Builder) -> Self {
 			let (to_host_tx, to_host_rx) = mpsc::channel(10);
 			let (to_prepare_queue_tx, to_prepare_queue_rx) = mpsc::channel(10);
 			let (from_prepare_queue_tx, from_prepare_queue_rx) = mpsc::unbounded();
@@ -1032,7 +1030,7 @@ pub(crate) mod tests {
 
 			let run = run(Inner {
 				cleanup_pulse_interval,
-				artifact_ttl,
+				cleanup_config,
 				artifacts,
 				to_host_rx,
 				to_prepare_queue_tx,
@@ -1183,19 +1181,21 @@ pub(crate) mod tests {
 
 		let mut builder = Builder::default();
 		builder.cleanup_pulse_interval = Duration::from_millis(100);
-		builder.artifact_ttl = Duration::from_millis(500);
+		builder.cleanup_config = ArtifactsCleanupConfig::new(1024, Duration::from_secs(0));
 		let path1 = generate_artifact_path(cache_path);
 		let path2 = generate_artifact_path(cache_path);
 		builder.artifacts.insert_prepared(
 			artifact_id(1),
 			path1.clone(),
 			mock_now,
+			1024,
 			PrepareStats::default(),
 		);
 		builder.artifacts.insert_prepared(
 			artifact_id(2),
 			path2.clone(),
 			mock_now,
+			1024,
 			PrepareStats::default(),
 		);
 		let mut test = builder.build();
diff --git a/polkadot/node/core/pvf/src/prepare/worker_interface.rs b/polkadot/node/core/pvf/src/prepare/worker_interface.rs
index 5c4245d7631..22ee93319d8 100644
--- a/polkadot/node/core/pvf/src/prepare/worker_interface.rs
+++ b/polkadot/node/core/pvf/src/prepare/worker_interface.rs
@@ -234,6 +234,19 @@ async fn handle_response(
 		return Outcome::TimedOut
 	}
 
+	let size = match tokio::fs::metadata(cache_path).await {
+		Ok(metadata) => metadata.len(),
+		Err(err) => {
+			gum::warn!(
+				target: LOG_TARGET,
+				?cache_path,
+				"failed to read size of the artifact: {}",
+				err,
+			);
+			return Outcome::IoErr(err.to_string())
+		},
+	};
+
 	// The file name should uniquely identify the artifact even across restarts. In case the cache
 	// for some reason is not cleared correctly, we cannot
 	// accidentally execute an artifact compiled under a different wasmtime version, host
@@ -253,6 +266,7 @@ async fn handle_response(
 			worker,
 			result: Ok(PrepareSuccess {
 				path: artifact_path,
+				size,
 				stats: PrepareStats { cpu_time_elapsed, memory_stats: memory_stats.clone() },
 			}),
 		},
diff --git a/polkadot/node/core/pvf/src/testing.rs b/polkadot/node/core/pvf/src/testing.rs
index 60b0b4b8d3d..8c75dafa69c 100644
--- a/polkadot/node/core/pvf/src/testing.rs
+++ b/polkadot/node/core/pvf/src/testing.rs
@@ -21,8 +21,9 @@ pub use crate::{
 	worker_interface::{spawn_with_program_path, SpawnErr},
 };
 
-use crate::get_worker_version;
+use crate::{artifacts::ArtifactId, get_worker_version};
 use is_executable::IsExecutable;
+use polkadot_node_core_pvf_common::pvf::PvfPrepData;
 use polkadot_node_primitives::NODE_VERSION;
 use polkadot_primitives::ExecutorParams;
 use std::{
@@ -126,3 +127,8 @@ pub fn build_workers_and_get_paths() -> (PathBuf, PathBuf) {
 	let guard = mutex.lock().unwrap();
 	(guard.0.clone(), guard.1.clone())
 }
+
+/// Creates a new PVF which artifact id can be uniquely identified by the given number.
+pub fn artifact_id(discriminator: u32) -> ArtifactId {
+	ArtifactId::from_pvf_prep_data(&PvfPrepData::from_discriminator(discriminator))
+}
diff --git a/prdoc/pr_4662.prdoc b/prdoc/pr_4662.prdoc
new file mode 100644
index 00000000000..50f8a5bfd01
--- /dev/null
+++ b/prdoc/pr_4662.prdoc
@@ -0,0 +1,17 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Cleanup PVF artifact by cache limit and stale time
+
+doc:
+  - audience: Node Operator
+    description: |
+      Extend the PVF artifacts cleanup strategy. Previously, we pruned artifacts that had been stale for more than
+      24 hours. After this change we attempt to prune artifacts only when the cache reaches the 10 GB limit. If the
+      least recently used artifact has been stale for less than 24 hours, we don't remove it.
+
+crates:
+  - name: polkadot-node-core-pvf-common
+    bump: patch
+  - name: polkadot-node-core-pvf
+    bump: patch
-- 
GitLab