// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! # Overseer
//!
//! `overseer` implements the Overseer architecture described in the
//! [implementers-guide](https://github.com/paritytech/polkadot/blob/master/roadmap/implementers-guide/guide.md).
//! For the motivations behind implementing the overseer itself you should
//! check out that guide, documentation in this crate will be mostly discussing
//! technical stuff.
//!
//! An `Overseer` is something that allows spawning/stopping and overseeing
//! asynchronous tasks as well as establishing a well-defined and easy to use
//! protocol that the tasks can use to communicate with each other. It is desired
//! that this protocol is the only way tasks communicate with each other, however
//! at this moment there are no foolproof guards against other ways of communication.
//!
//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
//! share the same behavior from `Overseer`'s point of view.
//!
//! ```text
//!                  +-----------------------------+
//!                  |          Overseer           |
//!                  +-----------------------------+
//!
//! ................| Overseer "holds" these and uses |...............
//! .                 them to (re)start things                       .
//! .                                                                .
//! .  +-------------------+                +---------------------+  .
//! .  |    Subsystem1     |                |     Subsystem2      |  .
//! .  +-------------------+                +---------------------+  .
//! .            |                                     |             .
//! ..................................................................
//!              |                                     |
//!           start()                               start()
//!              V                                     V
//! ..................| Overseer "runs" these |.......................
//! .  +--------------------+               +---------------------+  .
//! .  | SubsystemInstance1 |               | SubsystemInstance2  |  .
//! .  +--------------------+               +---------------------+  .
//! ..................................................................
//! ```
use std::fmt::Debug;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use std::collections::HashSet;
use futures::channel::{mpsc, oneshot};
use futures::{
pending, poll, select,
future::BoxFuture,
stream::{self, FuturesUnordered},
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
use streamunordered::{StreamYield, StreamUnordered};
use polkadot_primitives::v1::{Block, BlockNumber, Hash};
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
use polkadot_subsystem::messages::{
CandidateValidationMessage, CandidateBackingMessage,
CandidateSelectionMessage, StatementDistributionMessage,
AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage,
ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage,
AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages,
};
pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
SpawnedSubsystem, ActiveLeavesUpdate,
};
use polkadot_node_primitives::SpawnNamed;
// Capacity of the bounded channels used inside the overseer.
const CHANNEL_CAPACITY: usize = 1024;
// Delay granted to the `Overseer` for a graceful teardown.
// NOTE(review): the time unit is not stated here (presumably seconds) - confirm at the use site.
const STOP_DELAY: u64 = 1;
/// A type of messages that are sent from [`Subsystem`] to [`Overseer`].
///
/// It wraps a system-wide [`AllMessages`] type that represents all possible
/// messages in the system, and additionally lets a `Subsystem` ask the
/// `Overseer` to spawn a job on its behalf.
///
/// [`AllMessages`]: enum.AllMessages.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`Overseer`]: struct.Overseer.html
enum ToOverseer {
/// This is a message sent by a `Subsystem`.
SubsystemMessage(AllMessages),
/// A message that wraps something the `Subsystem` is desiring to
/// spawn on the overseer.
///
/// Note that this variant carries no channel for reporting the outcome
/// of the spawn back to the requester.
SpawnJob {
// Static name accompanying the job (presumably used to label the spawned task - confirm).
name: &'static str,
// The boxed future to be spawned; it yields `()`.
s: BoxFuture<'static, ()>,
},
}
/// An event telling the `Overseer` on the particular block
/// that has been imported or finalized.
///
/// This structure exists solely for the purposes of decoupling
/// `Overseer` code from the client code and the necessity to call
/// `HeaderBackend::block_number_from_id()`.
///
/// It can be built from the client's import and finality notifications
/// through the `From` conversions defined for this type.
pub struct BlockInfo {
/// Hash of the block.
pub hash: Hash,
/// Hash of the parent block.
pub parent_hash: Hash,
/// Number of the block.
pub number: BlockNumber,
}
impl From> for BlockInfo {
fn from(n: BlockImportNotification) -> Self {
BlockInfo {
hash: n.hash,
parent_hash: n.header.parent_hash,
number: n.header.number,
}
}
}
impl From> for BlockInfo {
fn from(n: FinalityNotification) -> Self {
BlockInfo {
hash: n.hash,
parent_hash: n.header.parent_hash,
number: n.header.number,
}
}
}
/// Some event from outer world.
///
/// Values of this type are what an [`OverseerHandler`] pushes into the
/// channel towards the `Overseer`.
///
/// [`OverseerHandler`]: struct.OverseerHandler.html
enum Event {
/// A block described by the given `BlockInfo` was imported.
BlockImported(BlockInfo),
/// A block described by the given `BlockInfo` was finalized.
BlockFinalized(BlockInfo),
/// A message that should be delivered to one of the `Subsystem`s.
MsgToSubsystem(AllMessages),
/// A request for the `Overseer` to shut down.
Stop,
}
/// A handler used to communicate with the [`Overseer`].
///
/// [`Overseer`]: struct.Overseer.html
#[derive(Clone)]
pub struct OverseerHandler {
events_tx: mpsc::Sender,
}
impl OverseerHandler {
    /// Notify the `Overseer` that a block has been imported.
    ///
    /// # Errors
    ///
    /// Returns an error if the event could not be sent over the channel.
    pub async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
        let event = Event::BlockImported(block);
        self.events_tx.send(event).await.map_err(Into::into)
    }

    /// Hand a message over to the `Overseer` for delivery to one of the `Subsystem`s.
    ///
    /// # Errors
    ///
    /// Returns an error if the event could not be sent over the channel.
    pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> {
        let event = Event::MsgToSubsystem(msg);
        self.events_tx.send(event).await.map_err(Into::into)
    }

    /// Notify the `Overseer` that a block has been finalized.
    ///
    /// # Errors
    ///
    /// Returns an error if the event could not be sent over the channel.
    pub async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
        let event = Event::BlockFinalized(block);
        self.events_tx.send(event).await.map_err(Into::into)
    }

    /// Ask the `Overseer` to shut down.
    ///
    /// # Errors
    ///
    /// Returns an error if the event could not be sent over the channel.
    pub async fn stop(&mut self) -> SubsystemResult<()> {
        self.events_tx.send(Event::Stop).await.map_err(Into::into)
    }
}
/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding
/// import and finality notifications into the [`OverseerHandler`].
///
/// [`Overseer`]: struct.Overseer.html
/// [`OverseerHandler`]: struct.OverseerHandler.html
pub async fn forward_events>(
client: Arc
,
mut handler: OverseerHandler,
) -> SubsystemResult<()> {
let mut finality = client.finality_notification_stream();
let mut imports = client.import_notification_stream();
loop {
select! {
f = finality.next() => {
match f {
Some(block) => {
handler.block_finalized(block.into()).await?;
}
None => break,
}
},
i = imports.next() => {
match i {
Some(block) => {
handler.block_imported(block.into()).await?;
}
None => break,
}
},
complete => break,
}
}
Ok(())
}
/// Manual `Debug` implementation: the `SpawnJob` variant is printed as a
/// fixed placeholder instead of showing its fields.
impl Debug for ToOverseer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            ToOverseer::SpawnJob { .. } => f.write_str("OverseerMessage::Spawn(..)"),
            ToOverseer::SubsystemMessage(ref inner) => {
                write!(f, "OverseerMessage::SubsystemMessage({:?})", inner)
            }
        }
    }
}
/// A running instance of some [`Subsystem`].
///
/// [`Subsystem`]: trait.Subsystem.html
struct SubsystemInstance {
tx: mpsc::Sender>,
}
/// A context type that is given to the [`Subsystem`] upon spawning.
/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
/// or to spawn it's [`SubsystemJob`]s.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`SubsystemJob`]: trait.SubsystemJob.html
#[derive(Debug)]
pub struct OverseerSubsystemContext{
rx: mpsc::Receiver>,
tx: mpsc::Sender,
}
#[async_trait::async_trait]
impl SubsystemContext for OverseerSubsystemContext {
type Message = M;
async fn try_recv(&mut self) -> Result