Unverified Commit 8d8d2a6e authored by Lldenaurois's avatar Lldenaurois Committed by GitHub
Browse files

Approval Checking: Avoid redundant checks (#3306)

* node/approval-voting: Introduce LruCache for pending Approval work

This commit adds an LruCache that is intended to track the approval work
submitted as background tasks in order to ensure that the validator needn't
launch duplicate approval work for the same candidate across multiple blocks.
A simple state machine is also introduced in order to differentiate pending
and completed tasks. In addition, this LruCache will retain ValidationResults
from the completed approval work once the task has completed. As per LruCache
implementation, the oldest tasks will get evicted as new approval work is
submitted to this cache.

* node/approval-voting: Revert changes to master

This commit reverts changes from the previous commit in order
to simplify addressing the architecture discussion raised in the PR.

* node/approval-voting: remove background task mpsc construct

This diff removes the mpsc construct for background tasks in preparation
for a move to leveraging RemoteHandles to launch approvals, rather than
passing ApprovalRequests to a mpsc channel and handling the ApprovalRequests
in the main subsystem task.

* node/approval-voting: Introduce LRU Cache

This commit introduces an LRU Cache but does not yet make use of it.

* node/approval-voting: Remove BackgroundTasksMap and memoize currently_checking

This commit removes the BackgroundTasksMap in the main subsystem task
and introduces a method to keep track of RemoteHandles in such a way that
we can ensure that a task is spawned once for a CandidateHash and
relay parent tuple.

* node/approval-voting: Remove BackgroundTasksMap and memoize currently_checking

This commit removes the BackgroundTasksMap in the main subsystem task
and introduces a map of FuturesUnordered per BlockNumber. In addition,
a FusedFuture is generated by iterating across all FuturesUnordered for
the BlockNumbers for which at least one candidate has approvals work
running in the subsystem.

* node/approval-voting: Address Rob's comments

This diff removes the prior HashMap<BlockNumber, FuturesUnordered>
construction and instead moves to a simple FuturesUnordered where
all the work is awaited with a timeout.

* node/approval-voting: Update Cargo.lock

Due to a mismatch in rustc versions

* node/approval-voting: Make use of actions when issuing_approval

This commit fixes a small oversight in the logic of the prior commit.

* node/approval-voting: Address Rob's feedback

* node/approval-voting: Introduce lazy launch_approval evaluation

* node/approval-voting: Send DistributeApproval message on every LaunchApproval

In addition to fixing the DistributeApproval bug, this commit also
increases the size of the approvals cache and ensures the StaleGuard
is removed when the advantageous approval state is reached.

* node/approval-voting: Address final comments

This commit removes the CandidateIndex from the ApprovalVoteRequest.
Instead, the launch_approval function will compute the candidate_index
from the block entry.

In addition, a comment has been added explaining the difficulty of
issuing approvals in the handle_actions function.

* node/approval-voting: Set timeout to be 120s rather than 2s

* Update Cargo.lock
parent d2e21f32
Pipeline #143746 canceled with stages
in 6 minutes and 51 seconds
......@@ -5976,6 +5976,7 @@ dependencies = [
"futures-timer 3.0.2",
"kvdb",
"kvdb-memorydb",
"lru",
"maplit",
"merlin",
"parity-scale-codec",
......
......@@ -10,6 +10,7 @@ futures-timer = "3.0.2"
parity-scale-codec = { version = "2.0.0", default-features = false, features = ["bit-vec", "derive"] }
tracing = "0.1.26"
bitvec = { version = "0.20.1", default-features = false, features = ["alloc"] }
lru = "0.6"
merlin = "2.0"
schnorrkel = "0.9.1"
kvdb = "0.9.0"
......
......@@ -25,24 +25,25 @@ use polkadot_node_subsystem::{
messages::{
AssignmentCheckError, AssignmentCheckResult, ApprovalCheckError, ApprovalCheckResult,
ApprovalVotingMessage, RuntimeApiMessage, RuntimeApiRequest, ChainApiMessage,
ApprovalDistributionMessage, ValidationFailed, CandidateValidationMessage,
ApprovalDistributionMessage, CandidateValidationMessage,
AvailabilityRecoveryMessage,
},
errors::RecoveryError,
Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SpawnedSubsystem,
FromOverseer, OverseerSignal,
FromOverseer, OverseerSignal, SubsystemSender,
};
use polkadot_node_subsystem_util::{
TimeoutExt,
metrics::{self, prometheus},
rolling_session_window::RollingSessionWindow,
};
use polkadot_primitives::v1::{
ValidatorIndex, Hash, SessionIndex, SessionInfo, CandidateHash,
CandidateReceipt, BlockNumber, PersistedValidationData,
ValidationCode, CandidateDescriptor, ValidatorPair, ValidatorSignature, ValidatorId,
CandidateReceipt, BlockNumber,
ValidatorPair, ValidatorSignature, ValidatorId,
CandidateIndex, GroupIndex, ApprovalVote,
};
use polkadot_node_primitives::{ValidationResult, PoV};
use polkadot_node_primitives::ValidationResult;
use polkadot_node_primitives::approval::{
IndirectAssignmentCert, IndirectSignedApprovalVote, DelayTranche, BlockApprovalMeta,
};
......@@ -55,12 +56,14 @@ use sp_application_crypto::Pair;
use kvdb::KeyValueDB;
use futures::prelude::*;
use futures::future::RemoteHandle;
use futures::channel::{mpsc, oneshot};
use futures::future::{BoxFuture, RemoteHandle};
use futures::channel::oneshot;
use futures::stream::FuturesUnordered;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::btree_map::Entry;
use std::sync::Arc;
use std::time::Duration;
use approval_checking::RequiredTranches;
use persisted_entries::{ApprovalEntry, CandidateEntry, BlockEntry};
......@@ -80,6 +83,8 @@ use crate::approval_db::v1::Config as DatabaseConfig;
mod tests;
/// Size passed to `RollingSessionWindow::new` when the subsystem state is built.
const APPROVAL_SESSIONS: SessionIndex = 6;
/// Upper bound on how long a launched approval check is awaited before it is
/// reported with `ApprovalOutcome::TimedOut`.
const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
/// Capacity of the LRU cache that remembers the outcome of finished approval
/// checks per candidate.
const APPROVAL_CACHE_SIZE: usize = 1024;
/// `tracing` target used for this subsystem's log output.
const LOG_TARGET: &str = "parachain::approval-voting";
/// Configuration for the approval voting subsystem
......@@ -341,21 +346,10 @@ impl<C> Subsystem<C> for ApprovalVotingSubsystem
}
}
/// Requests that background tasks send to the main subsystem loop over an
/// `mpsc` channel; the loop dispatches them in `handle_background_request`.
enum BackgroundRequest {
// Ask the main loop to issue an approval vote (handled via `issue_approval`).
ApprovalVote(ApprovalVoteRequest),
// Ask the main loop to forward a `ValidateFromExhaustive` candidate-validation
// message; the validation result is delivered through the trailing oneshot sender.
CandidateValidation(
PersistedValidationData,
ValidationCode,
CandidateDescriptor,
Arc<PoV>,
oneshot::Sender<Result<ValidationResult, ValidationFailed>>,
),
}
/// Parameters of an approval vote to be issued; carried by
/// `Action::IssueApproval` alongside the candidate hash.
#[derive(Debug, Clone)]
struct ApprovalVoteRequest {
// Validator index the approval is issued under.
validator_index: ValidatorIndex,
// Hash of the relay-chain block the approval refers to.
block_hash: Hash,
// NOTE(review): position of the candidate in the block — presumably the index
// into the block entry's candidate list; confirm against `issue_approval`.
candidate_index: usize,
}
#[derive(Default)]
......@@ -539,6 +533,109 @@ struct ApprovalStatus {
block_tick: Tick,
}
/// Terminal result of one approval-checking task.
///
/// The value is stored per candidate in the approvals cache and decides whether
/// approval votes are issued once a check completes.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ApprovalOutcome {
    /// The check completed and the candidate was approved.
    Approved,
    /// The check completed without producing an approval.
    Failed,
    /// The check did not complete within the approval checking timeout.
    TimedOut,
}
/// Result reported by a finished (or timed-out) approval-checking task.
struct ApprovalState {
// Validator index the check was run for; reused when building the
// `ApprovalVoteRequest`s that follow an approved outcome.
validator_index: ValidatorIndex,
// Candidate the check was run on.
candidate_hash: CandidateHash,
// How the check ended.
approval_outcome: ApprovalOutcome,
}
impl ApprovalState {
fn approved(
validator_index: ValidatorIndex,
candidate_hash: CandidateHash,
) -> Self {
Self {
validator_index,
candidate_hash,
approval_outcome: ApprovalOutcome::Approved,
}
}
fn failed(
validator_index: ValidatorIndex,
candidate_hash: CandidateHash,
) -> Self {
Self {
validator_index,
candidate_hash,
approval_outcome: ApprovalOutcome::Failed,
}
}
}
/// Tracks approval checks that are currently in flight.
///
/// `Default` yields an empty set: no candidates recorded and no pending futures.
/// It is derived because both fields already implement `Default` with exactly
/// the empty values the previous hand-written impl produced.
#[derive(Default)]
struct CurrentlyCheckingSet {
    /// For each candidate being checked, the relay-chain block hashes that
    /// requested the check, kept in sorted order.
    candidate_hash_map: HashMap<CandidateHash, Vec<Hash>>,
    /// The in-flight checks themselves; each future resolves to the
    /// `ApprovalState` of one check.
    currently_checking: FuturesUnordered<BoxFuture<'static, ApprovalState>>,
}
impl CurrentlyCheckingSet {
    /// Records that `relay_block` is waiting on the approval check of
    /// `candidate_hash`, launching the check only when none is in flight yet.
    ///
    /// `launch_work` is evaluated lazily: it is awaited only if this candidate
    /// has no check running. If a check is already tracked, the relay block is
    /// simply added to the (sorted, duplicate-free) list of blocks that will
    /// receive the outcome, because `next` hands the whole list back when the
    /// single check completes. Launching a second check for the same candidate
    /// would only duplicate work whose result is discarded.
    ///
    /// The candidate is recorded only after `launch_work` succeeded, so a failed
    /// launch does not leave behind an entry with no check attached to it.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `launch_work`.
    pub async fn insert_relay_block_hash(
        &mut self,
        candidate_hash: CandidateHash,
        validator_index: ValidatorIndex,
        relay_block: Hash,
        launch_work: impl Future<Output = SubsystemResult<RemoteHandle<ApprovalState>>>,
    ) -> SubsystemResult<()> {
        // A check is already running for this candidate: just remember the block.
        if let Some(relay_blocks) = self.candidate_hash_map.get_mut(&candidate_hash) {
            if let Err(position) = relay_blocks.binary_search(&relay_block) {
                relay_blocks.insert(position, relay_block);
            }
            return Ok(());
        }

        let work = launch_work.await?;
        self.candidate_hash_map.insert(candidate_hash, vec![relay_block]);
        self.currently_checking.push(Box::pin(async move {
            // A check that outlives the timeout is reported as `TimedOut`
            // instead of being awaited forever.
            match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
                Some(approval_state) => approval_state,
                None => ApprovalState {
                    candidate_hash,
                    validator_index,
                    approval_outcome: ApprovalOutcome::TimedOut,
                },
            }
        }));

        Ok(())
    }

    /// Waits for the next approval check to finish.
    ///
    /// Returns the relay-chain block hashes that were waiting on the finished
    /// candidate together with the resulting `ApprovalState`, and stores the
    /// outcome in `approvals_cache`. While no check is in flight the returned
    /// future stays pending, which makes it safe to poll from a `select!` loop.
    pub async fn next(
        &mut self,
        approvals_cache: &mut lru::LruCache<CandidateHash, ApprovalOutcome>,
    ) -> (Vec<Hash>, ApprovalState) {
        if !self.currently_checking.is_empty() {
            if let Some(approval_state) = self.currently_checking.next().await {
                let relay_blocks = self.candidate_hash_map
                    .remove(&approval_state.candidate_hash)
                    .unwrap_or_default();
                // Both values are `Copy`, so no explicit clone is needed.
                approvals_cache.put(
                    approval_state.candidate_hash,
                    approval_state.approval_outcome,
                );
                return (relay_blocks, approval_state);
            }
        }

        future::pending().await
    }
}
struct State<T> {
session_window: RollingSessionWindow,
keystore: Arc<LocalKeystore>,
......@@ -600,7 +697,7 @@ impl<T> State<T> {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
enum Action {
ScheduleWakeup {
block_hash: Hash,
......@@ -611,20 +708,20 @@ enum Action {
WriteBlockEntry(BlockEntry),
WriteCandidateEntry(CandidateHash, CandidateEntry),
LaunchApproval {
candidate_hash: CandidateHash,
indirect_cert: IndirectAssignmentCert,
assignment_tranche: DelayTranche,
relay_block_number: BlockNumber,
relay_block_hash: Hash,
candidate_index: CandidateIndex,
session: SessionIndex,
candidate: CandidateReceipt,
backing_group: GroupIndex,
},
IssueApproval(CandidateHash, ApprovalVoteRequest),
BecomeActive,
Conclude,
}
/// Handles of spawned background tasks, grouped by the number of the relay
/// block they were launched for. Dropping a `RemoteHandle` cancels its task.
type BackgroundTaskMap = BTreeMap<BlockNumber, Vec<RemoteHandle<()>>>;
async fn run<C>(
mut ctx: C,
mut subsystem: ApprovalVotingSubsystem,
......@@ -633,7 +730,6 @@ async fn run<C>(
) -> SubsystemResult<()>
where C: SubsystemContext<Message = ApprovalVotingMessage>
{
let (background_tx, background_rx) = mpsc::channel::<BackgroundRequest>(64);
let mut state = State {
session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
keystore: subsystem.keystore,
......@@ -644,12 +740,10 @@ async fn run<C>(
};
let mut wakeups = Wakeups::default();
// map block numbers to background work.
let mut background_tasks = BTreeMap::new();
let mut currently_checking_set = CurrentlyCheckingSet::default();
let mut approvals_cache = lru::LruCache::new(APPROVAL_CACHE_SIZE);
let mut last_finalized_height: Option<BlockNumber> = None;
let mut background_rx = background_rx.fuse();
let db_writer = &*subsystem.db;
......@@ -676,10 +770,6 @@ async fn run<C>(
&mut wakeups,
).await?;
if let Some(finalized_height) = last_finalized_height {
cleanup_background_tasks(finalized_height, &mut background_tasks);
}
if let Mode::Syncing(ref mut oracle) = subsystem.mode {
if !oracle.is_major_syncing() {
// note that we're active before processing other actions.
......@@ -689,28 +779,46 @@ async fn run<C>(
actions
}
background_request = background_rx.next().fuse() => {
if let Some(req) = background_request {
handle_background_request(
&mut ctx,
&mut state,
&subsystem.metrics,
req,
).await?
} else {
Vec::new()
approval_state = currently_checking_set.next(&mut approvals_cache).fuse() => {
let mut actions = Vec::new();
let (
relay_block_hashes,
ApprovalState {
validator_index,
candidate_hash,
approval_outcome,
}
) = approval_state;
if matches!(approval_outcome, ApprovalOutcome::Approved) {
let mut approvals: Vec<Action> = relay_block_hashes
.into_iter()
.map(|block_hash|
Action::IssueApproval(
candidate_hash,
ApprovalVoteRequest {
validator_index,
block_hash,
},
)
)
.collect();
actions.append(&mut approvals);
}
actions
}
};
if handle_actions(
&mut ctx,
&mut state,
&subsystem.metrics,
&mut wakeups,
&mut currently_checking_set,
&mut approvals_cache,
db_writer,
subsystem.db_config,
&background_tx,
&mut background_tasks,
&mut subsystem.mode,
actions,
).await? {
......@@ -721,22 +829,42 @@ async fn run<C>(
Ok(())
}
// Handle actions is a function that accepts a set of instructions
// and subsequently updates the underlying approvals_db in accordance
// with the linear set of instructions passed in. Therefore, actions
// must be processed in series to ensure that earlier actions are not
// negated/corrupted by later actions being executed out-of-order.
//
// However, certain Actions can cause additional actions to need to be
// processed by this function. In order to preserve linearity, we would
// need to handle these newly generated actions before we finalize
// completing additional actions in the submitted sequence of actions.
//
// Since recursive async functions are not stable yet, we are
// forced to modify the actions iterator on the fly whenever a new set
// of actions are generated by handling a single action.
//
// This particular problem statement is specified in issue 3311:
// https://github.com/paritytech/polkadot/issues/3311
//
// returns `true` if any of the actions was a `Conclude` command.
async fn handle_actions(
ctx: &mut impl SubsystemContext,
state: &mut State<impl DBReader>,
metrics: &Metrics,
wakeups: &mut Wakeups,
currently_checking_set: &mut CurrentlyCheckingSet,
approvals_cache: &mut lru::LruCache<CandidateHash, ApprovalOutcome>,
db: &dyn KeyValueDB,
db_config: DatabaseConfig,
background_tx: &mpsc::Sender<BackgroundRequest>,
background_tasks: &mut BackgroundTaskMap,
mode: &mut Mode,
actions: impl IntoIterator<Item = Action>,
actions: Vec<Action>,
) -> SubsystemResult<bool> {
let mut transaction = approval_db::v1::Transaction::new(db_config);
let mut conclude = false;
for action in actions {
let mut actions_iter = actions.into_iter();
while let Some(action) = actions_iter.next() {
match action {
Action::ScheduleWakeup {
block_hash,
......@@ -752,10 +880,33 @@ async fn handle_actions(
Action::WriteCandidateEntry(candidate_hash, candidate_entry) => {
transaction.put_candidate_entry(candidate_hash, candidate_entry.into());
}
Action::IssueApproval(candidate_hash, approval_request) => {
let mut sender = ctx.sender().clone();
// Note that the IssueApproval action will create additional
// actions that will need to all be processed before we can
// handle the next action in the set passed to the ambient
// function.
//
// In order to achieve this, we append the existing iterator
// to the end of the iterator made up of these newly generated
// actions.
//
// Note that chaining these iterators is O(n) as we must consume
// the prior iterator.
let next_actions: Vec<Action> = issue_approval(
&mut sender,
state,
metrics,
candidate_hash,
approval_request,
)?.into_iter().map(|v| v.clone()).chain(actions_iter).collect();
actions_iter = next_actions.into_iter();
}
Action::LaunchApproval {
candidate_hash,
indirect_cert,
assignment_tranche,
relay_block_number,
relay_block_hash,
candidate_index,
session,
candidate,
......@@ -773,20 +924,42 @@ async fn handle_actions(
candidate_index,
).into());
let handle = launch_approval(
ctx,
metrics.clone(),
background_tx.clone(),
session,
&candidate,
validator_index,
block_hash,
candidate_index as _,
backing_group,
).await?;
if let Some(handle) = handle {
background_tasks.entry(relay_block_number).or_default().push(handle);
match approvals_cache.get(&candidate_hash) {
Some(ApprovalOutcome::Approved) => {
let new_actions: Vec<Action> = std::iter::once(
Action::IssueApproval(
candidate_hash,
ApprovalVoteRequest {
validator_index,
block_hash,
}
)
)
.map(|v| v.clone())
.chain(actions_iter)
.collect();
actions_iter = new_actions.into_iter();
},
None => {
let ctx = &mut *ctx;
currently_checking_set.insert_relay_block_hash(
candidate_hash,
validator_index,
relay_block_hash,
async move {
launch_approval(
ctx,
metrics.clone(),
session,
candidate,
validator_index,
block_hash,
backing_group,
).await
}
).await?;
}
Some(_) => {},
}
}
Action::BecomeActive => {
......@@ -812,19 +985,6 @@ async fn handle_actions(
Ok(conclude)
}
// Drops every background task registered for a block at or below the
// finalized height; such tasks are no longer needed.
fn cleanup_background_tasks(
    current_finalized_block: BlockNumber,
    tasks: &mut BackgroundTaskMap,
) {
    let first_unfinalized = current_finalized_block + 1;
    // Keep only the entries from `first_unfinalized` upwards. The replaced map,
    // holding everything up to the finalized block, is dropped here, and
    // dropping a `RemoteHandle` cancels its task.
    *tasks = tasks.split_off(&first_unfinalized);
}
fn distribution_messages_for_activation<'a>(
db: impl DBReader + 'a,
) -> SubsystemResult<Vec<ApprovalDistributionMessage>> {
......@@ -1037,36 +1197,6 @@ async fn handle_from_overseer(
Ok(actions)
}
// Dispatches one request received from a background task.
//
// An approval-vote request is answered with the actions produced by
// `issue_approval`. A candidate-validation request is forwarded as a
// `ValidateFromExhaustive` message and produces no actions.
async fn handle_background_request(
    ctx: &mut impl SubsystemContext,
    state: &State<impl DBReader>,
    metrics: &Metrics,
    request: BackgroundRequest,
) -> SubsystemResult<Vec<Action>> {
    let (validation_data, validation_code, descriptor, pov, response_tx) = match request {
        BackgroundRequest::ApprovalVote(vote_request) => {
            return issue_approval(ctx, state, metrics, vote_request);
        }
        BackgroundRequest::CandidateValidation(data, code, descriptor, pov, tx) => {
            (data, code, descriptor, pov, tx)
        }
    };

    let validation_message = CandidateValidationMessage::ValidateFromExhaustive(
        validation_data,
        validation_code,
        descriptor,
        pov,
        response_tx,
    );
    ctx.send_message(validation_message.into()).await;

    Ok(Vec::new())
}
async fn handle_approved_ancestor(
ctx: &mut impl SubsystemContext,
db: &impl DBReader,
......@@ -1882,9 +2012,10 @@ fn process_wakeup(
// sanity: should always be present.
actions.push(Action::LaunchApproval {
candidate_hash,
indirect_cert,
assignment_tranche: tranche,
relay_block_number: block_entry.block_number(),
relay_block_hash: relay_block,
candidate_index: i as _,
session: block_entry.session(),
candidate: candidate_entry.candidate_receipt().clone(),
......@@ -1925,14 +2056,12 @@ fn process_wakeup(
async fn launch_approval(
ctx: &mut impl SubsystemContext,
metrics: Metrics,
mut background_tx: mpsc::Sender<BackgroundRequest>,
session_index: SessionIndex,
candidate: &CandidateReceipt,
candidate: CandidateReceipt,
validator_index: ValidatorIndex,
block_hash: Hash,
candidate_index: usize,
backing_group: GroupIndex,
) -> SubsystemResult<Option<RemoteHandle<()>>> {
) -> SubsystemResult<RemoteHandle<ApprovalState>> {
let (a_tx, a_rx) = oneshot::channel();
let (code_tx, code_rx) = oneshot::channel();
......@@ -1988,6 +2117,7 @@ async fn launch_approval(
let candidate = candidate.clone();
let metrics_guard = StaleGuard(Some(metrics));
let mut sender = ctx.sender().clone();
let background = async move {
// Force the move of the timer into the background task.
let _timer = timer;
......@@ -1997,35 +2127,52 @@ async fn launch_approval(
.with_stage(jaeger::Stage::ApprovalChecking);
let available_data = match a_rx.await {
Err(_) => return,
Err(_) => return ApprovalState::failed(
validator_index,
candidate_hash,
),
Ok(Ok(a)) => a,
Ok(Err(RecoveryError::Unavailable)) => {
tracing::warn!(
target: LOG_TARGET,
"Data unavailable for candidate {:?}",
(candidate_hash, candidate.descriptor.para_id),
);
// do nothing. we'll just be a no-show and that'll cause others to rise up.
metrics_guard.take().on_approval_unavailable();
return;
}
Ok(Err(RecoveryError::Invalid)) => {
tracing::warn!(
target: LOG_TARGET,
"Data recovery invalid for candidate {:?}",
(candidate_hash, candidate.descriptor.para_id),
);
Ok(Err(e)) => {
match &e {
&RecoveryError::Unavailable => {
tracing::warn!(
target: LOG_TARGET,
"Data unavailable for candidate {:?}",
(candidate_hash, candidate.descriptor.para_id),
);
// do nothing. we'll just be a no-show and that'll cause others to rise up.
metrics_guard.take().on_approval_unavailable();
}
&RecoveryError::Invalid => {
tracing::warn!(
target: LOG_TARGET,
"Data recovery invalid for candidate {:?}",
(candidate_hash, candidate.descriptor.para_id),
);
// TODO: dispute. Either the merkle trie is bad or the erasure root is.
// https://github.com/paritytech/polkadot/issues/2176
metrics_guard.take().on_approval_invalid();
return;
// TODO: dispute. Either the merkle trie is bad or the erasure root is.
// https://github.com/paritytech/polkadot/issues/2176
metrics_guard.take().on_approval_invalid();
}
}
return ApprovalState::failed(
validator_index,
candidate_hash,
);