// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Utility module for subsystems
//!
//! Many subsystems have common interests such as canceling a bunch of spawned jobs,
//! or determining what their validator ID is. These common interests are factored into
//! this module.

use crate::{
	messages::{
		AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender,
	},
	errors::{ChainApiError, RuntimeApiError},
	FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
};
use futures::{
	channel::{mpsc, oneshot},
	future::Either,
	prelude::*,
	select,
	stream::Stream,
};
use futures_timer::Delay;
use keystore::KeyStorePtr;
use parity_scale_codec::Encode;
use pin_project::{pin_project, pinned_drop};
use polkadot_primitives::v1::{
	CoreState, EncodeAs, Hash, Signed, SigningContext, SessionIndex,
	ValidatorId, ValidatorIndex, ValidatorPair, GroupRotationInfo,
	GlobalValidationData, Id as ParaId, LocalValidationData, OccupiedCoreAssumption,
};
use std::{
	collections::HashMap,
	convert::{TryFrom, TryInto},
	marker::Unpin,
	pin::Pin,
	time::Duration,
};
use streamunordered::{StreamUnordered, StreamYield};

/// This reexport is required so that external crates can use the `delegated_subsystem` macro properly.
///
/// Otherwise, downstream crates might have to modify their `Cargo.toml` to ensure `sp-core` appeared there.
pub use sp_core::traits::SpawnNamed;

/// Duration a job will wait after sending a stop signal before hard-aborting.
///
/// Currently one second.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
/// Capacity of channels to and from individual jobs.
///
/// NOTE(review): presumably counted in buffered messages per channel; confirm at the
/// sites where the channels are created.
pub const JOB_CHANNEL_CAPACITY: usize = 64;

/// Utility errors
#[derive(Debug, derive_more::From)]
pub enum Error {
	/// Attempted to send or receive on a oneshot channel which had been canceled
	#[from]
	Oneshot(oneshot::Canceled),
	/// Attempted to send on a MPSC channel which has been canceled
	#[from]
	Mpsc(mpsc::SendError),
	/// A subsystem error
	#[from]
	Subsystem(SubsystemError),
ordian's avatar
ordian committed
	/// An error in the Chain API.
	#[from]
	ChainApi(ChainApiError),
	/// An error in the Runtime API.
	/// The type system wants this even though it doesn't make sense
	#[from]
	Infallible(std::convert::Infallible),
	/// Attempted to convert from an AllMessages to a FromJob, and failed.
	SenderConversion(String),
	/// The local node is not a validator.
	NotAValidator,
	/// The desired job is not present in the jobs list.
	JobNotFound(Hash),
	/// Already forwarding errors to another sender
	AlreadyForwarding,
/// A type alias for Runtime API receivers.
///
/// This is the receiving half of a oneshot channel whose single value is either a `T`
/// or a `RuntimeApiError`.
pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;

/// Request some data from the `RuntimeApi`.
pub async fn request_from_runtime<RequestBuilder, Response, FromJob>(
	parent: Hash,
	sender: &mut mpsc::Sender<FromJob>,
	request_builder: RequestBuilder,
) -> Result<RuntimeApiReceiver<Response>, Error>
	RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
	FromJob: TryFrom<AllMessages>,
	<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
	let (tx, rx) = oneshot::channel();

	sender
		.send(
			AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
				.try_into()
				.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
		)
		.await?;

	Ok(rx)
}

/// Ask the `RuntimeApi` for the validator set.
///
/// Sends a `RuntimeApiRequest::Validators` request for the block hash `parent` over `s`
/// and returns the receiver on which the `Vec<ValidatorId>` answer will arrive.
pub async fn request_validators<FromJob>(
	parent: Hash,
	s: &mut mpsc::Sender<FromJob>,
) -> Result<RuntimeApiReceiver<Vec<ValidatorId>>, Error>
where
	FromJob: TryFrom<AllMessages>,
	<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
	// The tuple-variant constructor already has the shape of the request builder.
	request_from_runtime(parent, s, RuntimeApiRequest::Validators).await
}

/// Ask the `RuntimeApi` for the validator groups.
///
/// Sends a `RuntimeApiRequest::ValidatorGroups` request for the block hash `parent` over
/// `s` and returns the receiver on which the groups (as lists of `ValidatorIndex`) and
/// the accompanying `GroupRotationInfo` will arrive.
pub async fn request_validator_groups<FromJob>(
	parent: Hash,
	s: &mut mpsc::Sender<FromJob>,
) -> Result<RuntimeApiReceiver<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo)>, Error>
where
	FromJob: TryFrom<AllMessages>,
	<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
	// The tuple-variant constructor already has the shape of the request builder.
	request_from_runtime(parent, s, RuntimeApiRequest::ValidatorGroups).await
}

/// Request the session index of the child block.
pub async fn request_session_index_for_child<FromJob>(
	parent: Hash,
	s: &mut mpsc::Sender<FromJob>,
) -> Result<RuntimeApiReceiver<SessionIndex>, Error>
where
	FromJob: TryFrom<AllMessages>,
	<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
	request_from_runtime(parent, s, |tx| {
		RuntimeApiRequest::SessionIndexForChild(tx)
	}).await
/// Ask the `RuntimeApi` for all availability cores.
///
/// Sends a `RuntimeApiRequest::AvailabilityCores` request for the block hash `parent`
/// over `s` and returns the receiver on which the `Vec<CoreState>` answer will arrive.
pub async fn request_availability_cores<FromJob>(
	parent: Hash,
	s: &mut mpsc::Sender<FromJob>,
) -> Result<RuntimeApiReceiver<Vec<CoreState>>, Error>
where
	FromJob: TryFrom<AllMessages>,
	<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
	// The tuple-variant constructor already has the shape of the request builder.
	request_from_runtime(parent, s, RuntimeApiRequest::AvailabilityCores).await
}

/// Ask the `RuntimeApi` for the global validation data.
///
/// Sends a `RuntimeApiRequest::GlobalValidationData` request for the block hash `parent`
/// over `s` and returns the receiver on which the `GlobalValidationData` answer will
/// arrive.
pub async fn request_global_validation_data<FromJob>(
	parent: Hash,
	s: &mut mpsc::Sender<FromJob>,
) -> Result<RuntimeApiReceiver<GlobalValidationData>, Error>
where
	FromJob: TryFrom<AllMessages>,
	<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
	// The tuple-variant constructor already has the shape of the request builder.
	request_from_runtime(parent, s, RuntimeApiRequest::GlobalValidationData).await
}

/// Request local validation data.
pub async fn request_local_validation_Data<FromJob>(
	parent: Hash,
	para_id: ParaId,
	assumption: OccupiedCoreAssumption,
	s: &mut mpsc::Sender<FromJob>,
) -> Result<RuntimeApiReceiver<LocalValidationData>, Error>
where
	FromJob: TryFrom<AllMessages>,
	<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
Loading full blame...