Unverified Commit e488c6cd authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Simplify the bitfield signing job (#1920)

Besides that, the PR also adds a simple test.
parent c360efa4
Pipeline #113264 passed with stages
in 24 minutes and 49 seconds
...@@ -5000,7 +5000,7 @@ dependencies = [ ...@@ -5000,7 +5000,7 @@ dependencies = [
name = "polkadot-node-core-bitfield-signing" name = "polkadot-node-core-bitfield-signing"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"bitvec", "derive_more",
"futures 0.3.5", "futures 0.3.5",
"log 0.4.11", "log 0.4.11",
"polkadot-node-subsystem", "polkadot-node-subsystem",
......
...@@ -5,7 +5,6 @@ authors = ["Parity Technologies <admin@parity.io>"] ...@@ -5,7 +5,6 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
bitvec = "0.17.4"
futures = "0.3.5" futures = "0.3.5"
log = "0.4.11" log = "0.4.11"
polkadot-primitives = { path = "../../../primitives" } polkadot-primitives = { path = "../../../primitives" }
...@@ -14,3 +13,4 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" } ...@@ -14,3 +13,4 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
wasm-timer = "0.2.4" wasm-timer = "0.2.4"
thiserror = "1.0.21" thiserror = "1.0.21"
derive_more = "0.99.11"
...@@ -16,20 +16,16 @@ ...@@ -16,20 +16,16 @@
//! The bitfield signing subsystem produces `SignedAvailabilityBitfield`s once per block. //! The bitfield signing subsystem produces `SignedAvailabilityBitfield`s once per block.
#![deny(unused_crate_dependencies, unused_results)] #![deny(unused_crate_dependencies)]
#![warn(missing_docs)] #![warn(missing_docs)]
#![recursion_limit="256"]
use bitvec::bitvec; use futures::{channel::{mpsc, oneshot}, lock::Mutex, prelude::*, future, Future};
use futures::{
channel::{mpsc, oneshot},
prelude::*,
stream, Future,
};
use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr}; use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
use polkadot_node_subsystem::{ use polkadot_node_subsystem::{
messages::{ messages::{
self, AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage, AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage, BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage, RuntimeApiRequest,
}, },
errors::RuntimeApiError, errors::RuntimeApiError,
}; };
...@@ -38,7 +34,7 @@ use polkadot_node_subsystem_util::{ ...@@ -38,7 +34,7 @@ use polkadot_node_subsystem_util::{
metrics::{self, prometheus}, metrics::{self, prometheus},
}; };
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex}; use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
use std::{convert::TryFrom, pin::Pin, time::Duration}; use std::{convert::TryFrom, pin::Pin, time::Duration, iter::FromIterator};
use wasm_timer::{Delay, Instant}; use wasm_timer::{Delay, Instant};
use thiserror::Error; use thiserror::Error;
...@@ -85,6 +81,7 @@ impl From<BitfieldSigningMessage> for ToJob { ...@@ -85,6 +81,7 @@ impl From<BitfieldSigningMessage> for ToJob {
/// Messages which may be sent from a `BitfieldSigningJob`. /// Messages which may be sent from a `BitfieldSigningJob`.
#[allow(missing_docs)] #[allow(missing_docs)]
#[derive(Debug, derive_more::From)]
pub enum FromJob { pub enum FromJob {
AvailabilityStore(AvailabilityStoreMessage), AvailabilityStore(AvailabilityStoreMessage),
BitfieldDistribution(BitfieldDistributionMessage), BitfieldDistribution(BitfieldDistributionMessage),
...@@ -132,9 +129,6 @@ pub enum Error { ...@@ -132,9 +129,6 @@ pub enum Error {
/// an mpsc channel failed to send /// an mpsc channel failed to send
#[error(transparent)] #[error(transparent)]
MpscSend(#[from] mpsc::SendError), MpscSend(#[from] mpsc::SendError),
/// several errors collected into one
#[error("Multiple errours occured: {0:?}")]
Multiple(Vec<Error>),
/// the runtime API failed to return what we wanted /// the runtime API failed to return what we wanted
#[error(transparent)] #[error(transparent)]
Runtime(#[from] RuntimeApiError), Runtime(#[from] RuntimeApiError),
...@@ -143,31 +137,25 @@ pub enum Error { ...@@ -143,31 +137,25 @@ pub enum Error {
Keystore(KeystoreError), Keystore(KeystoreError),
} }
// if there is a candidate pending availability, query the Availability Store /// If there is a candidate pending availability, query the Availability Store
// for whether we have the availability chunk for our validator index. /// for whether we have the availability chunk for our validator index.
async fn get_core_availability( async fn get_core_availability(
relay_parent: Hash, relay_parent: Hash,
core: CoreState, core: CoreState,
validator_idx: ValidatorIndex, validator_idx: ValidatorIndex,
sender: &mpsc::Sender<FromJob>, sender: &Mutex<&mut mpsc::Sender<FromJob>>,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
use messages::{
AvailabilityStoreMessage::QueryChunkAvailability,
RuntimeApiRequest::CandidatePendingAvailability,
};
use FromJob::{AvailabilityStore, RuntimeApi};
use RuntimeApiMessage::Request;
// we have to (cheaply) clone this sender so we can mutate it to actually send anything
let mut sender = sender.clone();
if let CoreState::Occupied(core) = core { if let CoreState::Occupied(core) = core {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
sender sender
.send(RuntimeApi(Request( .lock()
relay_parent, .await
CandidatePendingAvailability(core.para_id, tx), .send(
))) RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::CandidatePendingAvailability(core.para_id, tx),
).into(),
)
.await?; .await?;
let committed_candidate_receipt = match rx.await? { let committed_candidate_receipt = match rx.await? {
...@@ -181,27 +169,26 @@ async fn get_core_availability( ...@@ -181,27 +169,26 @@ async fn get_core_availability(
}; };
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
sender sender
.send(AvailabilityStore(QueryChunkAvailability( .lock()
committed_candidate_receipt.descriptor.pov_hash, .await
validator_idx, .send(
tx, AvailabilityStoreMessage::QueryChunkAvailability(
))) committed_candidate_receipt.descriptor.pov_hash,
validator_idx,
tx,
).into(),
)
.await?; .await?;
return rx.await.map_err(Into::into); return rx.await.map_err(Into::into);
} }
Ok(false) Ok(false)
} }
// delegates to the v1 runtime API /// delegates to the v1 runtime API
async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender<FromJob>) -> Result<Vec<CoreState>, Error> { async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender<FromJob>) -> Result<Vec<CoreState>, Error> {
use FromJob::RuntimeApi;
use messages::{
RuntimeApiMessage::Request,
RuntimeApiRequest::AvailabilityCores,
};
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
sender.send(RuntimeApi(Request(relay_parent, AvailabilityCores(tx)))).await?; sender.send(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::AvailabilityCores(tx)).into()).await?;
match rx.await { match rx.await {
Ok(Ok(out)) => Ok(out), Ok(Ok(out)) => Ok(out),
Ok(Err(runtime_err)) => Err(runtime_err.into()), Ok(Err(runtime_err)) => Err(runtime_err.into()),
...@@ -209,57 +196,28 @@ async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender<Fr ...@@ -209,57 +196,28 @@ async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender<Fr
} }
} }
// - get the list of core states from the runtime /// - get the list of core states from the runtime
// - for each core, concurrently determine chunk availability (see `get_core_availability`) /// - for each core, concurrently determine chunk availability (see `get_core_availability`)
// - return the bitfield if there were no errors at any point in this process /// - return the bitfield if there were no errors at any point in this process
// (otherwise, it's prone to false negatives) /// (otherwise, it's prone to false negatives)
async fn construct_availability_bitfield( async fn construct_availability_bitfield(
relay_parent: Hash, relay_parent: Hash,
validator_idx: ValidatorIndex, validator_idx: ValidatorIndex,
sender: &mut mpsc::Sender<FromJob>, sender: &mut mpsc::Sender<FromJob>,
) -> Result<AvailabilityBitfield, Error> { ) -> Result<AvailabilityBitfield, Error> {
use futures::lock::Mutex;
// get the set of availability cores from the runtime // get the set of availability cores from the runtime
let availability_cores = get_availability_cores(relay_parent, sender).await?; let availability_cores = get_availability_cores(relay_parent, sender).await?;
// we now need sender to be immutable so we can copy the reference to multiple concurrent closures // Wrap the sender in a Mutex to share it between the futures.
let sender = &*sender; let sender = Mutex::new(sender);
// prepare outputs
let out = Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; availability_cores.len()));
// in principle, we know that we never want concurrent access to the _same_ bit within the vec;
// we could `let out_ref = out.as_mut_ptr();` here instead, and manually assign bits, avoiding
// any need to ever wait to lock this mutex.
// in practice, it's safer to just use the mutex, and speed optimizations should wait until
// benchmarking proves that they are necessary.
let out_ref = &out;
let errs = Mutex::new(Vec::new());
let errs_ref = &errs;
// Handle each (idx, core) pair concurrently
//
// In principle, this work is all concurrent, not parallel. In practice, we can't guarantee it, which is why
// we need the mutexes and explicit references above.
stream::iter(availability_cores.into_iter().enumerate())
.for_each_concurrent(None, |(idx, core)| async move {
let availability = match get_core_availability(relay_parent, core, validator_idx, sender).await {
Ok(availability) => availability,
Err(err) => {
errs_ref.lock().await.push(err);
return;
}
};
out_ref.lock().await.set(idx, availability);
})
.await;
let errs = errs.into_inner(); // Handle all cores concurrently
if errs.is_empty() { // `try_join_all` returns all results in the same order as the input futures.
Ok(out.into_inner().into()) let results = future::try_join_all(
} else { availability_cores.into_iter().map(|core| get_core_availability(relay_parent, core, validator_idx, &sender)),
Err(Error::Multiple(errs.into())) ).await?;
}
Ok(AvailabilityBitfield(FromIterator::from_iter(results)))
} }
#[derive(Clone)] #[derive(Clone)]
...@@ -312,7 +270,6 @@ impl JobTrait for BitfieldSigningJob { ...@@ -312,7 +270,6 @@ impl JobTrait for BitfieldSigningJob {
mut sender: mpsc::Sender<FromJob>, mut sender: mpsc::Sender<FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> { ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move { async move {
// figure out when to wait to
let wait_until = Instant::now() + JOB_DELAY; let wait_until = Instant::now() + JOB_DELAY;
// now do all the work we can before we need to wait for the availability store // now do all the work we can before we need to wait for the availability store
...@@ -344,24 +301,83 @@ impl JobTrait for BitfieldSigningJob { ...@@ -344,24 +301,83 @@ impl JobTrait for BitfieldSigningJob {
.map_err(|e| Error::Keystore(e))?; .map_err(|e| Error::Keystore(e))?;
metrics.on_bitfield_signed(); metrics.on_bitfield_signed();
// make an anonymous scope to contain some use statements to simplify creating the outbound message sender
{ .send(BitfieldDistributionMessage::DistributeBitfield(relay_parent, signed_bitfield).into())
use BitfieldDistributionMessage::DistributeBitfield; .await
use FromJob::BitfieldDistribution; .map_err(Into::into)
sender
.send(BitfieldDistribution(DistributeBitfield(
relay_parent,
signed_bitfield,
)))
.await
.map_err(Into::into)
}
} }
.boxed() .boxed()
} }
} }
/// BitfieldSigningSubsystem manages a number of bitfield signing jobs. /// BitfieldSigningSubsystem manages a number of bitfield signing jobs.
pub type BitfieldSigningSubsystem<Spawner, Context> = pub type BitfieldSigningSubsystem<Spawner, Context> = JobManager<Spawner, Context, BitfieldSigningJob>;
JobManager<Spawner, Context, BitfieldSigningJob>;
#[cfg(test)]
mod tests {
use super::*;
use futures::{pin_mut, executor::block_on};
use polkadot_primitives::v1::{OccupiedCore};
use FromJob::*;
// Build a `CoreState::Occupied` fixture whose `para_id` and responsible group
// are both derived from the given id; the timing fields are arbitrary values
// that the code under test never inspects.
fn occupied_core(para_id: u32) -> CoreState {
CoreState::Occupied(OccupiedCore {
para_id: para_id.into(),
group_responsible: para_id.into(),
next_up_on_available: None,
occupied_since: 100_u32,
time_out_at: 200_u32,
next_up_on_time_out: None,
availability: Default::default(),
})
}
// Drive `construct_availability_bitfield` against a mocked overseer: the test
// plays the role of the runtime API and availability store by answering every
// message the job sends, then checks the resulting bitfield.
#[test]
fn construct_availability_bitfield_works() {
block_on(async move {
// `sender` is handed to the job; `receiver` lets the test observe and
// answer the `FromJob` messages the job emits.
let (mut sender, mut receiver) = mpsc::channel(10);
let relay_parent = Hash::default();
let validator_index = 1u32;
let future = construct_availability_bitfield(relay_parent, validator_index, &mut sender).fuse();
pin_mut!(future);
// Alternate between servicing job messages and polling the job future
// until the job completes.
loop {
futures::select! {
m = receiver.next() => match m.unwrap() {
// The job first asks for the full set of availability cores:
// reply with one free core and two occupied ones.
RuntimeApi(RuntimeApiMessage::Request(rp, RuntimeApiRequest::AvailabilityCores(tx))) => {
assert_eq!(relay_parent, rp);
tx.send(Ok(vec![CoreState::Free, occupied_core(1), occupied_core(2)])).unwrap();
},
// Per occupied core the job asks whether a candidate is pending
// availability: only para 1 has one, para 2 does not.
RuntimeApi(
RuntimeApiMessage::Request(rp, RuntimeApiRequest::CandidatePendingAvailability(para_id, tx))
) => {
assert_eq!(relay_parent, rp);
if para_id == 1.into() {
tx.send(Ok(Some(Default::default()))).unwrap();
} else {
tx.send(Ok(None)).unwrap();
}
},
// For the core with a pending candidate the job queries the
// availability store; report the chunk as available.
AvailabilityStore(AvailabilityStoreMessage::QueryChunkAvailability(_, vidx, tx)) => {
assert_eq!(validator_index, vidx);
tx.send(true).unwrap();
},
o => panic!("Unknown message: {:?}", o),
},
// Once the job resolves, only bit 1 must be set: core 0 is free,
// core 2 is occupied but has no candidate pending availability.
r = future => match r {
Ok(r) => {
assert!(!r.0.get(0).unwrap());
assert!(r.0.get(1).unwrap());
assert!(!r.0.get(2).unwrap());
break
},
Err(e) => panic!("Failed: {:?}", e),
},
}
}
});
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment