Commit a5639e36 authored by Peter Goodspeed-Niklaus's avatar Peter Goodspeed-Niklaus
Browse files

reenvision the subsystem requests as an extension trait

This works within `util.rs`, but fails in `core/backing/src/lib.rs`,
because we don't actually create the struct soon enough. Continuing
down this path would imply substantial rewriting.
parent 802981a9
...@@ -43,13 +43,12 @@ use polkadot_subsystem::{ ...@@ -43,13 +43,12 @@ use polkadot_subsystem::{
messages::{ messages::{
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage, AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData, CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData,
ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed, ProvisionerMessage, RuntimeApiMessage, RuntimeApiRequest, StatementDistributionMessage, ValidationFailed,
}, },
util::{ util::{
self, self,
request_signing_context, JobTrait,
request_validator_groups, JobTraitExt,
request_validators,
Validator, Validator,
}, },
}; };
...@@ -198,23 +197,6 @@ impl From<FromJob> for AllMessages { ...@@ -198,23 +197,6 @@ impl From<FromJob> for AllMessages {
} }
} }
impl TryFrom<AllMessages> for FromJob {
type Error = &'static str;
fn try_from(f: AllMessages) -> Result<Self, Self::Error> {
match f {
AllMessages::AvailabilityStore(msg) => Ok(FromJob::AvailabilityStore(msg)),
AllMessages::RuntimeApi(msg) => Ok(FromJob::RuntimeApiMessage(msg)),
AllMessages::CandidateValidation(msg) => Ok(FromJob::CandidateValidation(msg)),
AllMessages::CandidateSelection(msg) => Ok(FromJob::CandidateSelection(msg)),
AllMessages::StatementDistribution(msg) => Ok(FromJob::StatementDistribution(msg)),
AllMessages::PoVDistribution(msg) => Ok(FromJob::PoVDistribution(msg)),
AllMessages::Provisioner(msg) => Ok(FromJob::Provisioner(msg)),
_ => Err("can't convert this AllMessages variant to FromJob"),
}
}
}
// It looks like it's not possible to do an `impl From` given the current state of // It looks like it's not possible to do an `impl From` given the current state of
// the code. So this does the necessary conversion. // the code. So this does the necessary conversion.
fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement { fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
...@@ -547,38 +529,6 @@ impl CandidateBackingJob { ...@@ -547,38 +529,6 @@ impl CandidateBackingJob {
Ok(()) Ok(())
} }
async fn request_pov_from_distribution(
&mut self,
descriptor: CandidateDescriptor,
) -> Result<Arc<PoV>, Error> {
let (tx, rx) = oneshot::channel();
self.tx_from.send(FromJob::PoVDistribution(
PoVDistributionMessage::FetchPoV(self.parent, descriptor, tx)
)).await?;
Ok(rx.await?)
}
async fn request_candidate_validation(
&mut self,
candidate: CandidateDescriptor,
pov: Arc<PoV>,
) -> Result<ValidationResult, Error> {
let (tx, rx) = oneshot::channel();
self.tx_from.send(FromJob::CandidateValidation(
CandidateValidationMessage::ValidateFromChainState(
candidate,
pov,
tx,
)
)
).await?;
Ok(rx.await??)
}
async fn store_chunk( async fn store_chunk(
&mut self, &mut self,
id: ValidatorIndex, id: ValidatorIndex,
...@@ -656,7 +606,7 @@ impl CandidateBackingJob { ...@@ -656,7 +606,7 @@ impl CandidateBackingJob {
} }
} }
impl util::JobTrait for CandidateBackingJob { impl JobTrait for CandidateBackingJob {
type ToJob = ToJob; type ToJob = ToJob;
type FromJob = FromJob; type FromJob = FromJob;
type Error = Error; type Error = Error;
...@@ -672,9 +622,9 @@ impl util::JobTrait for CandidateBackingJob { ...@@ -672,9 +622,9 @@ impl util::JobTrait for CandidateBackingJob {
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> { ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move { async move {
let (validators, roster, signing_context) = futures::try_join!( let (validators, roster, signing_context) = futures::try_join!(
request_validators(parent, &mut tx_from).await?, self.request_validators().await?,
request_validator_groups(parent, &mut tx_from).await?, self.request_validator_groups().await?,
request_signing_context(parent, &mut tx_from).await?, self.request_signing_context().await?,
)?; )?;
let validator = Validator::construct(&validators, signing_context, keystore.clone())?; let validator = Validator::construct(&validators, signing_context, keystore.clone())?;
...@@ -724,6 +674,16 @@ impl util::JobTrait for CandidateBackingJob { ...@@ -724,6 +674,16 @@ impl util::JobTrait for CandidateBackingJob {
} }
} }
impl JobTraitExt for CandidateBackingJob {
fn sender(&self) -> mpsc::Sender<Self::FromJob> {
self.tx_from.clone()
}
fn relay_parent(&self) -> Hash {
self.parent
}
}
/// An implementation of the Candidate Backing subsystem. /// An implementation of the Candidate Backing subsystem.
pub type CandidateBackingSubsystem<Spawner, Context> = pub type CandidateBackingSubsystem<Spawner, Context> =
util::JobManager<Spawner, Context, CandidateBackingJob>; util::JobManager<Spawner, Context, CandidateBackingJob>;
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
//! this module. //! this module.
use crate::{ use crate::{
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, SchedulerRoster}, messages::{self, AllMessages, SchedulerRoster},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult, FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
}; };
use futures::{ use futures::{
...@@ -36,9 +36,10 @@ use futures_timer::Delay; ...@@ -36,9 +36,10 @@ use futures_timer::Delay;
use keystore::KeyStorePtr; use keystore::KeyStorePtr;
use parity_scale_codec::Encode; use parity_scale_codec::Encode;
use pin_project::{pin_project, pinned_drop}; use pin_project::{pin_project, pinned_drop};
use polkadot_node_primitives::ValidationResult;
use polkadot_primitives::v1::{ use polkadot_primitives::v1::{
EncodeAs, Hash, HeadData, Id as ParaId, Signed, SigningContext, CandidateDescriptor, EncodeAs, Hash, PoV, Signed, SigningContext, ValidatorId, ValidatorIndex,
ValidatorId, ValidatorIndex, ValidatorPair, ValidatorPair,
}; };
use sp_core::Pair; use sp_core::Pair;
use std::{ use std::{
...@@ -46,6 +47,7 @@ use std::{ ...@@ -46,6 +47,7 @@ use std::{
convert::{TryFrom, TryInto}, convert::{TryFrom, TryInto},
marker::Unpin, marker::Unpin,
pin::Pin, pin::Pin,
sync::Arc,
time::Duration, time::Duration,
}; };
use streamunordered::{StreamUnordered, StreamYield}; use streamunordered::{StreamUnordered, StreamYield};
...@@ -67,6 +69,12 @@ pub enum Error { ...@@ -67,6 +69,12 @@ pub enum Error {
/// Attempted to spawn a new task, and failed /// Attempted to spawn a new task, and failed
#[from] #[from]
Spawn(SpawnError), Spawn(SpawnError),
/// Failed to determine whether a candidate was valid or invalid
#[from]
ValidationFailed(messages::ValidationFailed),
/// Failed to determine whether a candidate was valid or invalid, in a different way
#[from]
ValidationResult(ValidationResult),
/// Attempted to convert from an AllMessages to a FromJob, and failed. /// Attempted to convert from an AllMessages to a FromJob, and failed.
SenderConversion(String), SenderConversion(String),
/// The local node is not a validator. /// The local node is not a validator.
...@@ -75,79 +83,6 @@ pub enum Error { ...@@ -75,79 +83,6 @@ pub enum Error {
JobNotFound(Hash), JobNotFound(Hash),
} }
/// Request some data from the `RuntimeApi`.
pub async fn request_from_runtime<RequestBuilder, Response, FromJob>(
parent: Hash,
sender: &mut mpsc::Sender<FromJob>,
request_builder: RequestBuilder,
) -> Result<oneshot::Receiver<Response>, Error>
where
RequestBuilder: FnOnce(oneshot::Sender<Response>) -> RuntimeApiRequest,
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
let (tx, rx) = oneshot::channel();
sender
.send(
AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
.try_into()
.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
)
.await?;
Ok(rx)
}
/// Request a validator set from the `RuntimeApi`.
pub async fn request_validators<FromJob>(
parent: Hash,
s: &mut mpsc::Sender<FromJob>,
) -> Result<oneshot::Receiver<Vec<ValidatorId>>, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
request_from_runtime(parent, s, |tx| RuntimeApiRequest::Validators(tx)).await
}
/// Request the scheduler roster from `RuntimeApi`.
pub async fn request_validator_groups<FromJob>(
parent: Hash,
s: &mut mpsc::Sender<FromJob>,
) -> Result<oneshot::Receiver<SchedulerRoster>, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
request_from_runtime(parent, s, |tx| RuntimeApiRequest::ValidatorGroups(tx)).await
}
/// Request a `SigningContext` from the `RuntimeApi`.
pub async fn request_signing_context<FromJob>(
parent: Hash,
s: &mut mpsc::Sender<FromJob>,
) -> Result<oneshot::Receiver<SigningContext>, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
request_from_runtime(parent, s, |tx| RuntimeApiRequest::SigningContext(tx)).await
}
/// Request `HeadData` for some `ParaId` from `RuntimeApi`.
pub async fn request_head_data<FromJob>(
parent: Hash,
s: &mut mpsc::Sender<FromJob>,
id: ParaId,
) -> Result<oneshot::Receiver<HeadData>, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
request_from_runtime(parent, s, |tx| RuntimeApiRequest::HeadData(id, tx)).await
}
/// From the given set of validators, find the first key we can sign with, if any. /// From the given set of validators, find the first key we can sign with, if any.
pub fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<ValidatorPair> { pub fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<ValidatorPair> {
let keystore = keystore.read(); let keystore = keystore.read();
...@@ -168,29 +103,19 @@ pub struct Validator { ...@@ -168,29 +103,19 @@ pub struct Validator {
impl Validator { impl Validator {
/// Get a struct representing this node's validator if this node is in fact a validator in the context of the given block. /// Get a struct representing this node's validator if this node is in fact a validator in the context of the given block.
pub async fn new<FromJob>( pub async fn new<Job: SubsystemRequests>(
parent: Hash, job: Job,
keystore: KeyStorePtr, keystore: KeyStorePtr,
mut sender: mpsc::Sender<FromJob>, ) -> Result<Self, Error> {
) -> Result<Self, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
// Note: request_validators and request_signing_context do not and cannot run concurrently: they both
// have a mutable handle to the same sender.
// However, each of them returns a oneshot::Receiver, and those are resolved concurrently.
let (validators, signing_context) = futures::try_join!( let (validators, signing_context) = futures::try_join!(
request_validators(parent, &mut sender).await?, job.request_validators().await?,
request_signing_context(parent, &mut sender).await?, job.request_signing_context().await?,
)?; )?;
Self::construct(&validators, signing_context, keystore) Self::construct(&validators, signing_context, keystore)
} }
/// Construct a validator instance without performing runtime fetches. /// Construct a validator instance
///
/// This can be useful if external code also needs the same data.
pub fn construct( pub fn construct(
validators: &[ValidatorId], validators: &[ValidatorId],
signing_context: SigningContext, signing_context: SigningContext,
...@@ -342,6 +267,191 @@ pub trait JobTrait: Unpin { ...@@ -342,6 +267,191 @@ pub trait JobTrait: Unpin {
} }
} }
/// This trait provides a helper abstraction for sending a message to another subsystem
/// and collecting their response.
pub trait JobTraitExt: JobTrait {
/// Get a clone of the sender to the overseer.
fn sender(&self) -> mpsc::Sender<Self::FromJob>;
/// Get the relay parent for this job.
fn relay_parent(&self) -> Hash;
/// Request some data from another subsystem via the Overseer.
///
/// The arguments to `request_builder` are `parent_hash, response_sender`.
fn request<RequestBuilder, Response>(
&self,
request_builder: RequestBuilder,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<Response>, Error>> + Send>>
where
RequestBuilder: 'static + Send + FnOnce(Hash, oneshot::Sender<Response>) -> Self::FromJob,
Response: Send,
{
let mut sender = self.sender();
let parent = self.relay_parent();
async move {
let (tx, rx) = oneshot::channel();
sender.send(request_builder(parent, tx)).await?;
Ok(rx)
}
.boxed()
}
}
/// This trait enables a blanket impl of several useful getters.
///
/// The blanket impl takes effect for any job for which `Job: JobTraitExt` and
/// `Job::FromJob: TryFrom<AllMessages>`.
///
/// It's distinct from JobTraitExt because it may be useful to `impl JobTraitExt`
/// for some `Job` for which `Job::FromJob: !TryFrom<AllMessages>`, so we don't want
/// to tie this too tightly to that trait.
pub trait SubsystemRequests: JobTraitExt {
/// Request some data from another subsystem via the Overseer.
///
/// The arguments to `request_builder` are `parent_hash, response_sender`.
///
/// The difference between this methods and `self.request` is the return type of
/// `request_builder`: it returns an `AllMessages` instance instead of `Self::FromJob`.
fn request_allmessages<RequestBuilder, Response>(
&self,
request_builder: RequestBuilder,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<Response>, Error>> + Send>>
where
RequestBuilder: 'static + Send + FnOnce(Hash, oneshot::Sender<Response>) -> AllMessages,
Response: Send;
/// Request that a particular candidate is validated by `CandidateValidation`.
fn request_candidate_validation(
&self,
candidate: CandidateDescriptor,
pov: Arc<PoV>,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<ValidationResult>, Error>>>>;
/// Request a PoV from `PoVDistribution`
fn request_pov(
&self,
candidate: CandidateDescriptor,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<Arc<PoV>>, Error>>>>;
/// Request the current scheduler roster from the `RuntimeApi`.
fn request_scheduler_roster(
&self,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<SchedulerRoster>, Error>>>>;
/// Request the current signing context from the `RuntimeApi`.
fn request_signing_context(
&self,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<SigningContext>, Error>>>>;
/// Request the current validator set from the `RuntimeApi`.
fn request_validators(
&self,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<Vec<ValidatorId>>, Error>>>>;
}
// We do not and cannot know the exact type of Job::FromJob, and it would be tedious for extension in the future
// to write a `FromJobTrait` which produced all necessary variants. Instead, we bound `Job::FromJob: TryFrom<AllMessages>`
// so that we have a path to send the appropriate (limited) message through the job's outbound channel.
//
// This isn't an ideal API, as invalid requests become runtime errors instead of compile-time errors,
// but there isn't a better API apparent as of right now.
impl<Job> SubsystemRequests for Job
where
Job: JobTraitExt,
Job::FromJob: TryFrom<AllMessages>,
<Job::FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
fn request_allmessages<RequestBuilder, Response>(
&self,
request_builder: RequestBuilder,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<Response>, Error>> + Send>>
where
RequestBuilder: 'static + Send + FnOnce(Hash, oneshot::Sender<Response>) -> AllMessages,
Response: Send,
{
// although it would be cleaner to delegate to self.request(), doing so doesn't give us any good way
// to return an error in the event of conversion failure, so we just have to copy the implementation
// instead.
let mut sender = self.sender();
let parent = self.relay_parent();
async move {
let (tx, rx) = oneshot::channel();
let msg = request_builder(parent, tx).try_into().map_err(|err| {
Error::SenderConversion(format!("could not construct AllMessages: {:?}", err))
})?;
sender.send(msg).await?;
Ok(rx)
}
.boxed()
}
fn request_candidate_validation(
&self,
candidate: CandidateDescriptor,
pov: Arc<PoV>,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<ValidationResult>, Error>>>> {
use messages::CandidateValidationMessage::ValidateFromChainState;
use AllMessages::CandidateValidation;
let (tx, rx) = oneshot::channel();
let ram = self.request_allmessages(move |_parent, tx| {
CandidateValidation(ValidateFromChainState(candidate, pov, tx))
});
async move {
let result = ram.await?.await??;
tx.send(result)?;
Ok(rx)
}.boxed()
}
fn request_pov(
&self,
descriptor: CandidateDescriptor,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<Arc<PoV>>, Error>>>> {
use messages::PoVDistributionMessage::FetchPoV;
use AllMessages::PoVDistribution;
self.request_allmessages(move |parent, tx| {
PoVDistribution(FetchPoV(parent, descriptor, tx))
})
}
fn request_scheduler_roster(
&self,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<SchedulerRoster>, Error>>>> {
unimplemented!()
}
fn request_signing_context(
&self,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<SigningContext>, Error>>>> {
use messages::RuntimeApiMessage::Request;
use messages::RuntimeApiRequest::SigningContext;
use AllMessages::RuntimeApi;
self
.request_allmessages(move |parent, tx| RuntimeApi(Request(parent, SigningContext(tx))))
}
fn request_validators(
&self,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<Vec<ValidatorId>>, Error>>>> {
unimplemented!()
}
}
/// Jobs manager for a subsystem /// Jobs manager for a subsystem
/// ///
/// - Spawns new jobs for a given relay-parent on demand. /// - Spawns new jobs for a given relay-parent on demand.
...@@ -451,13 +561,12 @@ where ...@@ -451,13 +561,12 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> {
// pin-project the outgoing messages // pin-project the outgoing messages
self.project() self.project().outgoing_msgs.poll_next(cx).map(|opt| {
.outgoing_msgs opt.and_then(|(stream_yield, _)| match stream_yield {
.poll_next(cx)
.map(|opt| opt.and_then(|(stream_yield, _)| match stream_yield {
StreamYield::Item(msg) => Some(msg), StreamYield::Item(msg) => Some(msg),
StreamYield::Finished(_) => None, StreamYield::Finished(_) => None,
})) })
})
} }
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment