Unverified commit 0363cc63, authored by Andronik Ordian, committed by GitHub
Browse files

more resilient subsystems: av-store (#1888)

* utils: remove unused error

* av-store: do not exit early on errors

* av-store: revert logging change on master

* av-store: add a test
parent 1364ee69
Pipeline #112981 passed with stages
in 21 minutes and 39 seconds
......@@ -27,7 +27,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH};
use codec::{Encode, Decode};
use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt, TryFutureExt};
use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt};
use futures_timer::Delay;
use kvdb_rocksdb::{Database, DatabaseConfig};
use kvdb::{KeyValueDB, DBTransaction};
......@@ -57,34 +57,31 @@ mod columns {
// Errors raised inside the availability-store subsystem, converted from the
// underlying error types via `#[from]` and rendered transparently via
// `thiserror`'s `#[error(transparent)]`.
//
// NOTE(review): this span is diff residue — it interleaves pre-change and
// post-change lines. Both the removed variants (`ChainAPI`,
// `ChainApiChannelIsClosed`) and their replacements (`ChainApi`, `Oneshot`)
// appear, and `RuntimeApi` is missing its own `#[error(transparent)]`
// attribute as a result. Reconcile against the merged file before compiling.
#[derive(Debug, Error)]
enum Error {
#[error(transparent)]
ChainAPI(#[from] ChainApiError),
RuntimeApi(#[from] RuntimeApiError),
#[error(transparent)]
ChainApi(#[from] ChainApiError),
#[error(transparent)]
Erasure(#[from] erasure::Error),
#[error(transparent)]
Io(#[from] io::Error),
#[error(transparent)]
ChainApiChannelIsClosed(#[from] oneshot::Canceled),
Oneshot(#[from] oneshot::Canceled),
#[error(transparent)]
Subsystem(#[from] SubsystemError),
#[error(transparent)]
Time(#[from] SystemTimeError),
}
/// Class of errors which we should handle more gracefully.
/// An occurrence of this error should not bring down the subsystem.
#[derive(Debug, Error)]
enum NonFatalError {
/// A Runtime API error occurred.
#[error(transparent)]
RuntimeApi(#[from] RuntimeApiError),
/// The receiver's end of the channel is closed.
#[error(transparent)]
Oneshot(#[from] oneshot::Canceled),
/// Overseer channel's buffer is full.
#[error(transparent)]
OverseerOutOfCapacity(#[from] SubsystemError),
impl Error {
	/// Choose the log level at which this error should be reported.
	fn severity(&self) -> log::Level {
		// Runtime-API failures and dropped oneshot senders are expected
		// churn during normal operation; keep them at debug so they do
		// not spam the log. Anything else is surprising enough to warn.
		if matches!(self, Self::RuntimeApi(_) | Self::Oneshot(_)) {
			log::Level::Debug
		} else {
			log::Level::Warn
		}
	}
}
/// A wrapper type for delays.
......@@ -477,11 +474,29 @@ fn get_next_chunk_pruning_time(db: &Arc<dyn KeyValueDB>) -> Option<NextChunkPrun
}
/// Main loop of the availability-store subsystem.
///
/// Repeatedly drives `run_iteration`. An `Err` from an iteration is logged at
/// the level chosen by `Error::severity` and the loop continues — errors do
/// not bring the subsystem down. `Ok(true)` signals that `Conclude` was
/// received and the loop exits; `Ok(false)` means "keep going".
// NOTE(review): the `-> Result<(), Error>` line below looks like pre-change
// diff residue — the loop only `break`s without producing a value, so this
// function should return `()`. Confirm against the merged file.
async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context)
-> Result<(), Error>
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{
loop {
let res = run_iteration(&mut subsystem, &mut ctx).await;
match res {
Err(e) => {
// Log and carry on; severity keeps spurious errors at debug.
log::log!(target: LOG_TARGET, e.severity(), "{}", e);
}
Ok(true) => {
// `Conclude` received — the only way out of the loop.
log::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
break;
},
Ok(false) => continue,
}
}
}
async fn run_iteration<Context>(subsystem: &mut AvailabilityStoreSubsystem, ctx: &mut Context)
-> Result<bool, Error>
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{
// Every time the following two methods are called a read from DB is performed.
// But given that these are very small values which are essentially a newtype
// wrappers around `Duration` (`NextChunkPruning` and `NextPoVPruning`) and also the
......@@ -495,25 +510,21 @@ where
select! {
incoming = ctx.recv().fuse() => {
match incoming {
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => break,
Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
match incoming? {
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(true),
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate { activated, .. })
)) => {
) => {
for activated in activated.into_iter() {
process_block_activated(&mut ctx, &subsystem.inner, activated).await?;
process_block_activated(ctx, &subsystem.inner, activated).await?;
}
}
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(hash))) => {
process_block_finalized(&subsystem, &mut ctx, &subsystem.inner, hash).await?;
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash)) => {
process_block_finalized(subsystem, ctx, &subsystem.inner, hash).await?;
}
Ok(FromOverseer::Communication { msg }) => {
process_message(&mut subsystem, &mut ctx, msg).await?;
FromOverseer::Communication { msg } => {
process_message(subsystem, ctx, msg).await?;
}
Err(e) => {
log::error!("AvailabilityStoreSubsystem err: {:#?}", e);
break
},
}
}
pov_pruning_time = pov_pruning_time => {
......@@ -522,11 +533,10 @@ where
chunk_pruning_time = chunk_pruning_time => {
subsystem.prune_chunks()?;
}
complete => break,
}
complete => return Ok(true),
}
Ok(())
Ok(false)
}
/// As soon as certain block is finalized its pruning records and records of all
......@@ -647,7 +657,7 @@ where
async fn request_candidate_events<Context>(
ctx: &mut Context,
hash: Hash,
) -> Result<Vec<CandidateEvent>, NonFatalError>
) -> Result<Vec<CandidateEvent>, Error>
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>
{
......@@ -673,48 +683,44 @@ where
{
use AvailabilityStoreMessage::*;
fn log_send_error(request: &'static str) {
log::debug!(target: LOG_TARGET, "error sending a response to {}", request);
}
match msg {
QueryAvailableData(hash, tx) => {
tx.send(available_data(&subsystem.inner, &hash).map(|d| d.data))
.unwrap_or_else(|_| log_send_error("QueryAvailableData"));
.map_err(|_| oneshot::Canceled)?;
}
QueryDataAvailability(hash, tx) => {
tx.send(available_data(&subsystem.inner, &hash).is_some())
.unwrap_or_else(|_| log_send_error("QueryDataAvailability"));
.map_err(|_| oneshot::Canceled)?;
}
QueryChunk(hash, id, tx) => {
tx.send(get_chunk(subsystem, &hash, id)?)
.unwrap_or_else(|_| log_send_error("QueryChunk"));
.map_err(|_| oneshot::Canceled)?;
}
QueryChunkAvailability(hash, id, tx) => {
tx.send(get_chunk(subsystem, &hash, id)?.is_some())
.unwrap_or_else(|_| log_send_error("QueryChunkAvailability"));
.map_err(|_| oneshot::Canceled)?;
}
StoreChunk { candidate_hash, relay_parent, validator_index, chunk, tx } => {
// Current block number is relay_parent block number + 1.
let block_number = get_block_number(ctx, relay_parent).await? + 1;
match store_chunk(subsystem, &candidate_hash, validator_index, chunk, block_number) {
Err(e) => {
tx.send(Err(())).unwrap_or_else(|_| log_send_error("StoreChunk (Err)"));
tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
return Err(e);
}
Ok(()) => {
tx.send(Ok(())).unwrap_or_else(|_| log_send_error("StoreChunk (Ok)"));
tx.send(Ok(())).map_err(|_| oneshot::Canceled)?;
}
}
}
StoreAvailableData(hash, id, n_validators, av_data, tx) => {
match store_available_data(subsystem, &hash, id, n_validators, av_data) {
Err(e) => {
tx.send(Err(())).unwrap_or_else(|_| log_send_error("StoreAvailableData (Err)"));
tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
return Err(e);
}
Ok(()) => {
tx.send(Ok(())).unwrap_or_else(|_| log_send_error("StoreAvailableData (Ok)"));
tx.send(Ok(())).map_err(|_| oneshot::Canceled)?;
}
}
}
......@@ -995,7 +1001,7 @@ where
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run(self, ctx)
.map_err(|e| SubsystemError::with_origin("availability-store", e))
.map(|_| Ok(()))
.boxed();
SpawnedSubsystem {
......
......@@ -30,7 +30,9 @@ use polkadot_primitives::v1::{
PersistedValidationData, PoV, Id as ParaId,
};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem::ActiveLeavesUpdate;
use polkadot_subsystem::{
ActiveLeavesUpdate, errors::RuntimeApiError,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
struct TestHarness {
......@@ -167,6 +169,51 @@ async fn overseer_signal(
.expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT));
}
// Regression test for the resilience change: a runtime-API failure while
// processing an `ActiveLeaves` signal must not kill the subsystem — a later
// query must still be answered.
#[test]
fn runtime_api_error_does_not_stop_the_subsystem() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
test_harness(PruningConfig::default(), store, |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
let new_leaf = Hash::repeat_byte(0x01);
// Activate a leaf; this triggers a `CandidateEvents` runtime-API request.
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![new_leaf.clone()],
deactivated: smallvec![],
}),
).await;
// runtime api call fails — answer the request with an error on purpose
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::CandidateEvents(tx),
)) => {
assert_eq!(relay_parent, new_leaf);
tx.send(Err(RuntimeApiError::from("oh no".to_string()))).unwrap();
}
);
// but that's fine, we're still alive: the subsystem must keep serving
// queries after the failed iteration.
let (tx, rx) = oneshot::channel();
let candidate_hash = Hash::repeat_byte(33);
let validator_index = 5;
let query_chunk = AvailabilityStoreMessage::QueryChunk(
candidate_hash,
validator_index,
tx,
);
overseer_send(&mut virtual_overseer, query_chunk.into()).await;
// Nothing was stored for this candidate, so the answer is `None` —
// the point is that an answer arrives at all.
assert!(rx.await.unwrap().is_none());
});
}
#[test]
fn store_chunk_works() {
let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.