Newer
Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! # Overseer
//!
//! `overseer` implements the Overseer architecture described in the
//! [implementers-guide](https://github.com/paritytech/polkadot/blob/master/roadmap/implementers-guide/guide.md).
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//! For the motivations behind implementing the overseer itself you should
//! check out that guide, documentation in this crate will be mostly discussing
//! technical stuff.
//!
//! An `Overseer` is something that allows spawning/stopping and overseing
//! asynchronous tasks as well as establishing a well-defined and easy to use
//! protocol that the tasks can use to communicate with each other. It is desired
//! that this protocol is the only way tasks communicate with each other, however
//! at this moment there are no foolproof guards against other ways of communication.
//!
//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
//! share the same behavior from `Overseer`'s point of view.
//!
//! ```text
//! +-----------------------------+
//! | Overseer |
//! +-----------------------------+
//!
//! ................| Overseer "holds" these and uses |..............
//! . them to (re)start things .
//! . .
//! . +-------------------+ +---------------------+ .
//! . | Subsystem1 | | Subsystem2 | .
//! . +-------------------+ +---------------------+ .
//! . | | .
//! ..................................................................
//! | |
//! start() start()
//! V V
//! ..................| Overseer "runs" these |.......................
//! . +--------------------+ +---------------------+ .
//! . | SubsystemInstance1 | | SubsystemInstance2 | .
//! . +--------------------+ +---------------------+ .
//! ..................................................................
//! ```
// #![deny(unused_results)]
// unused dependencies can not work for test and examples at the same time
// yielding false positives
#![warn(missing_docs)]
use std::collections::{hash_map, HashMap};
use futures::channel::{mpsc, oneshot};
use futures::{
pending, poll, select,
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
use streamunordered::{StreamYield, StreamUnordered};
use polkadot_primitives::v1::{Block, BlockNumber, Hash};
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
CandidateValidationMessage, CandidateBackingMessage,
CandidateSelectionMessage, ChainApiMessage, StatementDistributionMessage,
AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage,
ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage,
AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, CollationGenerationMessage, CollatorProtocolMessage,
};
pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem,
asynchronous rob
committed
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_primitives::SpawnNamed;
asynchronous rob
committed
// A capacity of bounded channels inside the overseer.
const CHANNEL_CAPACITY: usize = 1024;
// A graceful `Overseer` teardown time delay.
const STOP_DELAY: u64 = 1;
// Target for logs.
const LOG_TARGET: &'static str = "overseer";
/// A type of messages that are sent from [`Subsystem`] to [`Overseer`].
///
/// It wraps a system-wide [`AllMessages`] type that represents all possible
/// messages in the system.
///
/// [`AllMessages`]: enum.AllMessages.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`Overseer`]: struct.Overseer.html
enum ToOverseer {
/// This is a message sent by a `Subsystem`.
SubsystemMessage(AllMessages),
/// A message that wraps something the `Subsystem` is desiring to
/// spawn on the overseer and a `oneshot::Sender` to signal the result
/// of the spawn.
SpawnJob {
name: &'static str,
/// Same as `SpawnJob` but for blocking tasks to be executed on a
/// dedicated thread pool.
SpawnBlockingJob {
name: &'static str,
s: BoxFuture<'static, ()>,
},
/// An event telling the `Overseer` on the particular block
/// that has been imported or finalized.
///
/// This structure exists solely for the purposes of decoupling
/// `Overseer` code from the client code and the necessity to call
/// `HeaderBackend::block_number_from_id()`.
pub struct BlockInfo {
/// hash of the block.
pub hash: Hash,
/// hash of the parent block.
pub parent_hash: Hash,
/// block's number.
pub number: BlockNumber,
}
impl From<BlockImportNotification<Block>> for BlockInfo {
fn from(n: BlockImportNotification<Block>) -> Self {
BlockInfo {
hash: n.hash,
parent_hash: n.header.parent_hash,
number: n.header.number,
}
}
}
impl From<FinalityNotification<Block>> for BlockInfo {
fn from(n: FinalityNotification<Block>) -> Self {
BlockInfo {
hash: n.hash,
parent_hash: n.header.parent_hash,
number: n.header.number,
}
}
}
/// Some event from the outer world.
BlockImported(BlockInfo),
BlockFinalized(BlockInfo),
ExternalRequest(ExternalRequest),
/// Some request from outer world.
enum ExternalRequest {
WaitForActivation {
hash: Hash,
response_channel: oneshot::Sender<SubsystemResult<()>>,
/// A handler used to communicate with the [`Overseer`].
///
/// [`Overseer`]: struct.Overseer.html
pub struct OverseerHandler {
events_tx: mpsc::Sender<Event>,
}
impl OverseerHandler {
/// Inform the `Overseer` that that some block was imported.
pub async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockImported(block)).await.map_err(Into::into)
}
/// Send some message to one of the `Subsystem`s.
pub async fn send_msg(&mut self, msg: impl Into<AllMessages>) -> SubsystemResult<()> {
self.events_tx.send(Event::MsgToSubsystem(msg.into())).await.map_err(Into::into)
Loading full blame...