lib.rs 8.25 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Subsystem trait definitions and message types.
//!
//! Node-side logic for Polkadot is mostly comprised of Subsystems, which are discrete components
//! that communicate via message-passing. They are coordinated by an overseer, provided by a
//! separate crate.

23
24
#![warn(missing_docs)]

25
26
27
28
29
30
use std::pin::Pin;

use futures::prelude::*;
use futures::channel::{mpsc, oneshot};
use futures::future::BoxFuture;

asynchronous rob's avatar
asynchronous rob committed
31
use polkadot_primitives::v1::Hash;
32
use async_trait::async_trait;
33
use smallvec::SmallVec;
34
use thiserror::Error;
35
36
37

use crate::messages::AllMessages;

Andronik Ordian's avatar
Andronik Ordian committed
38
pub mod errors;
39
40
pub mod messages;

41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/// Number of slots that are stack-reserved for active leaves updates.
///
/// If an update holds fewer hashes than this number, then we've wasted some stack space.
/// If an update holds more hashes than this number, then we fall back to a heap vector.
const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8;

/// Changes in the set of active leaves: the parachain heads which we care to work on.
///
/// Note that the `activated` and `deactivated` fields indicate deltas, not complete sets.
///
/// `PartialEq` is implemented by hand rather than derived: two updates compare equal when
/// their `activated` and `deactivated` fields are equal when considered as sets.
#[derive(Clone, Debug, Default, Eq)]
pub struct ActiveLeavesUpdate {
	/// New relay chain block hashes of interest.
	pub activated: SmallVec<[Hash; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>,
	/// Relay chain block hashes no longer of interest.
	pub deactivated: SmallVec<[Hash; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>,
}

impl ActiveLeavesUpdate {
	/// Build an update whose only content is one newly activated `hash`.
	///
	/// The `deactivated` list of the returned update is empty.
	pub fn start_work(hash: Hash) -> Self {
		let mut update = Self::default();
		update.activated.push(hash);
		update
	}

	/// Build an update whose only content is one deactivated `hash`.
	///
	/// The `activated` list of the returned update is empty.
	pub fn stop_work(hash: Hash) -> Self {
		let mut update = Self::default();
		update.deactivated.push(hash);
		update
	}
}

impl PartialEq for ActiveLeavesUpdate {
	/// Compare two updates by content, ignoring element order and duplicates.
	///
	/// Equality here does not imply bitwise equality: `activated` and `deactivated`
	/// are each compared as sets of hashes.
	fn eq(&self, other: &Self) -> bool {
		use std::collections::HashSet;

		let ours_activated: HashSet<_> = self.activated.iter().collect();
		let theirs_activated: HashSet<_> = other.activated.iter().collect();
		if ours_activated != theirs_activated {
			// No need to look at the deactivated hashes once the activated ones differ.
			return false;
		}

		let ours_deactivated: HashSet<_> = self.deactivated.iter().collect();
		let theirs_deactivated: HashSet<_> = other.deactivated.iter().collect();
		ours_deactivated == theirs_deactivated
	}
}

81
82
83
/// Signals sent by an overseer to a subsystem.
#[derive(PartialEq, Clone, Debug)]
pub enum OverseerSignal {
84
85
	/// Subsystems should adjust their jobs to start and stop work on appropriate block hashes.
	ActiveLeaves(ActiveLeavesUpdate),
86
87
	/// `Subsystem` is informed of a finalized block by its block hash.
	BlockFinalized(Hash),
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
	/// Conclude the work of the `Overseer` and all `Subsystem`s.
	Conclude,
}

/// A message type that a subsystem receives from an overseer.
/// It wraps signals from an overseer and messages that are circulating
/// between subsystems.
///
/// It is generic over over the message type `M` that a particular `Subsystem` may use.
#[derive(Debug)]
pub enum FromOverseer<M> {
	/// Signal from the `Overseer`.
	Signal(OverseerSignal),

	/// Some other `Subsystem`'s message.
	Communication {
104
		/// Contained message
105
106
107
108
		msg: M,
	},
}

109

110
111
112
113
114
115
116
/// An error type that describes faults that may happen
///
/// These are:
///   * Channels being closed
///   * Subsystems dying when they are not expected to
///   * Subsystems not dying when they are told to die
///   * etc.
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#[derive(Error, Debug)]
pub enum SubsystemError {
	/// A notification connection is no longer valid.
	#[error(transparent)]
	NotifyCancellation(#[from] oneshot::Canceled),

	/// Queue does not accept another item.
	#[error(transparent)]
	QueueError(#[from] mpsc::SendError),

	/// An attempt to spawn a futures task did not succeed.
	#[error(transparent)]
	TaskSpawn(#[from] futures::task::SpawnError),

	/// An infallable error.
	#[error(transparent)]
	Infallible(#[from] std::convert::Infallible),

135
136
137
138
	/// Prometheus had a problem
	#[error(transparent)]
	Prometheus(#[from] substrate_prometheus_endpoint::PrometheusError),

139
140
141
142
143
144
145
146
147
148
149
150
	/// An other error lacking particular type information.
	#[error("Failed to {0}")]
	Context(String),

	/// Per origin (or subsystem) annotations to wrap an error.
	#[error("Error originated in {origin}")]
	FromOrigin {
		/// An additional anotation tag for the origin of `source`.
		origin: &'static str,
		/// The wrapped error. Marked as source for tracking the error chain.
		#[source] source: Box<dyn std::error::Error + Send>
	},
151
152
}

153
154
155
156
impl SubsystemError {
	/// Adds a `str` as `origin` to the given error `err`.
	pub fn with_origin<E: 'static + Send + std::error::Error>(origin: &'static str, err: E) -> Self {
		Self::FromOrigin { origin, source: Box::new(err) }
157
158
159
160
161
162
	}
}

/// An asynchronous subsystem task..
///
/// In essence it's just a newtype wrapping a `BoxFuture`.
163
164
165
166
pub struct SpawnedSubsystem {
	/// Name of the subsystem being spawned.
	pub name: &'static str,
	/// The task of the subsystem being spawned.
167
	pub future: BoxFuture<'static, SubsystemResult<()>>,
168
}
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196

/// A `Result` type that wraps [`SubsystemError`].
///
/// [`SubsystemError`]: enum.SubsystemError.html
pub type SubsystemResult<T> = Result<T, SubsystemError>;

/// A context type that is given to the [`Subsystem`] upon spawning.
/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
/// or spawn jobs.
///
/// [`Overseer`]: struct.Overseer.html
/// [`SubsystemJob`]: trait.SubsystemJob.html
#[async_trait]
pub trait SubsystemContext: Send + 'static {
	/// The message type of this context. Subsystems launched with this context will expect
	/// to receive messages of this type.
	type Message: Send;

	/// Try to asynchronously receive a message.
	///
	/// This has to be used with caution, if you loop over this without
	/// using `pending!()` macro you will end up with a busy loop!
	async fn try_recv(&mut self) -> Result<Option<FromOverseer<Self::Message>>, ()>;

	/// Receive a message.
	async fn recv(&mut self) -> SubsystemResult<FromOverseer<Self::Message>>;

	/// Spawn a child task on the executor.
197
	async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()>;
198

199
200
201
202
203
204
205
	/// Spawn a blocking child task on the executor's dedicated thread pool.
	async fn spawn_blocking(
		&mut self,
		name: &'static str,
		s: Pin<Box<dyn Future<Output = ()> + Send>>,
	) -> SubsystemResult<()>;

206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
	/// Send a direct message to some other `Subsystem`, routed based on message type.
	async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()>;

	/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
	async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
		where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send;
}

/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`].
///
/// It is generic over the message type circulating in the system.
/// The idea that we want some type contaning persistent state that
/// can spawn actually running subsystems when asked to.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
pub trait Subsystem<C: SubsystemContext> {
	/// Start this `Subsystem` and return `SpawnedSubsystem`.
224
	fn start(self, ctx: C) -> SpawnedSubsystem;
225
}
226
227
228
229
230
231
232

/// A dummy subsystem that implements [`Subsystem`] for all
/// types of messages. Used for tests or as a placeholder.
///
/// Once started, it keeps receiving from its context and ignores everything it gets,
/// finishing when it receives `OverseerSignal::Conclude` or when receiving fails.
///
/// [`Subsystem`]: trait.Subsystem.html
pub struct DummySubsystem;

impl<C: SubsystemContext> Subsystem<C> for DummySubsystem {
	fn start(self, mut ctx: C) -> SpawnedSubsystem {
233
		let future = Box::pin(async move {
234
235
			loop {
				match ctx.recv().await {
236
237
					Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
					Err(_) => return Ok(()),
238
239
240
					_ => continue,
				}
			}
241
242
243
		});

		SpawnedSubsystem {
244
			name: "dummy-subsystem",
245
246
			future,
		}
247
	}
248
}