Newer
Older
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Utility module for subsystems
//!
//! Many subsystems have common interests such as canceling a bunch of spawned jobs,
//! or determining what their validator ID is. These common interests are factored into
//! this module.
use crate::{
asynchronous rob
committed
messages::{
AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender,
asynchronous rob
committed
},
Peter Goodspeed-Niklaus
committed
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
};
use futures::{
channel::{mpsc, oneshot},
future::Either,
prelude::*,
select,
stream::Stream,
};
use futures_timer::Delay;
use keystore::KeyStorePtr;
use parity_scale_codec::Encode;
use pin_project::{pin_project, pinned_drop};
use polkadot_primitives::v1::{
CoreState, EncodeAs, Hash, Signed, SigningContext, SessionIndex,
asynchronous rob
committed
ValidatorId, ValidatorIndex, ValidatorPair, GroupRotationInfo,
use sp_core::Pair;
use std::{
collections::HashMap,
convert::{TryFrom, TryInto},
marker::Unpin,
pin::Pin,
time::Duration,
};
use streamunordered::{StreamUnordered, StreamYield};
/// This reexport is required so that external crates can use the `delegated_subsystem` macro properly.
///
/// Otherwise, downstream crates might have to modify their `Cargo.toml` to ensure `sp-core` appeared there.
pub use sp_core::traits::SpawnNamed;
/// Duration a job will wait after sending a stop signal before hard-aborting (one second).
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
/// Capacity of channels to and from individual jobs.
pub const JOB_CHANNEL_CAPACITY: usize = 64;
/// Utility errors
#[derive(Debug, derive_more::From)]
pub enum Error {
/// Attempted to send or receive on a oneshot channel which had been canceled
#[from]
Oneshot(oneshot::Canceled),
/// Attempted to send on a MPSC channel which has been canceled
#[from]
Mpsc(mpsc::SendError),
Peter Goodspeed-Niklaus
committed
/// A subsystem error
#[from]
Subsystem(SubsystemError),
/// An error in the Chain API.
#[from]
ChainApi(ChainApiError),
/// An error in the Runtime API.
asynchronous rob
committed
#[from]
RuntimeApi(RuntimeApiError),
Peter Goodspeed-Niklaus
committed
/// The type system wants this even though it doesn't make sense
#[from]
Infallible(std::convert::Infallible),
/// Attempted to convert from an AllMessages to a FromJob, and failed.
SenderConversion(String),
/// The local node is not a validator.
NotAValidator,
/// The desired job is not present in the jobs list.
JobNotFound(Hash),
Peter Goodspeed-Niklaus
committed
/// Already forwarding errors to another sender
AlreadyForwarding,
asynchronous rob
committed
/// A type alias for Runtime API receivers.
///
/// This is the receiving half of a oneshot channel whose single message is
/// either the requested value `T` or a `RuntimeApiError`.
pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;
/// Request some data from the `RuntimeApi`.
pub async fn request_from_runtime<RequestBuilder, Response, FromJob>(
parent: Hash,
sender: &mut mpsc::Sender<FromJob>,
request_builder: RequestBuilder,
asynchronous rob
committed
) -> Result<RuntimeApiReceiver<Response>, Error>
asynchronous rob
committed
RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
let (tx, rx) = oneshot::channel();
sender
.send(
AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
.try_into()
.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
)
.await?;
Ok(rx)
}
/// Request a validator set from the `RuntimeApi`.
///
/// Thin wrapper around `request_from_runtime` that issues a
/// `RuntimeApiRequest::Validators` request.
///
/// # Parameters
///
/// - `parent`: hash forwarded unchanged to `request_from_runtime`.
/// - `s`: channel on which the request message is sent.
///
/// # Returns
///
/// A receiver on which the `Vec<ValidatorId>` response (or a
/// `RuntimeApiError`) can be awaited.
///
/// # Errors
///
/// Propagates any error returned by `request_from_runtime`.
pub async fn request_validators<FromJob>(
    parent: Hash,
    s: &mut mpsc::Sender<FromJob>,
) -> Result<RuntimeApiReceiver<Vec<ValidatorId>>, Error>
where
    FromJob: TryFrom<AllMessages>,
    <FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
    request_from_runtime(parent, s, |tx| RuntimeApiRequest::Validators(tx)).await
}
asynchronous rob
committed
/// Request the validator groups.
pub async fn request_validator_groups<FromJob>(
parent: Hash,
s: &mut mpsc::Sender<FromJob>,
asynchronous rob
committed
) -> Result<RuntimeApiReceiver<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo)>, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
request_from_runtime(parent, s, |tx| RuntimeApiRequest::ValidatorGroups(tx)).await
}
asynchronous rob
committed
/// Request the session index of the child block.
pub async fn request_session_index_for_child<FromJob>(
parent: Hash,
s: &mut mpsc::Sender<FromJob>,
asynchronous rob
committed
) -> Result<RuntimeApiReceiver<SessionIndex>, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
asynchronous rob
committed
request_from_runtime(parent, s, |tx| {
RuntimeApiRequest::SessionIndexForChild(tx)
}).await
/// Ask the `RuntimeApi` for all availability cores.
///
/// Delegates to `request_from_runtime` with a
/// `RuntimeApiRequest::AvailabilityCores` request. `parent` and `s` are
/// forwarded unchanged.
///
/// On success the returned receiver yields the `Vec<CoreState>` response (or a
/// `RuntimeApiError`); any error from `request_from_runtime` is propagated.
pub async fn request_availability_cores<FromJob>(
    parent: Hash,
    s: &mut mpsc::Sender<FromJob>,
) -> Result<RuntimeApiReceiver<Vec<CoreState>>, Error>
where
    FromJob: TryFrom<AllMessages>,
    <FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
    // The tuple-variant constructor already has the shape
    // `FnOnce(response sender) -> RuntimeApiRequest`, so it serves as the builder.
    let pending_request = request_from_runtime(parent, s, RuntimeApiRequest::AvailabilityCores);
    pending_request.await
}
/// Ask the `RuntimeApi` for the global validation data.
///
/// Delegates to `request_from_runtime` with a
/// `RuntimeApiRequest::GlobalValidationData` request. `parent` and `s` are
/// forwarded unchanged.
///
/// On success the returned receiver yields the `GlobalValidationData` response
/// (or a `RuntimeApiError`); any error from `request_from_runtime` is propagated.
pub async fn request_global_validation_data<FromJob>(
    parent: Hash,
    s: &mut mpsc::Sender<FromJob>,
) -> Result<RuntimeApiReceiver<GlobalValidationData>, Error>
where
    FromJob: TryFrom<AllMessages>,
    <FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
    // The tuple-variant constructor already has the shape
    // `FnOnce(response sender) -> RuntimeApiRequest`, so it serves as the builder.
    let pending_request = request_from_runtime(parent, s, RuntimeApiRequest::GlobalValidationData);
    pending_request.await
}
/// Request local validation data.
pub async fn request_local_validation_Data<FromJob>(
parent: Hash,
para_id: ParaId,
assumption: OccupiedCoreAssumption,
s: &mut mpsc::Sender<FromJob>,
) -> Result<RuntimeApiReceiver<LocalValidationData>, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
Loading full blame...