From 56af2d4c68664302f1f9643e560b153bf372120d Mon Sep 17 00:00:00 2001
From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com>
Date: Thu, 8 Jun 2023 14:21:41 +0300
Subject: [PATCH] av-store: Move prune on a separate thread (#7263)

* av-store: Move prune on a separate thread

There are situations where pruning of the data could take more than a few
seconds and that might make the whole subsystem unreponsive. To avoid this just
move the prune process on a separate thread.

See: https://github.com/paritytech/polkadot/issues/7237, for more details.

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>

* av-store: Add log that prunning started

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>

* av-store: modify log severity

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>

---------

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
---
 polkadot/node/core/av-store/src/lib.rs | 71 ++++++++++++++++++++++----
 1 file changed, 62 insertions(+), 9 deletions(-)

diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs
index 1c0c8c5e7fe..17c9f9a1983 100644
--- a/polkadot/node/core/av-store/src/lib.rs
+++ b/polkadot/node/core/av-store/src/lib.rs
@@ -26,7 +26,13 @@ use std::{
 	time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
 };
 
-use futures::{channel::oneshot, future, select, FutureExt};
+use futures::{
+	channel::{
+		mpsc::{channel, Receiver as MpscReceiver, Sender as MpscSender},
+		oneshot,
+	},
+	future, select, FutureExt, SinkExt, StreamExt,
+};
 use futures_timer::Delay;
 use parity_scale_codec::{Decode, Encode, Error as CodecError, Input};
 use polkadot_node_subsystem_util::database::{DBTransaction, Database};
@@ -540,9 +546,17 @@ impl<Context> AvailabilityStoreSubsystem {
 #[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
 async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context) {
 	let mut next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();
-
+	// Pruning interval is in the order of minutes so we shouldn't have more than one task running
+	// at one moment in time, so 10 should be more than enough.
+	let (mut pruning_result_tx, mut pruning_result_rx) = channel(10);
 	loop {
-		let res = run_iteration(&mut ctx, &mut subsystem, &mut next_pruning).await;
+		let res = run_iteration(
+			&mut ctx,
+			&mut subsystem,
+			&mut next_pruning,
+			(&mut pruning_result_tx, &mut pruning_result_rx),
+		)
+		.await;
 		match res {
 			Err(e) => {
 				e.trace();
@@ -564,6 +578,10 @@ async fn run_iteration<Context>(
 	ctx: &mut Context,
 	subsystem: &mut AvailabilityStoreSubsystem,
 	mut next_pruning: &mut future::Fuse<Delay>,
+	(pruning_result_tx, pruning_result_rx): (
+		&mut MpscSender<Result<(), Error>>,
+		&mut MpscReceiver<Result<(), Error>>,
+	),
 ) -> Result<bool, Error> {
 	select! {
 		incoming = ctx.recv().fuse() => {
@@ -612,15 +630,51 @@ async fn run_iteration<Context>(
 			// It's important to set the delay before calling `prune_all` because an error in `prune_all`
 			// could lead to the delay not being set again. Then we would never prune anything anymore.
 			*next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();
-
-			let _timer = subsystem.metrics.time_pruning();
-			prune_all(&subsystem.db, &subsystem.config, &*subsystem.clock)?;
-		}
+			start_prune_all(ctx, subsystem, pruning_result_tx.clone()).await?;
+		},
+		// Received the prune result and propagate the errors, so that in case of a fatal error
+		// the main loop of the subsystem can exit graciously.
+		result = pruning_result_rx.next() => {
+			if let Some(result) = result {
+				result?;
+			}
+		},
 	}
 
 	Ok(false)
 }
 
+// Start prune-all on a separate thread, so that in the case when the operation takes
+// longer than expected we don't keep the whole subsystem blocked.
+// See: https://github.com/paritytech/polkadot/issues/7237 for more details.
+#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
+async fn start_prune_all<Context>(
+	ctx: &mut Context,
+	subsystem: &mut AvailabilityStoreSubsystem,
+	mut pruning_result_tx: MpscSender<Result<(), Error>>,
+) -> Result<(), Error> {
+	let metrics = subsystem.metrics.clone();
+	let db = subsystem.db.clone();
+	let config = subsystem.config;
+	let time_now = subsystem.clock.now()?;
+
+	ctx.spawn_blocking(
+		"av-store-prunning",
+		Box::pin(async move {
+			let _timer = metrics.time_pruning();
+
+			gum::debug!(target: LOG_TARGET, "Prunning started");
+			let result = prune_all(&db, &config, time_now);
+
+			if let Err(err) = pruning_result_tx.send(result).await {
+				// This usually means that the node is closing down, log it just in case
+				gum::debug!(target: LOG_TARGET, ?err, "Failed to send prune_all result",);
+			}
+		}),
+	)?;
+	Ok(())
+}
+
 #[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
 async fn process_block_activated<Context>(
 	ctx: &mut Context,
@@ -1250,8 +1304,7 @@ fn store_available_data(
 	Ok(())
 }
 
-fn prune_all(db: &Arc<dyn Database>, config: &Config, clock: &dyn Clock) -> Result<(), Error> {
-	let now = clock.now()?;
+fn prune_all(db: &Arc<dyn Database>, config: &Config, now: Duration) -> Result<(), Error> {
 	let (range_start, range_end) = pruning_range(now);
 
 	let mut tx = DBTransaction::new();
-- 
GitLab