// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! # Overseer
//!
//! `overseer` implements the Overseer architecture described in the
//! [implementors-guide](https://github.com/paritytech/polkadot/blob/master/roadmap/implementors-guide/guide.md).
//! For the motivations behind implementing the overseer itself you should
//! check out that guide, documentation in this crate will be mostly discussing
//! technical stuff.
//!
//! An `Overseer` is something that allows spawning/stopping and overseeing
//! asynchronous tasks as well as establishing a well-defined and easy to use
//! protocol that the tasks can use to communicate with each other. It is desired
//! that this protocol is the only way tasks communicate with each other, however
//! at this moment there are no foolproof guards against other ways of communication.
//!
//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
//! share the same behavior from `Overseer`'s point of view.
//!
//! ```text
//!                  +-----------------------------+
//!                  |         Overseer            |
//!                  +-----------------------------+
//!
//! ................|  Overseer "holds" these and uses |..............
//! .                  them to (re)start things                      .
//! .                                                                .
//! .  +-------------------+                +---------------------+  .
//! .  |   Subsystem1      |                |   Subsystem2        |  .
//! .  +-------------------+                +---------------------+  .
//! .            |                                     |             .
//! ..................................................................
//!              |                                     |
//!           start()                               start()
//!              V                                     V
//! ..................| Overseer "runs" these |.......................
//! .  +--------------------+               +---------------------+  .
//! .  | SubsystemInstance1 |               | SubsystemInstance2  |  .
//! .  +--------------------+               +---------------------+  .
//! ..................................................................
//! ```
use std::fmt::Debug;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;
use futures::channel::{mpsc, oneshot};
use futures::{
pending, poll, select,
future::{BoxFuture, RemoteHandle},
stream::FuturesUnordered,
task::{Spawn, SpawnError, SpawnExt},
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
use streamunordered::{StreamYield, StreamUnordered};
/// An error type that describes faults that may happen.
///
/// These are:
///   * Channels being closed
///   * Subsystems dying when they are not expected to
///   * Subsystems not dying when they are told to die
///   * etc.
///
/// The type is a unit struct: it records *that* something failed, not what.
/// It implements [`std::error::Error`] so it can be boxed, wrapped, or
/// propagated alongside other error types.
#[derive(Debug)]
pub struct SubsystemError;

impl std::fmt::Display for SubsystemError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // There is no payload to report, so the message is a fixed string.
        f.write_str("subsystem error")
    }
}

impl std::error::Error for SubsystemError {}
impl From for SubsystemError {
fn from(_: mpsc::SendError) -> Self {
Self
}
}
impl From for SubsystemError {
fn from(_: oneshot::Canceled) -> Self {
Self
}
}
impl From for SubsystemError {
fn from(_: SpawnError) -> Self {
Self
}
}
/// A `Result` type that wraps [`SubsystemError`].
///
/// [`SubsystemError`]: struct.SubsystemError.html
pub type SubsystemResult = Result;
/// An asynchronous subsystem task that runs inside of, and is overseen by, the [`Overseer`].
///
/// In essence it's just a newtype wrapping a `BoxFuture`: the wrapped future
/// is `'static` and resolves to `()`.
///
/// [`Overseer`]: struct.Overseer.html
pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>);
// Capacity of the bounded channels used inside the overseer.
const CHANNEL_CAPACITY: usize = 1024;
// Time delay granted for a graceful `Overseer` teardown.
// NOTE(review): the unit is not visible at this declaration (presumably
// seconds, given the `Duration`/`Delay` imports) — confirm at the use site.
const STOP_DELAY: u64 = 1;
/// A type of messages that are sent from [`Subsystem`] to [`Overseer`].
///
/// It wraps a system-wide [`AllMessages`] type that represents all possible
/// messages in the system.
///
/// [`AllMessages`]: enum.AllMessages.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`Overseer`]: struct.Overseer.html
enum ToOverseer {
/// This is a message sent by a `Subsystem`.
SubsystemMessage(AllMessages),
/// A message that wraps something the `Subsystem` is desiring to
/// spawn on the overseer and a `oneshot::Sender` to signal the result
/// of the spawn.
SpawnJob {
s: BoxFuture<'static, ()>,
res: oneshot::Sender>,
},
}
/// Some event from the outer world.
///
/// Values of this type are created by the `OverseerHandler` methods and
/// sent over its `events_tx` channel.
enum Event {
/// A block was imported; sent by `OverseerHandler::block_imported`.
BlockImport,
/// A block was finalized; sent by `OverseerHandler::block_finalized`.
BlockFinalized,
/// A message addressed to one of the `Subsystem`s; sent by
/// `OverseerHandler::send_msg`.
MsgToSubsystem(AllMessages),
/// A request to shut down; sent by `OverseerHandler::stop`.
Stop,
}
/// Some message that is sent from one of the `Subsystem`s to the outside world.
pub enum OutboundMessage {
/// A subsystem message travelling outwards.
SubsystemMessage {
/// The wrapped message.
msg: AllMessages,
}
}
/// A handler used to communicate with the [`Overseer`].
///
/// [`Overseer`]: struct.Overseer.html
pub struct OverseerHandler {
events_tx: mpsc::Sender,
}
impl OverseerHandler {
/// Inform the `Overseer` that that some block was imported.
pub async fn block_imported(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockImport).await?;
Ok(())
}
/// Send some message to one of the `Subsystem`s.
pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.events_tx.send(Event::MsgToSubsystem(msg)).await?;
Ok(())
}
/// Inform the `Overseer` that that some block was finalized.
pub async fn block_finalized(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockFinalized).await?;
Ok(())
}
/// Tell `Overseer` to shutdown.
pub async fn stop(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::Stop).await?;
Ok(())
}
}
impl Debug for ToOverseer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ToOverseer::SubsystemMessage(msg) => {
write!(f, "OverseerMessage::SubsystemMessage({:?})", msg)
}
ToOverseer::SpawnJob { .. } => write!(f, "OverseerMessage::Spawn(..)")
}
}
}
/// A running instance of some [`Subsystem`].
///
/// [`Subsystem`]: trait.Subsystem.html
struct SubsystemInstance {
tx: mpsc::Sender>,
}
/// A context type that is given to the [`Subsystem`] upon spawning.
/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
/// or to spawn it's [`SubsystemJob`]s.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`SubsystemJob`]: trait.SubsystemJob.html
pub struct SubsystemContext{
rx: mpsc::Receiver>,
tx: mpsc::Sender,
}
/// A signal used by [`Overseer`] to communicate with the [`Subsystem`]s.
///
/// The enum has no payload, so it derives `Clone`, `PartialEq` and `Eq` in
/// addition to `Debug`; this lets a signal be duplicated for several
/// recipients and compared directly (e.g. with `assert_eq!`).
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OverseerSignal {
    /// `Subsystem` should start working.
    StartWork,
    /// `Subsystem` should stop working.
    StopWork,
    /// Conclude the work of the `Overseer` and all `Subsystem`s.
    Conclude,
}
/// The messages understood by the Validation [`Subsystem`].
///
/// [`Subsystem`]: trait.Subsystem.html
#[derive(Debug)]
pub enum ValidationSubsystemMessage {
    // NOTE(review): payload-free variant; its exact meaning is not documented
    // in this file.
    ValidityAttestation,
}
/// The messages understood by the CandidateBacking [`Subsystem`].
///
/// [`Subsystem`]: trait.Subsystem.html
#[derive(Debug)]
pub enum CandidateBackingSubsystemMessage {
    // NOTE(review): both variants are payload-free; their exact meaning is
    // not documented in this file.
    RegisterBackingWatcher,
    Second,
}
/// A message type tying together all message types that are used across [`Subsystem`]s.
///
/// Each variant wraps the message type of one particular subsystem.
///
/// [`Subsystem`]: trait.Subsystem.html
#[derive(Debug)]
pub enum AllMessages {
/// A message of the Validation subsystem.
Validation(ValidationSubsystemMessage),
/// A message of the CandidateBacking subsystem.
CandidateBacking(CandidateBackingSubsystemMessage),
}
/// A message type that a [`Subsystem`] receives from the [`Overseer`].
/// It wraps siglans from the [`Overseer`] and messages that are circulating
/// between subsystems.
///
/// It is generic over over the message type `M` that a particular `Subsystem` may use.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
#[derive(Debug)]
pub enum FromOverseer {
/// Signal from the `Overseer`.
Signal(OverseerSignal),
/// Some other `Subsystem`'s message.
Communication {
msg: M,
},
}
impl SubsystemContext {
/// Try to asyncronously receive a message.
///
/// This has to be used with caution, if you loop over this without
/// using `pending!()` macro you will end up with a busy loop!
pub async fn try_recv(&mut self) -> Result