Unverified Commit 1fbf09ac authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Simplify subsystem jobs (#2037)

* Simplify subsystem jobs

This PR simplifies the subsystem jobs interface. Instead of requiring an
extra message to signal that a job should end, a job
now ends when its receiver returns `None`. In addition, it changes the
interface to enforce that messages to a job provide a relay parent.

* Drop ToJobTrait

* Remove FromJob

We always convert this message to FromJobCommand anyway.
parent efa9b994
Pipeline #115503 passed with stages
in 21 minutes and 21 seconds
......@@ -4940,7 +4940,6 @@ dependencies = [
name = "polkadot-node-core-bitfield-signing"
version = "0.1.0"
dependencies = [
"derive_more",
"futures 0.3.8",
"polkadot-node-subsystem",
"polkadot-node-subsystem-util",
......
......@@ -40,8 +40,7 @@ use polkadot_subsystem::{
messages::{
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData,
ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed,
RuntimeApiRequest,
ProvisionerMessage, StatementDistributionMessage, ValidationFailed, RuntimeApiRequest,
},
};
use polkadot_node_subsystem_util::{
......@@ -93,9 +92,9 @@ struct CandidateBackingJob {
/// The hash of the relay parent on top of which this job is doing its work.
parent: Hash,
/// Inbound message channel receiving part.
rx_to: mpsc::Receiver<ToJob>,
rx_to: mpsc::Receiver<CandidateBackingMessage>,
/// Outbound message channel sending part.
tx_from: mpsc::Sender<FromJob>,
tx_from: mpsc::Sender<FromJobCommand>,
/// The `ParaId` assigned to this validator
assignment: ParaId,
/// The collator required to author the candidate, if any.
......@@ -151,84 +150,6 @@ impl TableContextTrait for TableContext {
}
}
/// Inbound envelope delivered by `CandidateBackingSubsystem` to a `CandidateBackingJob`.
pub enum ToJob {
    /// Wraps a `CandidateBackingMessage` for the job to process.
    CandidateBacking(CandidateBackingMessage),
    /// Tells the job to stop working.
    Stop,
}
/// Narrows an `AllMessages` value down to the single variant this job accepts.
impl TryFrom<AllMessages> for ToJob {
    type Error = ();

    /// Maps `AllMessages::CandidateBacking` to `ToJob::CandidateBacking`; any other
    /// variant is rejected with `Err(())`.
    fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
        if let AllMessages::CandidateBacking(inner) = msg {
            Ok(Self::CandidateBacking(inner))
        } else {
            Err(())
        }
    }
}
impl From<CandidateBackingMessage> for ToJob {
fn from(msg: CandidateBackingMessage) -> Self {
Self::CandidateBacking(msg)
}
}
impl util::ToJobTrait for ToJob {
    /// The message value used to tell a job to stop.
    const STOP: Self = Self::Stop;

    /// Relay parent reported by the wrapped message; `Stop` carries none.
    fn relay_parent(&self) -> Option<Hash> {
        if let Self::CandidateBacking(inner) = self {
            inner.relay_parent()
        } else {
            None
        }
    }
}
/// Outbound envelope sent by a `CandidateBackingJob` to `CandidateBackingSubsystem`.
///
/// Each variant wraps the message type named in its payload.
enum FromJob {
    /// Carries an `AvailabilityStoreMessage`.
    AvailabilityStore(AvailabilityStoreMessage),
    /// Carries a `RuntimeApiMessage`.
    RuntimeApiMessage(RuntimeApiMessage),
    /// Carries a `CandidateValidationMessage`.
    CandidateValidation(CandidateValidationMessage),
    /// Carries a `CandidateSelectionMessage`.
    CandidateSelection(CandidateSelectionMessage),
    /// Carries a `ProvisionerMessage`.
    Provisioner(ProvisionerMessage),
    /// Carries a `PoVDistributionMessage`.
    PoVDistribution(PoVDistributionMessage),
    /// Carries a `StatementDistributionMessage`.
    StatementDistribution(StatementDistributionMessage),
}
/// Turns every `FromJob` into a `FromJobCommand::SendMessage` holding the matching
/// `AllMessages` variant.
impl From<FromJob> for FromJobCommand {
    fn from(f: FromJob) -> FromJobCommand {
        // Pick the `AllMessages` variant first, then wrap it once.
        let outgoing = match f {
            FromJob::AvailabilityStore(inner) => AllMessages::AvailabilityStore(inner),
            FromJob::RuntimeApiMessage(inner) => AllMessages::RuntimeApi(inner),
            FromJob::CandidateValidation(inner) => AllMessages::CandidateValidation(inner),
            FromJob::CandidateSelection(inner) => AllMessages::CandidateSelection(inner),
            FromJob::Provisioner(inner) => AllMessages::Provisioner(inner),
            FromJob::PoVDistribution(inner) => AllMessages::PoVDistribution(inner),
            FromJob::StatementDistribution(inner) => AllMessages::StatementDistribution(inner),
        };
        FromJobCommand::SendMessage(outgoing)
    }
}
/// Accepts only the `AllMessages` variants that have a `FromJob` counterpart.
impl TryFrom<AllMessages> for FromJob {
    type Error = &'static str;

    /// Returns the corresponding `FromJob` variant, or a static error string for any
    /// `AllMessages` variant without one.
    fn try_from(f: AllMessages) -> Result<Self, Self::Error> {
        let converted = match f {
            AllMessages::AvailabilityStore(inner) => Self::AvailabilityStore(inner),
            AllMessages::RuntimeApi(inner) => Self::RuntimeApiMessage(inner),
            AllMessages::CandidateValidation(inner) => Self::CandidateValidation(inner),
            AllMessages::CandidateSelection(inner) => Self::CandidateSelection(inner),
            AllMessages::StatementDistribution(inner) => Self::StatementDistribution(inner),
            AllMessages::PoVDistribution(inner) => Self::PoVDistribution(inner),
            AllMessages::Provisioner(inner) => Self::Provisioner(inner),
            _ => return Err("can't convert this AllMessages variant to FromJob"),
        };
        Ok(converted)
    }
}
struct InvalidErasureRoot;
// It looks like it's not possible to do an `impl From` given the current state of
......@@ -301,12 +222,10 @@ fn table_attested_to_backed(
impl CandidateBackingJob {
/// Run asynchronously.
async fn run_loop(mut self) -> Result<(), Error> {
while let Some(msg) = self.rx_to.next().await {
match msg {
ToJob::CandidateBacking(msg) => {
self.process_msg(msg).await?;
}
ToJob::Stop => break,
loop {
match self.rx_to.next().await {
Some(msg) => self.process_msg(msg).await?,
None => break,
}
}
......@@ -317,9 +236,7 @@ impl CandidateBackingJob {
&mut self,
candidate: CandidateReceipt,
) -> Result<(), Error> {
self.tx_from.send(FromJob::CandidateSelection(
CandidateSelectionMessage::Invalid(self.parent, candidate)
)).await?;
self.tx_from.send(AllMessages::from(CandidateSelectionMessage::Invalid(self.parent, candidate)).into()).await?;
Ok(())
}
......@@ -664,7 +581,7 @@ impl CandidateBackingJob {
}
async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> {
self.tx_from.send(FromJob::Provisioner(msg)).await?;
self.tx_from.send(AllMessages::from(msg).into()).await?;
Ok(())
}
......@@ -674,9 +591,9 @@ impl CandidateBackingJob {
descriptor: CandidateDescriptor,
pov: Arc<PoV>,
) -> Result<(), Error> {
self.tx_from.send(FromJob::PoVDistribution(
self.tx_from.send(AllMessages::from(
PoVDistributionMessage::DistributePoV(self.parent, descriptor, pov),
)).await.map_err(Into::into)
).into()).await.map_err(Into::into)
}
async fn request_pov_from_distribution(
......@@ -685,9 +602,9 @@ impl CandidateBackingJob {
) -> Result<Arc<PoV>, Error> {
let (tx, rx) = oneshot::channel();
self.tx_from.send(FromJob::PoVDistribution(
self.tx_from.send(AllMessages::from(
PoVDistributionMessage::FetchPoV(self.parent, descriptor, tx)
)).await?;
).into()).await?;
Ok(rx.await?)
}
......@@ -699,13 +616,14 @@ impl CandidateBackingJob {
) -> Result<ValidationResult, Error> {
let (tx, rx) = oneshot::channel();
self.tx_from.send(FromJob::CandidateValidation(
self.tx_from.send(
AllMessages::from(
CandidateValidationMessage::ValidateFromChainState(
candidate,
pov,
tx,
)
)
).into(),
).await?;
Ok(rx.await??)
......@@ -719,7 +637,7 @@ impl CandidateBackingJob {
available_data: AvailableData,
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.tx_from.send(FromJob::AvailabilityStore(
self.tx_from.send(AllMessages::from(
AvailabilityStoreMessage::StoreAvailableData(
candidate_hash,
id,
......@@ -727,7 +645,7 @@ impl CandidateBackingJob {
available_data,
tx,
)
)
).into(),
).await?;
let _ = rx.await?;
......@@ -777,15 +695,14 @@ impl CandidateBackingJob {
async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> {
let smsg = StatementDistributionMessage::Share(self.parent, s);
self.tx_from.send(FromJob::StatementDistribution(smsg)).await?;
self.tx_from.send(AllMessages::from(smsg).into()).await?;
Ok(())
}
}
impl util::JobTrait for CandidateBackingJob {
type ToJob = ToJob;
type FromJob = FromJob;
type ToJob = CandidateBackingMessage;
type Error = Error;
type RunArgs = SyncCryptoStorePtr;
type Metrics = Metrics;
......@@ -798,7 +715,7 @@ impl util::JobTrait for CandidateBackingJob {
keystore: SyncCryptoStorePtr,
metrics: Metrics,
rx_to: mpsc::Receiver<Self::ToJob>,
mut tx_from: mpsc::Sender<Self::FromJob>,
mut tx_from: mpsc::Sender<FromJobCommand>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
macro_rules! try_runtime_api {
......@@ -1000,7 +917,7 @@ impl metrics::Metrics for Metrics {
}
}
delegated_subsystem!(CandidateBackingJob(SyncCryptoStorePtr, Metrics) <- ToJob as CandidateBackingSubsystem);
delegated_subsystem!(CandidateBackingJob(SyncCryptoStorePtr, Metrics) <- CandidateBackingMessage as CandidateBackingSubsystem);
#[cfg(test)]
mod tests {
......@@ -1013,7 +930,7 @@ mod tests {
GroupRotationInfo,
};
use polkadot_subsystem::{
messages::RuntimeApiRequest,
messages::{RuntimeApiRequest, RuntimeApiMessage},
ActiveLeavesUpdate, FromOverseer, OverseerSignal,
};
use polkadot_node_primitives::InvalidCandidate;
......
......@@ -14,4 +14,3 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
wasm-timer = "0.2.5"
thiserror = "1.0.22"
derive_more = "0.99.11"
......@@ -25,16 +25,15 @@ use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
use polkadot_node_subsystem::{
messages::{
AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage, RuntimeApiRequest,
BitfieldSigningMessage, RuntimeApiMessage, RuntimeApiRequest,
},
errors::RuntimeApiError,
};
use polkadot_node_subsystem_util::{
self as util, JobManager, JobTrait, ToJobTrait, Validator, FromJobCommand,
metrics::{self, prometheus},
self as util, JobManager, JobTrait, Validator, FromJobCommand, metrics::{self, prometheus},
};
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
use std::{convert::TryFrom, pin::Pin, time::Duration, iter::FromIterator};
use std::{pin::Pin, time::Duration, iter::FromIterator};
use wasm_timer::{Delay, Instant};
use thiserror::Error;
......@@ -45,76 +44,6 @@ const LOG_TARGET: &str = "bitfield_signing";
/// Each `BitfieldSigningJob` prepares a signed bitfield for a single relay parent.
pub struct BitfieldSigningJob;
/// Inbound messages accepted by a `BitfieldSigningJob`.
#[allow(missing_docs)]
pub enum ToJob {
    // Wraps a `BitfieldSigningMessage`.
    BitfieldSigning(BitfieldSigningMessage),
    // Used as the `ToJobTrait::STOP` value.
    Stop,
}
impl ToJobTrait for ToJob {
    /// The message value used to tell a job to stop.
    const STOP: Self = Self::Stop;

    /// Relay parent reported by the wrapped message; `Stop` carries none.
    fn relay_parent(&self) -> Option<Hash> {
        if let Self::BitfieldSigning(inner) = self {
            inner.relay_parent()
        } else {
            None
        }
    }
}
/// Narrows an `AllMessages` value down to the single variant this job accepts.
impl TryFrom<AllMessages> for ToJob {
    type Error = ();

    /// Maps `AllMessages::BitfieldSigning` to `ToJob::BitfieldSigning`; any other
    /// variant is rejected with `Err(())`.
    fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
        if let AllMessages::BitfieldSigning(inner) = msg {
            Ok(Self::BitfieldSigning(inner))
        } else {
            Err(())
        }
    }
}
impl From<BitfieldSigningMessage> for ToJob {
fn from(bsm: BitfieldSigningMessage) -> ToJob {
ToJob::BitfieldSigning(bsm)
}
}
/// Outbound messages a `BitfieldSigningJob` may emit.
///
/// Each variant wraps the message type named in its payload.
#[allow(missing_docs)]
#[derive(Debug, derive_more::From)]
pub enum FromJob {
    // Carries an `AvailabilityStoreMessage`.
    AvailabilityStore(AvailabilityStoreMessage),
    // Carries a `BitfieldDistributionMessage`.
    BitfieldDistribution(BitfieldDistributionMessage),
    // Carries a `CandidateBackingMessage`.
    CandidateBacking(CandidateBackingMessage),
    // Carries a `RuntimeApiMessage`.
    RuntimeApi(RuntimeApiMessage),
}
/// Turns every `FromJob` into a `FromJobCommand::SendMessage` holding the matching
/// `AllMessages` variant.
impl From<FromJob> for FromJobCommand {
    fn from(from_job: FromJob) -> FromJobCommand {
        // Pick the `AllMessages` variant first, then wrap it once.
        let outgoing = match from_job {
            FromJob::AvailabilityStore(inner) => AllMessages::AvailabilityStore(inner),
            FromJob::BitfieldDistribution(inner) => AllMessages::BitfieldDistribution(inner),
            FromJob::CandidateBacking(inner) => AllMessages::CandidateBacking(inner),
            FromJob::RuntimeApi(inner) => AllMessages::RuntimeApi(inner),
        };
        FromJobCommand::SendMessage(outgoing)
    }
}
impl TryFrom<AllMessages> for FromJob {
type Error = ();
fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
match msg {
AllMessages::AvailabilityStore(asm) => Ok(Self::AvailabilityStore(asm)),
AllMessages::BitfieldDistribution(bdm) => Ok(Self::BitfieldDistribution(bdm)),
AllMessages::CandidateBacking(cbm) => Ok(Self::CandidateBacking(cbm)),
AllMessages::RuntimeApi(ram) => Ok(Self::RuntimeApi(ram)),
_ => Err(()),
}
}
}
/// Errors we may encounter in the course of executing the `BitfieldSigningSubsystem`.
#[derive(Debug, Error)]
pub enum Error {
......@@ -145,7 +74,7 @@ async fn get_core_availability(
relay_parent: Hash,
core: CoreState,
validator_idx: ValidatorIndex,
sender: &Mutex<&mut mpsc::Sender<FromJob>>,
sender: &Mutex<&mut mpsc::Sender<FromJobCommand>>,
) -> Result<bool, Error> {
if let CoreState::Occupied(core) = core {
let (tx, rx) = oneshot::channel();
......@@ -153,10 +82,10 @@ async fn get_core_availability(
.lock()
.await
.send(
RuntimeApiMessage::Request(
AllMessages::from(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::CandidatePendingAvailability(core.para_id, tx),
).into(),
)).into(),
)
.await?;
......@@ -174,11 +103,11 @@ async fn get_core_availability(
.lock()
.await
.send(
AvailabilityStoreMessage::QueryChunkAvailability(
AllMessages::from(AvailabilityStoreMessage::QueryChunkAvailability(
committed_candidate_receipt.hash(),
validator_idx,
tx,
).into(),
)).into(),
)
.await?;
return rx.await.map_err(Into::into);
......@@ -188,9 +117,14 @@ async fn get_core_availability(
}
/// delegates to the v1 runtime API
async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender<FromJob>) -> Result<Vec<CoreState>, Error> {
async fn get_availability_cores(
relay_parent: Hash,
sender: &mut mpsc::Sender<FromJobCommand>,
) -> Result<Vec<CoreState>, Error> {
let (tx, rx) = oneshot::channel();
sender.send(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::AvailabilityCores(tx)).into()).await?;
sender
.send(AllMessages::from(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::AvailabilityCores(tx))).into())
.await?;
match rx.await {
Ok(Ok(out)) => Ok(out),
Ok(Err(runtime_err)) => Err(runtime_err.into()),
......@@ -206,7 +140,7 @@ async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender<Fr
async fn construct_availability_bitfield(
relay_parent: Hash,
validator_idx: ValidatorIndex,
sender: &mut mpsc::Sender<FromJob>,
sender: &mut mpsc::Sender<FromJobCommand>,
) -> Result<AvailabilityBitfield, Error> {
// get the set of availability cores from the runtime
let availability_cores = get_availability_cores(relay_parent, sender).await?;
......@@ -275,8 +209,7 @@ impl metrics::Metrics for Metrics {
}
impl JobTrait for BitfieldSigningJob {
type ToJob = ToJob;
type FromJob = FromJob;
type ToJob = BitfieldSigningMessage;
type Error = Error;
type RunArgs = SyncCryptoStorePtr;
type Metrics = Metrics;
......@@ -289,8 +222,8 @@ impl JobTrait for BitfieldSigningJob {
relay_parent: Hash,
keystore: Self::RunArgs,
metrics: Self::Metrics,
_receiver: mpsc::Receiver<ToJob>,
mut sender: mpsc::Sender<FromJob>,
_receiver: mpsc::Receiver<BitfieldSigningMessage>,
mut sender: mpsc::Sender<FromJobCommand>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let metrics = metrics.clone();
async move {
......@@ -330,7 +263,11 @@ impl JobTrait for BitfieldSigningJob {
metrics.on_bitfield_signed();
sender
.send(BitfieldDistributionMessage::DistributeBitfield(relay_parent, signed_bitfield).into())
.send(
AllMessages::from(
BitfieldDistributionMessage::DistributeBitfield(relay_parent, signed_bitfield),
).into(),
)
.await
.map_err(Into::into)
}
......@@ -345,8 +282,7 @@ pub type BitfieldSigningSubsystem<Spawner, Context> = JobManager<Spawner, Contex
mod tests {
use super::*;
use futures::{pin_mut, executor::block_on};
use polkadot_primitives::v1::{OccupiedCore};
use FromJob::*;
use polkadot_primitives::v1::OccupiedCore;
fn occupied_core(para_id: u32) -> CoreState {
CoreState::Occupied(OccupiedCore {
......@@ -373,12 +309,18 @@ mod tests {
loop {
futures::select! {
m = receiver.next() => match m.unwrap() {
RuntimeApi(RuntimeApiMessage::Request(rp, RuntimeApiRequest::AvailabilityCores(tx))) => {
FromJobCommand::SendMessage(
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(rp, RuntimeApiRequest::AvailabilityCores(tx)),
),
) => {
assert_eq!(relay_parent, rp);
tx.send(Ok(vec![CoreState::Free, occupied_core(1), occupied_core(2)])).unwrap();
},
RuntimeApi(
RuntimeApiMessage::Request(rp, RuntimeApiRequest::CandidatePendingAvailability(para_id, tx))
FromJobCommand::SendMessage(
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(rp, RuntimeApiRequest::CandidatePendingAvailability(para_id, tx)),
),
) => {
assert_eq!(relay_parent, rp);
......@@ -388,7 +330,11 @@ mod tests {
tx.send(Ok(None)).unwrap();
}
},
AvailabilityStore(AvailabilityStoreMessage::QueryChunkAvailability(_, vidx, tx)) => {
FromJobCommand::SendMessage(
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryChunkAvailability(_, vidx, tx),
),
) => {
assert_eq!(validator_index, vidx);
tx.send(true).unwrap();
......
......@@ -30,86 +30,21 @@ use polkadot_node_subsystem::{
},
};
use polkadot_node_subsystem_util::{
self as util, delegated_subsystem, JobTrait, ToJobTrait, FromJobCommand,
metrics::{self, prometheus},
self as util, delegated_subsystem, JobTrait, FromJobCommand, metrics::{self, prometheus},
};
use polkadot_primitives::v1::{CandidateReceipt, CollatorId, Hash, Id as ParaId, PoV};
use std::{convert::TryFrom, pin::Pin};
use std::pin::Pin;
use thiserror::Error;
const LOG_TARGET: &'static str = "candidate_selection";
struct CandidateSelectionJob {
sender: mpsc::Sender<FromJob>,
receiver: mpsc::Receiver<ToJob>,
sender: mpsc::Sender<FromJobCommand>,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
metrics: Metrics,
seconded_candidate: Option<CollatorId>,
}
/// Inbound messages a `CandidateSelectionJob` is prepared to receive.
#[derive(Debug)]
pub enum ToJob {
    /// Wraps a `CandidateSelectionMessage`, the main input to the job.
    CandidateSelection(CandidateSelectionMessage),
    /// Indicates that the job should shut itself down.
    Stop,
}
impl ToJobTrait for ToJob {
const STOP: Self = Self::Stop;
fn relay_parent(&self) -> Option<Hash> {
match self {
Self::CandidateSelection(csm) => csm.relay_parent(),
Self::Stop => None,
}
}
}
impl TryFrom<AllMessages> for ToJob {
type Error = ();
fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
match msg {
AllMessages::CandidateSelection(csm) => Ok(Self::CandidateSelection(csm)),
_ => Err(()),
}
}
}
impl From<CandidateSelectionMessage> for ToJob {
fn from(csm: CandidateSelectionMessage) -> Self {
Self::CandidateSelection(csm)
}
}
/// Outbound messages emitted by a `CandidateSelectionJob`.
#[derive(Debug)]
enum FromJob {
    /// Carries a `CandidateBackingMessage`.
    Backing(CandidateBackingMessage),
    /// Carries a `CollatorProtocolMessage`.
    Collator(CollatorProtocolMessage),
}
/// Turns every `FromJob` into a `FromJobCommand::SendMessage` holding the matching
/// `AllMessages` variant.
impl From<FromJob> for FromJobCommand {
    fn from(from_job: FromJob) -> FromJobCommand {
        // Pick the `AllMessages` variant first, then wrap it once.
        let outgoing = match from_job {
            FromJob::Backing(inner) => AllMessages::CandidateBacking(inner),
            FromJob::Collator(inner) => AllMessages::CollatorProtocol(inner),
        };
        FromJobCommand::SendMessage(outgoing)
    }
}
/// Accepts only the `AllMessages` variants that have a `FromJob` counterpart.
impl TryFrom<AllMessages> for FromJob {
    type Error = ();

    /// Returns the corresponding `FromJob` variant, or `Err(())` for any `AllMessages`
    /// variant without one.
    fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
        let converted = match msg {
            AllMessages::CandidateBacking(inner) => Self::Backing(inner),
            AllMessages::CollatorProtocol(inner) => Self::Collator(inner),
            _ => return Err(()),
        };
        Ok(converted)
    }
}
#[derive(Debug, Error)]
enum Error {
#[error(transparent)]
......@@ -123,40 +58,32 @@ enum Error {
}
impl JobTrait for CandidateSelectionJob {
type ToJob = ToJob;
type FromJob = FromJob;
type ToJob = CandidateSelectionMessage;
type Error = Error;
type RunArgs = ();
type Metrics = Metrics;
const NAME: &'static str = "CandidateSelectionJob";
/// Run a job for the parent block indicated
//
// this function is in charge of creating and executing the job's main loop
#[tracing::instrument(skip(_relay_parent, _run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
_relay_parent: Hash,
_run_args: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<ToJob>,
sender: mpsc::Sender<FromJob>,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
sender: mpsc::Sender<FromJobCommand>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
Box::pin(async move {