Commit e7f5276e authored by Peter Goodspeed-Niklaus's avatar Peter Goodspeed-Niklaus
Browse files

Revert "reenvision the subsystem requests as an extension trait"

This reverts commit a5639e36.

The fact is, the new API is more complicated to no real benefit.
parent a5639e36
Pipeline #100314 passed with stages
in 13 minutes and 6 seconds
......@@ -43,12 +43,13 @@ use polkadot_subsystem::{
messages::{
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData,
ProvisionerMessage, RuntimeApiMessage, RuntimeApiRequest, StatementDistributionMessage, ValidationFailed,
ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed,
},
util::{
self,
JobTrait,
JobTraitExt,
request_signing_context,
request_validator_groups,
request_validators,
Validator,
},
};
......@@ -197,6 +198,23 @@ impl From<FromJob> for AllMessages {
}
}
impl TryFrom<AllMessages> for FromJob {
type Error = &'static str;
fn try_from(f: AllMessages) -> Result<Self, Self::Error> {
match f {
AllMessages::AvailabilityStore(msg) => Ok(FromJob::AvailabilityStore(msg)),
AllMessages::RuntimeApi(msg) => Ok(FromJob::RuntimeApiMessage(msg)),
AllMessages::CandidateValidation(msg) => Ok(FromJob::CandidateValidation(msg)),
AllMessages::CandidateSelection(msg) => Ok(FromJob::CandidateSelection(msg)),
AllMessages::StatementDistribution(msg) => Ok(FromJob::StatementDistribution(msg)),
AllMessages::PoVDistribution(msg) => Ok(FromJob::PoVDistribution(msg)),
AllMessages::Provisioner(msg) => Ok(FromJob::Provisioner(msg)),
_ => Err("can't convert this AllMessages variant to FromJob"),
}
}
}
// It looks like it's not possible to do an `impl From` given the current state of
// the code. So this does the necessary conversion.
fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
......@@ -529,6 +547,38 @@ impl CandidateBackingJob {
Ok(())
}
async fn request_pov_from_distribution(
&mut self,
descriptor: CandidateDescriptor,
) -> Result<Arc<PoV>, Error> {
let (tx, rx) = oneshot::channel();
self.tx_from.send(FromJob::PoVDistribution(
PoVDistributionMessage::FetchPoV(self.parent, descriptor, tx)
)).await?;
Ok(rx.await?)
}
async fn request_candidate_validation(
&mut self,
candidate: CandidateDescriptor,
pov: Arc<PoV>,
) -> Result<ValidationResult, Error> {
let (tx, rx) = oneshot::channel();
self.tx_from.send(FromJob::CandidateValidation(
CandidateValidationMessage::ValidateFromChainState(
candidate,
pov,
tx,
)
)
).await?;
Ok(rx.await??)
}
async fn store_chunk(
&mut self,
id: ValidatorIndex,
......@@ -606,7 +656,7 @@ impl CandidateBackingJob {
}
}
impl JobTrait for CandidateBackingJob {
impl util::JobTrait for CandidateBackingJob {
type ToJob = ToJob;
type FromJob = FromJob;
type Error = Error;
......@@ -622,9 +672,9 @@ impl JobTrait for CandidateBackingJob {
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
let (validators, roster, signing_context) = futures::try_join!(
self.request_validators().await?,
self.request_validator_groups().await?,
self.request_signing_context().await?,
request_validators(parent, &mut tx_from).await?,
request_validator_groups(parent, &mut tx_from).await?,
request_signing_context(parent, &mut tx_from).await?,
)?;
let validator = Validator::construct(&validators, signing_context, keystore.clone())?;
......@@ -674,16 +724,6 @@ impl JobTrait for CandidateBackingJob {
}
}
impl JobTraitExt for CandidateBackingJob {
fn sender(&self) -> mpsc::Sender<Self::FromJob> {
self.tx_from.clone()
}
fn relay_parent(&self) -> Hash {
self.parent
}
}
/// An implementation of the Candidate Backing subsystem.
pub type CandidateBackingSubsystem<Spawner, Context> =
util::JobManager<Spawner, Context, CandidateBackingJob>;
......
......@@ -21,7 +21,7 @@
//! this module.
use crate::{
messages::{self, AllMessages, SchedulerRoster},
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, SchedulerRoster},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
};
use futures::{
......@@ -36,10 +36,9 @@ use futures_timer::Delay;
use keystore::KeyStorePtr;
use parity_scale_codec::Encode;
use pin_project::{pin_project, pinned_drop};
use polkadot_node_primitives::ValidationResult;
use polkadot_primitives::v1::{
CandidateDescriptor, EncodeAs, Hash, PoV, Signed, SigningContext, ValidatorId, ValidatorIndex,
ValidatorPair,
EncodeAs, Hash, HeadData, Id as ParaId, Signed, SigningContext,
ValidatorId, ValidatorIndex, ValidatorPair,
};
use sp_core::Pair;
use std::{
......@@ -47,7 +46,6 @@ use std::{
convert::{TryFrom, TryInto},
marker::Unpin,
pin::Pin,
sync::Arc,
time::Duration,
};
use streamunordered::{StreamUnordered, StreamYield};
......@@ -69,12 +67,6 @@ pub enum Error {
/// Attempted to spawn a new task, and failed
#[from]
Spawn(SpawnError),
/// Failed to determine whether a candidate was valid or invalid
#[from]
ValidationFailed(messages::ValidationFailed),
/// Failed to determine whether a candidate was valid or invalid, in a different way
#[from]
ValidationResult(ValidationResult),
/// Attempted to convert from an AllMessages to a FromJob, and failed.
SenderConversion(String),
/// The local node is not a validator.
......@@ -83,6 +75,79 @@ pub enum Error {
JobNotFound(Hash),
}
/// Request some data from the `RuntimeApi`.
pub async fn request_from_runtime<RequestBuilder, Response, FromJob>(
parent: Hash,
sender: &mut mpsc::Sender<FromJob>,
request_builder: RequestBuilder,
) -> Result<oneshot::Receiver<Response>, Error>
where
RequestBuilder: FnOnce(oneshot::Sender<Response>) -> RuntimeApiRequest,
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
let (tx, rx) = oneshot::channel();
sender
.send(
AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
.try_into()
.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
)
.await?;
Ok(rx)
}
/// Request a validator set from the `RuntimeApi`.
pub async fn request_validators<FromJob>(
parent: Hash,
s: &mut mpsc::Sender<FromJob>,
) -> Result<oneshot::Receiver<Vec<ValidatorId>>, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
request_from_runtime(parent, s, |tx| RuntimeApiRequest::Validators(tx)).await
}
/// Request the scheduler roster from `RuntimeApi`.
pub async fn request_validator_groups<FromJob>(
parent: Hash,
s: &mut mpsc::Sender<FromJob>,
) -> Result<oneshot::Receiver<SchedulerRoster>, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
request_from_runtime(parent, s, |tx| RuntimeApiRequest::ValidatorGroups(tx)).await
}
/// Request a `SigningContext` from the `RuntimeApi`.
pub async fn request_signing_context<FromJob>(
parent: Hash,
s: &mut mpsc::Sender<FromJob>,
) -> Result<oneshot::Receiver<SigningContext>, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
request_from_runtime(parent, s, |tx| RuntimeApiRequest::SigningContext(tx)).await
}
/// Request `HeadData` for some `ParaId` from `RuntimeApi`.
pub async fn request_head_data<FromJob>(
parent: Hash,
s: &mut mpsc::Sender<FromJob>,
id: ParaId,
) -> Result<oneshot::Receiver<HeadData>, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
request_from_runtime(parent, s, |tx| RuntimeApiRequest::HeadData(id, tx)).await
}
/// From the given set of validators, find the first key we can sign with, if any.
pub fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<ValidatorPair> {
let keystore = keystore.read();
......@@ -103,19 +168,29 @@ pub struct Validator {
impl Validator {
/// Get a struct representing this node's validator if this node is in fact a validator in the context of the given block.
pub async fn new<Job: SubsystemRequests>(
job: Job,
pub async fn new<FromJob>(
parent: Hash,
keystore: KeyStorePtr,
) -> Result<Self, Error> {
mut sender: mpsc::Sender<FromJob>,
) -> Result<Self, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
// Note: request_validators and request_signing_context do not and cannot run concurrently: they both
// have a mutable handle to the same sender.
// However, each of them returns a oneshot::Receiver, and those are resolved concurrently.
let (validators, signing_context) = futures::try_join!(
job.request_validators().await?,
job.request_signing_context().await?,
request_validators(parent, &mut sender).await?,
request_signing_context(parent, &mut sender).await?,
)?;
Self::construct(&validators, signing_context, keystore)
}
/// Construct a validator instance
/// Construct a validator instance without performing runtime fetches.
///
/// This can be useful if external code also needs the same data.
pub fn construct(
validators: &[ValidatorId],
signing_context: SigningContext,
......@@ -267,191 +342,6 @@ pub trait JobTrait: Unpin {
}
}
/// This trait provides a helper abstraction for sending a message to another subsystem
/// and collecting their response.
pub trait JobTraitExt: JobTrait {
/// Get a clone of the sender to the overseer.
fn sender(&self) -> mpsc::Sender<Self::FromJob>;
/// Get the relay parent for this job.
fn relay_parent(&self) -> Hash;
/// Request some data from another subsystem via the Overseer.
///
/// The arguments to `request_builder` are `parent_hash, response_sender`.
fn request<RequestBuilder, Response>(
&self,
request_builder: RequestBuilder,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<Response>, Error>> + Send>>
where
RequestBuilder: 'static + Send + FnOnce(Hash, oneshot::Sender<Response>) -> Self::FromJob,
Response: Send,
{
let mut sender = self.sender();
let parent = self.relay_parent();
async move {
let (tx, rx) = oneshot::channel();
sender.send(request_builder(parent, tx)).await?;
Ok(rx)
}
.boxed()
}
}
/// This trait enables a blanket impl of several useful getters.
///
/// The blanket impl takes effect for any job for which `Job: JobTraitExt` and
/// `Job::FromJob: TryFrom<AllMessages>`.
///
/// It's distinct from JobTraitExt because it may be useful to `impl JobTraitExt`
/// for some `Job` for which `Job::FromJob: !TryFrom<AllMessages>`, so we don't want
/// to tie this too tightly to that trait.
pub trait SubsystemRequests: JobTraitExt {
/// Request some data from another subsystem via the Overseer.
///
/// The arguments to `request_builder` are `parent_hash, response_sender`.
///
/// The difference between this methods and `self.request` is the return type of
/// `request_builder`: it returns an `AllMessages` instance instead of `Self::FromJob`.
fn request_allmessages<RequestBuilder, Response>(
&self,
request_builder: RequestBuilder,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<Response>, Error>> + Send>>
where
RequestBuilder: 'static + Send + FnOnce(Hash, oneshot::Sender<Response>) -> AllMessages,
Response: Send;
/// Request that a particular candidate is validated by `CandidateValidation`.
fn request_candidate_validation(
&self,
candidate: CandidateDescriptor,
pov: Arc<PoV>,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<ValidationResult>, Error>>>>;
/// Request a PoV from `PoVDistribution`
fn request_pov(
&self,
candidate: CandidateDescriptor,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<Arc<PoV>>, Error>>>>;
/// Request the current scheduler roster from the `RuntimeApi`.
fn request_scheduler_roster(
&self,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<SchedulerRoster>, Error>>>>;
/// Request the current signing context from the `RuntimeApi`.
fn request_signing_context(
&self,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<SigningContext>, Error>>>>;
/// Request the current validator set from the `RuntimeApi`.
fn request_validators(
&self,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<Vec<ValidatorId>>, Error>>>>;
}
// We do not and cannot know the exact type of Job::FromJob, and it would be tedious for extension in the future
// to write a `FromJobTrait` which produced all necessary variants. Instead, we bound `Job::FromJob: TryFrom<AllMessages>`
// so that we have a path to send the appropriate (limited) message through the job's outbound channel.
//
// This isn't an ideal API, as invalid requests become runtime errors instead of compile-time errors,
// but there isn't a better API apparent as of right now.
impl<Job> SubsystemRequests for Job
where
Job: JobTraitExt,
Job::FromJob: TryFrom<AllMessages>,
<Job::FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
fn request_allmessages<RequestBuilder, Response>(
&self,
request_builder: RequestBuilder,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<Response>, Error>> + Send>>
where
RequestBuilder: 'static + Send + FnOnce(Hash, oneshot::Sender<Response>) -> AllMessages,
Response: Send,
{
// although it would be cleaner to delegate to self.request(), doing so doesn't give us any good way
// to return an error in the event of conversion failure, so we just have to copy the implementation
// instead.
let mut sender = self.sender();
let parent = self.relay_parent();
async move {
let (tx, rx) = oneshot::channel();
let msg = request_builder(parent, tx).try_into().map_err(|err| {
Error::SenderConversion(format!("could not construct AllMessages: {:?}", err))
})?;
sender.send(msg).await?;
Ok(rx)
}
.boxed()
}
fn request_candidate_validation(
&self,
candidate: CandidateDescriptor,
pov: Arc<PoV>,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<ValidationResult>, Error>>>> {
use messages::CandidateValidationMessage::ValidateFromChainState;
use AllMessages::CandidateValidation;
let (tx, rx) = oneshot::channel();
let ram = self.request_allmessages(move |_parent, tx| {
CandidateValidation(ValidateFromChainState(candidate, pov, tx))
});
async move {
let result = ram.await?.await??;
tx.send(result)?;
Ok(rx)
}.boxed()
}
fn request_pov(
&self,
descriptor: CandidateDescriptor,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<Arc<PoV>>, Error>>>> {
use messages::PoVDistributionMessage::FetchPoV;
use AllMessages::PoVDistribution;
self.request_allmessages(move |parent, tx| {
PoVDistribution(FetchPoV(parent, descriptor, tx))
})
}
fn request_scheduler_roster(
&self,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<SchedulerRoster>, Error>>>> {
unimplemented!()
}
fn request_signing_context(
&self,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<SigningContext>, Error>>>> {
use messages::RuntimeApiMessage::Request;
use messages::RuntimeApiRequest::SigningContext;
use AllMessages::RuntimeApi;
self
.request_allmessages(move |parent, tx| RuntimeApi(Request(parent, SigningContext(tx))))
}
fn request_validators(
&self,
) -> Pin<Box<dyn Future<Output = Result<oneshot::Receiver<Vec<ValidatorId>>, Error>>>> {
unimplemented!()
}
}
/// Jobs manager for a subsystem
///
/// - Spawns new jobs for a given relay-parent on demand.
......@@ -561,12 +451,13 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> {
// pin-project the outgoing messages
self.project().outgoing_msgs.poll_next(cx).map(|opt| {
opt.and_then(|(stream_yield, _)| match stream_yield {
self.project()
.outgoing_msgs
.poll_next(cx)
.map(|opt| opt.and_then(|(stream_yield, _)| match stream_yield {
StreamYield::Item(msg) => Some(msg),
StreamYield::Finished(_) => None,
})
})
}))
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment