Unverified commit 80f4349e authored by Robert Klotzner, committed by GitHub

Some overdue cleanup (#2989)

* Cleanup obsolete code.

* Move session cache to requester.
parent e759da3a
Pipeline #137205 failed with stages in 26 minutes and 3 seconds
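In short, the requester no longer carries its own keystore: the subsystem builds a `RuntimeInfo` from the keystore and lends it to the requester on every update. A minimal before/after sketch of the affected call sites in the subsystem's run loop, assembled from the hunks below (excerpt only, not standalone compilable code):

// Before: the requester cloned the keystore to feed its private session cache.
let mut requester = Requester::new(self.keystore.clone(), self.metrics.clone()).fuse();

// After: session lookups go through the subsystem's RuntimeInfo, borrowed per call.
let mut requester = Requester::new(self.metrics.clone()).fuse();
log_error(
    requester.get_mut().update_fetching_heads(&mut ctx, &mut self.runtime, update).await,
    "Error in Requester::update_fetching_heads"
)?;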
@@ -18,13 +18,12 @@
//! Error handling related code and Error/Result definitions.
use polkadot_node_network_protocol::request_response::request::RequestError;
use polkadot_primitives::v1::SessionIndex;
use thiserror::Error;
use futures::channel::oneshot;
use polkadot_node_subsystem_util::{Fault, Error as UtilError, runtime, unwrap_non_fatal};
use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError};
use polkadot_node_subsystem_util::{Fault, runtime, unwrap_non_fatal};
use polkadot_subsystem::SubsystemError;
use crate::LOG_TARGET;
@@ -57,10 +56,6 @@ pub enum Fatal {
#[error("Spawning subsystem task failed")]
SpawnTask(#[source] SubsystemError),
/// Runtime API subsystem is down, which means we're shutting down.
#[error("Runtime request canceled")]
RuntimeRequestCanceled(oneshot::Canceled),
/// Requester stream exhausted.
#[error("Erasure chunk requester stream exhausted")]
RequesterExhausted,
@@ -88,24 +83,10 @@ pub enum NonFatal {
#[error("Session is not cached.")]
NoSuchCachedSession,
/// We tried reporting bad validators, although we are not a validator ourselves.
#[error("Not a validator.")]
NotAValidator,
/// Sending request response failed (Can happen on timeouts for example).
#[error("Sending a request's response failed.")]
SendResponse,
/// Some request to utility functions failed.
/// This can be either `RuntimeRequestCanceled` or `RuntimeApiError`.
#[error("Utility request failed")]
UtilRequest(UtilError),
/// Some request to the runtime failed.
/// For example if we prune a block we're requesting info about.
#[error("Runtime API error")]
RuntimeRequest(RuntimeApiError),
/// Fetching PoV failed with `RequestError`.
#[error("FetchPoV request error")]
FetchPoV(#[source] RequestError),
@@ -121,10 +102,6 @@ pub enum NonFatal {
#[error("Given validator index could not be found")]
InvalidValidatorIndex,
/// We tried fetching a session info which was not available.
#[error("There was no session with the given index")]
NoSuchSession(SessionIndex),
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information")]
Runtime(#[from] #[source] runtime::NonFatal),
@@ -144,13 +121,3 @@ pub fn log_error(result: Result<()>, ctx: &'static str)
}
Ok(())
}
/// Receive a response from a runtime request and convert errors.
pub(crate) async fn recv_runtime<V>(
r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
) -> Result<V> {
let result = r.await
.map_err(Fatal::RuntimeRequestCanceled)?
.map_err(NonFatal::RuntimeRequest)?;
Ok(result)
}
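With `recv_runtime` and the related error variants gone, the remaining callers use the cached helpers on `runtime::RuntimeInfo`, which perform the oneshot receive and error conversion internally. A sketch of the replacement pattern, excerpted from the session cache hunk further down (not standalone code):

// Before, via the helper removed above:
let index = recv_runtime(request_session_index_for_child(parent, ctx.sender()).await).await?;

// After: RuntimeInfo performs the request, the caching and the error mapping itself.
let index = runtime.get_session_index(ctx, parent).await?;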
@@ -25,7 +25,7 @@ use polkadot_subsystem::{
/// Error and [`Result`] type for this subsystem.
mod error;
pub use error::{Fatal, NonFatal};
use error::Fatal;
use error::{Result, log_error};
use polkadot_node_subsystem_util::runtime::RuntimeInfo;
@@ -42,9 +42,6 @@ use pov_requester::PoVRequester;
mod responder;
use responder::{answer_chunk_request_log, answer_pov_request_log};
/// Cache for session information.
mod session_cache;
mod metrics;
/// Prometheus `Metrics` for availability distribution.
pub use metrics::Metrics;
@@ -56,8 +53,6 @@ const LOG_TARGET: &'static str = "parachain::availability-distribution";
/// The availability distribution subsystem.
pub struct AvailabilityDistributionSubsystem {
/// Pointer to a keystore, which is required for determining this node's validator index.
keystore: SyncCryptoStorePtr,
/// Easy and efficient runtime access for this subsystem.
runtime: RuntimeInfo,
/// Prometheus metrics.
@@ -85,8 +80,8 @@ impl AvailabilityDistributionSubsystem {
/// Create a new instance of the availability distribution.
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
let runtime = RuntimeInfo::new(Some(keystore.clone()));
Self { keystore, runtime, metrics }
let runtime = RuntimeInfo::new(Some(keystore));
Self { runtime, metrics }
}
/// Start processing work as passed on from the Overseer.
@@ -94,7 +89,7 @@ impl AvailabilityDistributionSubsystem {
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
{
let mut requester = Requester::new(self.keystore.clone(), self.metrics.clone()).fuse();
let mut requester = Requester::new(self.metrics.clone()).fuse();
let mut pov_requester = PoVRequester::new();
loop {
let action = {
@@ -131,7 +126,7 @@ impl AvailabilityDistributionSubsystem {
);
}
log_error(
requester.get_mut().update_fetching_heads(&mut ctx, update).await,
requester.get_mut().update_fetching_heads(&mut ctx, &mut self.runtime, update).await,
"Error in Requester::update_fetching_heads"
)?;
}
@@ -35,7 +35,7 @@ use polkadot_subsystem::{SubsystemContext, jaeger};
use crate::{
error::{Fatal, Result},
session_cache::{BadValidators, SessionInfo},
requester::session_cache::{BadValidators, SessionInfo},
LOG_TARGET,
metrics::{Metrics, SUCCEEDED, FAILED},
};
@@ -30,15 +30,17 @@ use futures::{
Stream,
};
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_subsystem_util::runtime::get_occupied_cores;
use polkadot_node_subsystem_util::runtime::{RuntimeInfo, get_occupied_cores};
use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore};
use polkadot_subsystem::{
messages::AllMessages, ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
};
use super::{session_cache::SessionCache, LOG_TARGET, Metrics};
use super::{LOG_TARGET, Metrics};
/// Cache for session information.
mod session_cache;
use session_cache::SessionCache;
/// A task fetching a particular chunk.
@@ -76,12 +78,12 @@ impl Requester {
///
/// You must feed it with `ActiveLeavesUpdate` via `update_fetching_heads` and make it progress
/// by advancing the stream.
#[tracing::instrument(level = "trace", skip(keystore, metrics), fields(subsystem = LOG_TARGET))]
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
#[tracing::instrument(level = "trace", skip(metrics), fields(subsystem = LOG_TARGET))]
pub fn new(metrics: Metrics) -> Self {
let (tx, rx) = mpsc::channel(1);
Requester {
fetches: HashMap::new(),
session_cache: SessionCache::new(keystore),
session_cache: SessionCache::new(),
tx,
rx,
metrics,
@@ -90,10 +92,11 @@ impl Requester {
/// Update heads that need availability distribution.
///
/// For all active heads we will be fetching our chunks for availability distribution.
#[tracing::instrument(level = "trace", skip(self, ctx, update), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self, ctx, runtime, update), fields(subsystem = LOG_TARGET))]
pub async fn update_fetching_heads<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
update: ActiveLeavesUpdate,
) -> super::Result<()>
where
@@ -110,7 +113,7 @@
} = update;
// Order important! We need to handle activated, prior to deactivated, otherwise we might
// cancel still needed jobs.
self.start_requesting_chunks(ctx, activated.into_iter()).await?;
self.start_requesting_chunks(ctx, runtime, activated.into_iter()).await?;
self.stop_requesting_chunks(deactivated.into_iter());
Ok(())
}
@@ -119,6 +122,7 @@
async fn start_requesting_chunks<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
new_heads: impl Iterator<Item = ActivatedLeaf>,
) -> super::Result<()>
where
@@ -131,7 +135,7 @@
occupied_cores = ?cores,
"Query occupied core"
);
self.add_cores(ctx, leaf, cores).await?;
self.add_cores(ctx, runtime, leaf, cores).await?;
}
Ok(())
}
@@ -156,6 +160,7 @@
async fn add_cores<Context>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
leaf: Hash,
cores: impl IntoIterator<Item = OccupiedCore>,
) -> super::Result<()>
@@ -177,6 +182,7 @@
.session_cache
.with_session_info(
ctx,
runtime,
// We use leaf here, as relay_parent must be in the same session as the
// leaf. (Cores are dropped at session boundaries.) At the same time,
// only leaves are guaranteed to be fetchable by the state trie.
@@ -19,47 +19,28 @@ use std::collections::HashSet;
use lru::LruCache;
use rand::{seq::SliceRandom, thread_rng};
use sp_application_crypto::AppKey;
use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use polkadot_node_subsystem_util::{
request_session_index_for_child, request_session_info,
};
use polkadot_primitives::v1::SessionInfo as GlobalSessionInfo;
use polkadot_node_subsystem_util::runtime::RuntimeInfo;
use polkadot_primitives::v1::{
AuthorityDiscoveryId, GroupIndex, Hash, SessionIndex, ValidatorId, ValidatorIndex,
AuthorityDiscoveryId, GroupIndex, Hash, SessionIndex, ValidatorIndex,
};
use polkadot_subsystem::SubsystemContext;
use super::{
error::{recv_runtime, Error, NonFatal},
use crate::{
error::{Error, NonFatal},
LOG_TARGET,
};
/// Caching of session info as needed by availability distribution.
/// Caching of session info as needed by availability chunk distribution.
///
/// It should be ensured that a cached session stays live in the cache as long as we might need it.
pub struct SessionCache {
/// Get the session index for a given relay parent.
///
/// We query this up to 100 times per block, so caching it here without roundtrips over the
/// overseer seems sensible.
session_index_cache: LruCache<Hash, SessionIndex>,
/// Look up cached sessions by SessionIndex.
///
/// Note: Performance of fetching is really secondary here, but we need to ensure we are going
/// to get any existing cache entry, before fetching new information, as we should not mess up
/// the order of validators in `SessionInfo::validator_groups`. (We want live TCP connections
/// wherever possible.)
///
/// We store `None` in case we are not a validator, so we won't do needless fetches for non
/// validator nodes.
session_info_cache: LruCache<SessionIndex, Option<SessionInfo>>,
/// Key store for determining whether we are a validator and what `ValidatorIndex` we have.
keystore: SyncCryptoStorePtr,
/// the order of validators in `SessionInfo::validator_groups`.
session_info_cache: LruCache<SessionIndex, SessionInfo>,
}
/// Localized session information, tailored for the needs of availability distribution.
@@ -101,13 +82,10 @@ pub struct BadValidators {
impl SessionCache {
/// Create a new `SessionCache`.
pub fn new(keystore: SyncCryptoStorePtr) -> Self {
pub fn new() -> Self {
SessionCache {
// 5 relatively conservative, 1 to 2 should suffice:
session_index_cache: LruCache::new(5),
// We need to cache the current and the last session the most:
session_info_cache: LruCache::new(2),
keystore,
}
}
@@ -117,10 +95,11 @@ impl SessionCache {
///
/// Use this function over any `fetch_session_info` if all you need is a reference to
/// `SessionInfo`, as it avoids an expensive clone.
#[tracing::instrument(level = "trace", skip(self, ctx, with_info), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self, ctx, runtime, with_info), fields(subsystem = LOG_TARGET))]
pub async fn with_session_info<Context, F, R>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
parent: Hash,
with_info: F,
) -> Result<Option<R>, Error>
@@ -128,40 +107,23 @@
Context: SubsystemContext,
F: FnOnce(&SessionInfo) -> R,
{
let session_index = match self.session_index_cache.get(&parent) {
Some(index) => *index,
None => {
let index =
recv_runtime(request_session_index_for_child(parent, ctx.sender()).await)
.await?;
self.session_index_cache.put(parent, index);
index
}
};
let session_index = runtime.get_session_index(ctx, parent).await?;
if let Some(o_info) = self.session_info_cache.get(&session_index) {
tracing::trace!(target: LOG_TARGET, session_index, "Got session from lru");
if let Some(info) = o_info {
return Ok(Some(with_info(info)));
} else {
// Info was cached - we are not a validator: return early:
return Ok(None)
}
return Ok(Some(with_info(o_info)));
}
if let Some(info) = self
.query_info_from_runtime(ctx, parent, session_index)
.query_info_from_runtime(ctx, runtime, parent, session_index)
.await?
{
tracing::trace!(target: LOG_TARGET, session_index, "Calling `with_info`");
let r = with_info(&info);
tracing::trace!(target: LOG_TARGET, session_index, "Storing session info in lru!");
self.session_info_cache.put(session_index, Some(info));
self.session_info_cache.put(session_index, info);
Ok(Some(r))
} else {
// Avoid needless fetches if we are not a validator:
self.session_info_cache.put(session_index, None);
tracing::trace!(target: LOG_TARGET, session_index, "No session info found!");
Ok(None)
}
}
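For reference, a hedged sketch of a call to the new `with_session_info` signature; the binding, closure, and `leaf` variable here are illustrative and not taken from this diff:

// Illustrative usage: the shared RuntimeInfo is passed next to the subsystem context.
// Returns Ok(None) if no local session info could be built, e.g. because this node
// is not a validator in that session.
let group_count: Option<usize> = session_cache
    .with_session_info(ctx, runtime, leaf, |info| info.validator_groups.len())
    .await?;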
@@ -185,13 +147,11 @@
/// We assume validators in a group are tried in reverse order, so the reported bad validators
/// will be put at the beginning of the group.
#[tracing::instrument(level = "trace", skip(self, report), fields(subsystem = LOG_TARGET))]
pub fn report_bad(&mut self, report: BadValidators) -> super::Result<()> {
pub fn report_bad(&mut self, report: BadValidators) -> crate::Result<()> {
let session = self
.session_info_cache
.get_mut(&report.session_index)
.ok_or(NonFatal::NoSuchCachedSession)?
.as_mut()
.ok_or(NonFatal::NotAValidator)?;
.ok_or(NonFatal::NoSuchCachedSession)?;
let group = session
.validator_groups
.get_mut(report.group_index.0 as usize)
@@ -218,36 +178,21 @@
async fn query_info_from_runtime<Context>(
&self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
parent: Hash,
session_index: SessionIndex,
) -> Result<Option<SessionInfo>, Error>
where
Context: SubsystemContext,
{
let GlobalSessionInfo {
validators,
discovery_keys,
mut validator_groups,
..
} = recv_runtime(request_session_info(parent, session_index, ctx.sender()).await)
.await?
.ok_or(NonFatal::NoSuchSession(session_index))?;
let info = runtime.get_session_info_by_index(ctx, parent, session_index).await?;
if let Some(our_index) = self.get_our_index(validators).await {
let discovery_keys = info.session_info.discovery_keys.clone();
let mut validator_groups = info.session_info.validator_groups.clone();
if let Some(our_index) = info.validator_info.our_index {
// Get our group index:
let our_group = validator_groups
.iter()
.enumerate()
.find_map(|(i, g)| {
g.iter().find_map(|v| {
if *v == our_index {
Some(GroupIndex(i as u32))
} else {
None
}
})
}
);
let our_group = info.validator_info.our_group;
// Shuffle validators in groups:
let mut rng = thread_rng();
@@ -279,18 +224,4 @@
}
return Ok(None)
}
/// Get our `ValidatorIndex`.
///
/// Returns: None if we are not a validator.
async fn get_our_index(&self, validators: Vec<ValidatorId>) -> Option<ValidatorIndex> {
for (i, v) in validators.iter().enumerate() {
if CryptoStore::has_keys(&*self.keystore, &[(v.to_raw_vec(), ValidatorId::ID)])
.await
{
return Some(ValidatorIndex(i as u32));
}
}
None
}
}
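The removed `get_our_index` keystore scan is now covered by `RuntimeInfo`, which was created with the keystore (see the `RuntimeInfo::new(Some(keystore))` hunk above) and resolves the node's validator identity itself. A rough sketch of what `query_info_from_runtime` relies on instead; the field accesses match the hunk above, the surrounding bindings are illustrative:

// RuntimeInfo already knows whether we are a validator in this session.
let info = runtime.get_session_info_by_index(ctx, parent, session_index).await?;
if let Some(our_index) = info.validator_info.our_index {
    // Our validator index and group index come precomputed; no manual keystore scan
    // over the validator set is needed any more.
    let _ = (our_index, info.validator_info.our_group);
}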