// Copyright 2020 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! The provisioner is responsible for assembling a relay chain block //! from a set of available parachain candidates of its choice. use futures::{ channel::{mpsc, oneshot}, future::Either, prelude::*, select, stream::Stream, task, }; use polkadot_node_subsystem::{ messages::{AllMessages, ProvisionableData, ProvisionerInherentData, ProvisionerMessage}, util::{JobTrait, ToJobTrait}, }; use polkadot_primitives::v1::{ EncodeAs, Hash, HeadData, Id as ParaId, Signed, SigningContext, ValidatorId, ValidatorIndex, ValidatorPair, }; use std::{ convert::TryFrom, pin::Pin, }; pub struct ProvisioningJob { receiver: mpsc::Receiver, provisionable_data_channels: Vec>, } pub enum ToJob { Provisioner(ProvisionerMessage), Stop, } impl ToJobTrait for ToJob { const STOP: Self = Self::Stop; fn relay_parent(&self) -> Option { match self { Self::Provisioner(pm) => pm.relay_parent(), Self::Stop => None, } } } impl TryFrom for ToJob { type Error = (); fn try_from(msg: AllMessages) -> Result { match msg { AllMessages::Provisioner(pm) => Ok(Self::Provisioner(pm)), _ => Err(()), } } } impl From for ToJob { fn from(pm: ProvisionerMessage) -> Self { Self::Provisioner(pm) } } // not currently instantiable pub enum FromJob {} impl From for AllMessages { fn from(from_job: FromJob) -> AllMessages { unreachable!("uninstantiable; qed") } } #[derive(Debug, derive_more::From)] pub enum Error { #[from] Sending(mpsc::SendError), } impl JobTrait for ProvisioningJob { type ToJob = ToJob; type FromJob = FromJob; type Error = Error; type RunArgs = (); const NAME: &'static str = "ProvisioningJob"; /// Run a job for the parent block indicated // // this function is in charge of creating and executing the job's main loop fn run( _parent: Hash, _run_args: Self::RunArgs, receiver: mpsc::Receiver, _sender: mpsc::Sender, ) -> Pin> + Send>> { async move { let job = ProvisioningJob::new(receiver); // it isn't necessary to break run_loop into its own function, // but it's convenient to separate the concerns in this way job.run_loop().await } .boxed() } } impl ProvisioningJob { pub fn new(receiver: mpsc::Receiver) -> Self { Self { receiver, provisionable_data_channels: Vec::new(), } } async fn run_loop(mut self) -> Result<(), Error> { while let Some(msg) = self.receiver.next().await { use ProvisionerMessage::{RequestBlockAuthorshipData, RequestInherentData, ProvisionableData}; match msg { ToJob::Provisioner(RequestInherentData(_, sender)) => unimplemented!(), ToJob::Provisioner(RequestBlockAuthorshipData(_, sender)) => self.provisionable_data_channels.push(sender), ToJob::Provisioner(ProvisionableData(data)) => { for channel in self.provisionable_data_channels.iter_mut() { // REVIEW: the try operator here breaks the run loop if any receiver ever unexpectedly // closes their channel. Is that desired? channel.send(data.clone()).await?; } } ToJob::Stop => break, } } Ok(()) } }