Newer
Older
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Utility module for subsystems
//!
//! Many subsystems have common interests such as canceling a bunch of spawned jobs,
//! or determining what their validator ID is. These common interests are factored into
//! this module.
use polkadot_node_subsystem::{
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
};
use futures::{
channel::{mpsc, oneshot},
future::Either,
prelude::*,
select,
stream::Stream,
};
use futures_timer::Delay;
use keystore::KeyStorePtr;
use parity_scale_codec::Encode;
use pin_project::{pin_project, pinned_drop};
use polkadot_primitives::v1::{
CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, GlobalValidationData,
GroupRotationInfo, Hash, Id as ParaId, LocalValidationData, OccupiedCoreAssumption,
SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex,
ValidatorPair,
};
use sp_core::{Pair, traits::SpawnNamed};
use std::{
collections::HashMap,
convert::{TryFrom, TryInto},
marker::Unpin,
pin::Pin,
time::Duration,
};
use streamunordered::{StreamUnordered, StreamYield};
Peter Goodspeed-Niklaus
committed
/// Items re-exported so that external crates can use the `delegated_subsystem` macro properly.
///
/// Everything here is a plain `pub use`; no new items are defined in this module.
pub mod reexports {
    pub use polkadot_node_subsystem::{SpawnedSubsystem, Subsystem, SubsystemContext};
    pub use sp_core::traits::SpawnNamed;
}
/// Duration a job will wait after sending a stop signal before hard-aborting.
///
/// Currently one second.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
/// Capacity of channels to and from individual jobs.
///
/// Currently 64 (presumably counted in queued messages; confirm where the channels are created).
pub const JOB_CHANNEL_CAPACITY: usize = 64;
/// Utility errors
#[derive(Debug, derive_more::From)]
pub enum Error {
/// Attempted to send or receive on a oneshot channel which had been canceled
#[from]
Oneshot(oneshot::Canceled),
/// Attempted to send on a MPSC channel which has been canceled
#[from]
Mpsc(mpsc::SendError),
Peter Goodspeed-Niklaus
committed
/// A subsystem error
#[from]
Subsystem(SubsystemError),
/// An error in the Chain API.
#[from]
ChainApi(ChainApiError),
/// An error in the Runtime API.
asynchronous rob
committed
#[from]
RuntimeApi(RuntimeApiError),
Peter Goodspeed-Niklaus
committed
/// The type system wants this even though it doesn't make sense
#[from]
Infallible(std::convert::Infallible),
/// Attempted to convert from an AllMessages to a FromJob, and failed.
SenderConversion(String),
/// The local node is not a validator.
NotAValidator,
/// The desired job is not present in the jobs list.
JobNotFound(Hash),
Peter Goodspeed-Niklaus
committed
/// Already forwarding errors to another sender
AlreadyForwarding,
asynchronous rob
committed
/// A type alias for Runtime API receivers.
///
/// This is the receiving half of a oneshot channel whose item is either the
/// requested value `T` or a `RuntimeApiError`.
pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;
/// Request some data from the `RuntimeApi`.
pub async fn request_from_runtime<RequestBuilder, Response, FromJob>(
parent: Hash,
sender: &mut mpsc::Sender<FromJob>,
request_builder: RequestBuilder,
asynchronous rob
committed
) -> Result<RuntimeApiReceiver<Response>, Error>
asynchronous rob
committed
RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
let (tx, rx) = oneshot::channel();
sender
.send(
AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
.try_into()
.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
)
.await?;
Ok(rx)
}
/// Construct specialized request functions for the runtime.
///
/// These would otherwise get pretty repetitive.
///
/// Each input item has the form
/// `fn name(param: Ty, ...) -> ReturnTy; RequestVariant;` and expands to a
/// `pub async fn name<FromJob>(parent, param..., sender)` that forwards to
/// `request_from_runtime`, building `RuntimeApiRequest::RequestVariant(param..., tx)`.
macro_rules! specialize_requests {
    // expand return type name for documentation purposes
    (fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
        specialize_requests!{
            named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
        }
    };

    // create a single specialized request function
    (named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
        #[doc = "Request `"]
        #[doc = $doc_name]
        #[doc = "` from the runtime"]
        pub async fn $func_name<FromJob>(
            parent: Hash,
            $(
                $param_name: $param_ty,
            )*
            sender: &mut mpsc::Sender<FromJob>,
        ) -> Result<RuntimeApiReceiver<$return_ty>, Error>
        where
            FromJob: TryFrom<AllMessages>,
            <FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
        {
            request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
                $( $param_name, )* tx
            )).await
        }
    };

    // recursive decompose: peel off the first item, then recurse on the rest
    (
        fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
        $(
            fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
        )+
    ) => {
        specialize_requests!{
            fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
        }
        specialize_requests!{
            $(
                fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
            )+
        }
    };
}
specialize_requests! {
fn request_validators() -> Vec<ValidatorId>; Validators;
fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
fn request_global_validation_data() -> GlobalValidationData; GlobalValidationData;
fn request_local_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<LocalValidationData>; LocalValidationData;
fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
/// Request some data from the `RuntimeApi` via a SubsystemContext.
async fn request_from_runtime_ctx<RequestBuilder, Context, Response>(
parent: Hash,
ctx: &mut Context,
request_builder: RequestBuilder,
) -> Result<RuntimeApiReceiver<Response>, Error>
Loading full blame...