Commit 56af2d4c authored by Alexandru Gheorghe, committed by GitHub

av-store: Move pruning to a separate thread (#7263)


* av-store: Move pruning to a separate thread

There are situations where pruning the data can take more than a few
seconds, which could make the whole subsystem unresponsive. To avoid this,
move the pruning process to a separate thread.

See https://github.com/paritytech/polkadot/issues/7237 for more details.

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>

* av-store: Add log that pruning started

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>

* av-store: modify log severity

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>

---------

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
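For readers unfamiliar with the pattern used in the diff below, here is a minimal, self-contained sketch (editor's illustration, not the subsystem code itself): run the potentially slow pruning work on a blocking thread and hand the result back to the async loop over a bounded mpsc channel, so the loop never stalls. The names `fake_prune_all`, `result_tx` and `result_rx` are placeholders; the real code uses the overseer context's `spawn_blocking` and the subsystem's own channel.

use std::{thread, time::Duration};

use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};

// Stand-in for the real `prune_all`, which may take several seconds.
fn fake_prune_all() -> Result<(), String> {
    thread::sleep(Duration::from_secs(2));
    Ok(())
}

fn main() {
    // A small buffer is enough: pruning runs on the order of minutes, so at most
    // one result should ever be in flight.
    let (mut result_tx, mut result_rx) = mpsc::channel::<Result<(), String>>(10);

    // The real code uses `ctx.spawn_blocking`; a plain OS thread shows the same idea.
    thread::spawn(move || {
        let result = fake_prune_all();
        // If the receiver is gone the node is shutting down; ignore the send error.
        let _ = block_on(result_tx.send(result));
    });

    // The main loop stays responsive and only handles the result once it arrives.
    block_on(async {
        if let Some(result) = result_rx.next().await {
            match result {
                Ok(()) => println!("pruning finished"),
                Err(e) => eprintln!("pruning failed: {e}"),
            }
        }
    });
}

Sending the result back (rather than fire-and-forget) lets the main loop still propagate fatal pruning errors and exit cleanly, which is what the new `pruning_result_rx` select arm below does.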
parent ef6ae485
@@ -26,7 +26,13 @@ use std::{
     time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
 };
 
-use futures::{channel::oneshot, future, select, FutureExt};
+use futures::{
+    channel::{
+        mpsc::{channel, Receiver as MpscReceiver, Sender as MpscSender},
+        oneshot,
+    },
+    future, select, FutureExt, SinkExt, StreamExt,
+};
 use futures_timer::Delay;
 use parity_scale_codec::{Decode, Encode, Error as CodecError, Input};
 use polkadot_node_subsystem_util::database::{DBTransaction, Database};
@@ -540,9 +546,17 @@ impl<Context> AvailabilityStoreSubsystem {
 #[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
 async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context) {
     let mut next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();
+    // Pruning interval is in the order of minutes so we shouldn't have more than one task running
+    // at one moment in time, so 10 should be more than enough.
+    let (mut pruning_result_tx, mut pruning_result_rx) = channel(10);
 
     loop {
-        let res = run_iteration(&mut ctx, &mut subsystem, &mut next_pruning).await;
+        let res = run_iteration(
+            &mut ctx,
+            &mut subsystem,
+            &mut next_pruning,
+            (&mut pruning_result_tx, &mut pruning_result_rx),
+        )
+        .await;
         match res {
             Err(e) => {
                 e.trace();
@@ -564,6 +578,10 @@ async fn run_iteration<Context>(
     ctx: &mut Context,
     subsystem: &mut AvailabilityStoreSubsystem,
     mut next_pruning: &mut future::Fuse<Delay>,
+    (pruning_result_tx, pruning_result_rx): (
+        &mut MpscSender<Result<(), Error>>,
+        &mut MpscReceiver<Result<(), Error>>,
+    ),
 ) -> Result<bool, Error> {
     select! {
         incoming = ctx.recv().fuse() => {
@@ -612,15 +630,51 @@ async fn run_iteration<Context>(
             // It's important to set the delay before calling `prune_all` because an error in `prune_all`
             // could lead to the delay not being set again. Then we would never prune anything anymore.
             *next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();
 
-            let _timer = subsystem.metrics.time_pruning();
-            prune_all(&subsystem.db, &subsystem.config, &*subsystem.clock)?;
-        }
+            start_prune_all(ctx, subsystem, pruning_result_tx.clone()).await?;
+        },
+        // Received the prune result and propagate the errors, so that in case of a fatal error
+        // the main loop of the subsystem can exit graciously.
+        result = pruning_result_rx.next() => {
+            if let Some(result) = result {
+                result?;
+            }
+        },
     }
 
     Ok(false)
 }
+
+// Start prune-all on a separate thread, so that in the case when the operation takes
+// longer than expected we don't keep the whole subsystem blocked.
+// See: https://github.com/paritytech/polkadot/issues/7237 for more details.
+#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
+async fn start_prune_all<Context>(
+    ctx: &mut Context,
+    subsystem: &mut AvailabilityStoreSubsystem,
+    mut pruning_result_tx: MpscSender<Result<(), Error>>,
+) -> Result<(), Error> {
+    let metrics = subsystem.metrics.clone();
+    let db = subsystem.db.clone();
+    let config = subsystem.config;
+    let time_now = subsystem.clock.now()?;
+
+    ctx.spawn_blocking(
+        "av-store-prunning",
+        Box::pin(async move {
+            let _timer = metrics.time_pruning();
+            gum::debug!(target: LOG_TARGET, "Prunning started");
+            let result = prune_all(&db, &config, time_now);
+
+            if let Err(err) = pruning_result_tx.send(result).await {
+                // This usually means that the node is closing down, log it just in case
+                gum::debug!(target: LOG_TARGET, ?err, "Failed to send prune_all result",);
+            }
+        }),
+    )?;
+
+    Ok(())
+}
 
 #[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
 async fn process_block_activated<Context>(
     ctx: &mut Context,
@@ -1250,8 +1304,7 @@ fn store_available_data(
     Ok(())
 }
 
-fn prune_all(db: &Arc<dyn Database>, config: &Config, clock: &dyn Clock) -> Result<(), Error> {
-    let now = clock.now()?;
+fn prune_all(db: &Arc<dyn Database>, config: &Config, now: Duration) -> Result<(), Error> {
     let (range_start, range_end) = pruning_range(now);
     let mut tx = DBTransaction::new();