lib.rs 30.6 KB
Newer Older
Fedor Sakharov's avatar
Fedor Sakharov committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! # Overseer
//!
//! `overseer` implements the Overseer architecture described in the
//! [implementers-guide](https://w3f.github.io/parachain-implementers-guide/node/index.html).
//! For the motivations behind implementing the overseer itself, see that guide;
//! the documentation in this crate mostly covers technical details.
//!
//! An `Overseer` spawns, stops and oversees asynchronous tasks, and establishes a
//! well-defined, easy-to-use protocol that the tasks use to communicate with each other.
//! Ideally this protocol is the only way tasks communicate with each other; however,
//! at the moment there are no foolproof guards against other means of communication.
//!
//! The `Overseer` is instantiated with a pre-defined set of `Subsystems` that
//! share the same behavior from `Overseer`'s point of view.
//!
//! ```text
//!                              +-----------------------------+
//!                              |         Overseer            |
//!                              +-----------------------------+
//!
//!             ................|  Overseer "holds" these and uses |..............
//!             .                  them to (re)start things                      .
//!             .                                                                .
//!             .  +-------------------+                +---------------------+  .
//!             .  |   Subsystem1      |                |   Subsystem2        |  .
//!             .  +-------------------+                +---------------------+  .
//!             .           |                                       |            .
//!             ..................................................................
//!                         |                                       |
//!                       start()                                 start()
//!                         V                                       V
//!             ..................| Overseer "runs" these |.......................
//!             .  +--------------------+               +---------------------+  .
//!             .  | SubsystemInstance1 |               | SubsystemInstance2  |  .
//!             .  +--------------------+               +---------------------+  .
//!             ..................................................................
//! ```

57
58
59
60
61
// #![deny(unused_results)]
// unused dependencies can not work for test and examples at the same time
// yielding false positives
#![warn(missing_docs)]

Shawn Tabrizi's avatar
Shawn Tabrizi committed
62
63
64
65
66
67
68
use std::{
	collections::{hash_map, HashMap},
	fmt::{self, Debug},
	iter::FromIterator,
	pin::Pin,
	sync::Arc,
	time::Duration,
Fedor Sakharov's avatar
Fedor Sakharov committed
69
};
Shawn Tabrizi's avatar
Shawn Tabrizi committed
70
71

use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
72
use lru::LruCache;
Andronik Ordian's avatar
Andronik Ordian committed
73
use parking_lot::RwLock;
Fedor Sakharov's avatar
Fedor Sakharov committed
74

75
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
Shawn Tabrizi's avatar
Shawn Tabrizi committed
76
use polkadot_primitives::v1::{Block, BlockId, BlockNumber, Hash, ParachainHost};
77
use sp_api::{ApiExt, ProvideRuntimeApi};
78

Shawn Tabrizi's avatar
Shawn Tabrizi committed
79
use polkadot_node_network_protocol::v1 as protocol_v1;
80
use polkadot_node_subsystem_types::messages::{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
81
82
83
84
85
86
87
	ApprovalDistributionMessage, ApprovalVotingMessage, AvailabilityDistributionMessage,
	AvailabilityRecoveryMessage, AvailabilityStoreMessage, BitfieldDistributionMessage,
	BitfieldSigningMessage, CandidateBackingMessage, CandidateValidationMessage, ChainApiMessage,
	ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
	DisputeCoordinatorMessage, DisputeDistributionMessage, DisputeParticipationMessage,
	GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeMessage, ProvisionerMessage,
	RuntimeApiMessage, StatementDistributionMessage,
88
};
89
pub use polkadot_node_subsystem_types::{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
90
91
	errors::{SubsystemError, SubsystemResult},
	jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal,
92
};
Andronik Ordian's avatar
Andronik Ordian committed
93

94
95
96
97
// TODO legacy, to be deleted, left for easier integration
// TODO https://github.com/paritytech/polkadot/issues/3427
mod subsystems;
pub use self::subsystems::AllSubsystems;
98

99
100
mod metrics;
use self::metrics::Metrics;
101

102
use polkadot_node_metrics::{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
103
	metrics::{prometheus, Metrics as MetricsTrait},
104
105
	Metronome,
};
106
107
108
109

#[cfg(feature = "memory-stats")]
use polkadot_node_metrics::memory_stats::MemoryAllocationTracker;

Shawn Tabrizi's avatar
Shawn Tabrizi committed
110
pub use polkadot_overseer_gen as gen;
111
pub use polkadot_overseer_gen::{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
112
113
114
	overlord, FromOverseer, MapSubsystem, MessagePacket, SignalsReceived, SpawnNamed, Subsystem,
	SubsystemContext, SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts,
	SubsystemMeters, SubsystemSender, TimeoutExt, ToOverseer,
115
};
116

117
118
119
/// Capacity of the LRU cache of already-seen relay-chain leaves.
///
/// Sized to hold roughly two days' worth of blocks (forks are not accounted
/// for), assuming a 6-second block time: 2 days * 24 h * 60 min * 60 s / 6 s.
const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 60 * 60 / 6;
Fedor Sakharov's avatar
Fedor Sakharov committed
120

121
122
#[cfg(test)]
mod tests;
123

124
125
126
127
128
129
/// Decides whether parachain consensus is available at a given block.
pub trait HeadSupportsParachains {
	/// Returns `true` when the block identified by `head` supports parachain
	/// consensus, and `false` otherwise.
	fn head_supports_parachains(&self, head: &Hash) -> bool;
}

Shawn Tabrizi's avatar
Shawn Tabrizi committed
130
131
impl<Client> HeadSupportsParachains for Arc<Client>
where
132
133
134
135
136
137
138
139
140
	Client: ProvideRuntimeApi<Block>,
	Client::Api: ParachainHost<Block>,
{
	fn head_supports_parachains(&self, head: &Hash) -> bool {
		let id = BlockId::Hash(*head);
		self.runtime_api().has_api::<dyn ParachainHost<Block>>(&id).unwrap_or(false)
	}
}

Andronik Ordian's avatar
Andronik Ordian committed
141
/// A handle used to communicate with the [`Overseer`].
142
///
143
144
/// [`Overseer`]: struct.Overseer.html
#[derive(Clone)]
Andronik Ordian's avatar
Andronik Ordian committed
145
146
147
148
149
150
151
pub enum Handle {
	/// Used only at initialization to break the cyclic dependency.
	// TODO: refactor in https://github.com/paritytech/polkadot/issues/3427
	Disconnected(Arc<RwLock<Option<OverseerHandle>>>),
	/// A handle to the overseer.
	Connected(OverseerHandle),
}
152

153
impl Handle {
Andronik Ordian's avatar
Andronik Ordian committed
154
155
156
157
158
	/// Create a new disconnected [`Handle`].
	pub fn new_disconnected() -> Self {
		Self::Disconnected(Arc::new(RwLock::new(None)))
	}

159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
	/// Inform the `Overseer` that that some block was imported.
	pub async fn block_imported(&mut self, block: BlockInfo) {
		self.send_and_log_error(Event::BlockImported(block)).await
	}

	/// Send some message to one of the `Subsystem`s.
	pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
		self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin }).await
	}

	/// Send a message not providing an origin.
	#[inline(always)]
	pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
		self.send_msg(msg, "").await
	}

	/// Inform the `Overseer` that some block was finalized.
	pub async fn block_finalized(&mut self, block: BlockInfo) {
		self.send_and_log_error(Event::BlockFinalized(block)).await
	}

	/// Wait for a block with the given hash to be in the active-leaves set.
181
	///
182
183
184
185
	/// The response channel responds if the hash was activated and is closed if the hash was deactivated.
	/// Note that due the fact the overseer doesn't store the whole active-leaves set, only deltas,
	/// the response channel may never return if the hash was deactivated before this call.
	/// In this case, it's the caller's responsibility to ensure a timeout is set.
Shawn Tabrizi's avatar
Shawn Tabrizi committed
186
187
188
189
190
	pub async fn wait_for_activation(
		&mut self,
		hash: Hash,
		response_channel: oneshot::Sender<SubsystemResult<()>>,
	) {
191
		self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
192
193
194
195
			hash,
			response_channel,
		}))
		.await;
196
197
	}

198
199
200
	/// Tell `Overseer` to shutdown.
	pub async fn stop(&mut self) {
		self.send_and_log_error(Event::Stop).await;
201
202
	}

203
204
	/// Most basic operation, to stop a server.
	async fn send_and_log_error(&mut self, event: Event) {
Andronik Ordian's avatar
Andronik Ordian committed
205
206
207
208
209
210
211
		self.try_connect();
		if let Self::Connected(ref mut handle) = self {
			if handle.send(event).await.is_err() {
				tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
			}
		} else {
			tracing::warn!(target: LOG_TARGET, "Using a disconnected Handle to send to Overseer");
212
213
214
		}
	}

Andronik Ordian's avatar
Andronik Ordian committed
215
216
217
218
219
220
	/// Whether the handle is disconnected.
	pub fn is_disconnected(&self) -> bool {
		match self {
			Self::Disconnected(ref x) => x.read().is_none(),
			_ => false,
		}
221
	}
222

Andronik Ordian's avatar
Andronik Ordian committed
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
	/// Connect this handle and all disconnected clones of it to the overseer.
	pub fn connect_to_overseer(&mut self, handle: OverseerHandle) {
		match self {
			Self::Disconnected(ref mut x) => {
				let mut maybe_handle = x.write();
				if maybe_handle.is_none() {
					tracing::info!(target: LOG_TARGET, "🖇️ Connecting all Handles to Overseer");
					*maybe_handle = Some(handle);
				} else {
					tracing::warn!(
						target: LOG_TARGET,
						"Attempting to connect a clone of a connected Handle",
					);
				}
			},
			_ => {
				tracing::warn!(
					target: LOG_TARGET,
					"Attempting to connect an already connected Handle",
				);
			},
		}
245
	}
246

Andronik Ordian's avatar
Andronik Ordian committed
247
248
249
250
251
252
253
254
255
256
257
	/// Try upgrading from `Self::Disconnected` to `Self::Connected` state
	/// after calling `connect_to_overseer` on `self` or a clone of `self`.
	fn try_connect(&mut self) {
		if let Self::Disconnected(ref mut x) = self {
			let guard = x.write();
			if let Some(ref h) = *guard {
				let handle = h.clone();
				drop(guard);
				*self = Self::Connected(handle);
			}
		}
258
	}
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
}

/// Minimal description of a block that has been imported or finalized,
/// as handed to the `Overseer`.
///
/// This type exists solely so that `Overseer` code is decoupled from the
/// client code and does not need to call `HeaderBackend::block_number_from_id()`.
#[derive(Debug, Clone)]
pub struct BlockInfo {
	/// The block's hash.
	pub hash: Hash,
	/// The hash of the block's parent.
	pub parent_hash: Hash,
	/// The block's number.
	pub number: BlockNumber,
}

/// Extracts the hash, parent hash and number from an import notification.
impl From<BlockImportNotification<Block>> for BlockInfo {
	fn from(notification: BlockImportNotification<Block>) -> Self {
		let header = &notification.header;
		Self { hash: notification.hash, parent_hash: header.parent_hash, number: header.number }
	}
}

/// Extracts the hash, parent hash and number from a finality notification.
impl From<FinalityNotification<Block>> for BlockInfo {
	fn from(notification: FinalityNotification<Block>) -> Self {
		let header = &notification.header;
		Self { hash: notification.hash, parent_hash: header.parent_hash, number: header.number }
	}
}

289
290
291
292
/// An event from outside the overseer scope, such
/// as the substrate framework or user interaction.
pub enum Event {
	/// A new block was imported.
293
	BlockImported(BlockInfo),
294
	/// A block was finalized with i.e. babe or another consensus algorithm.
295
	BlockFinalized(BlockInfo),
296
	/// Message as sent to a subsystem.
297
	MsgToSubsystem {
298
		/// The actual message.
299
		msg: AllMessages,
300
		/// The originating subsystem name.
301
302
		origin: &'static str,
	},
303
	/// A request from the outer world.
304
	ExternalRequest(ExternalRequest),
305
	/// Stop the overseer on i.e. a UNIX signal.
306
307
308
309
	Stop,
}

/// Some request from outer world.
310
311
312
pub enum ExternalRequest {
	/// Wait for the activation of a particular hash
	/// and be notified by means of the return channel.
313
	WaitForActivation {
314
		/// The relay parent for which activation to wait for.
315
		hash: Hash,
316
		/// Response channel to await on.
317
318
319
320
321
		response_channel: oneshot::Sender<SubsystemResult<()>>,
	},
}

/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding
322
/// import and finality notifications into the [`OverseerHandle`].
Shawn Tabrizi's avatar
Shawn Tabrizi committed
323
pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
324
325
326
327
328
329
330
331
	let mut finality = client.finality_notification_stream();
	let mut imports = client.import_notification_stream();

	loop {
		select! {
			f = finality.next() => {
				match f {
					Some(block) => {
Andronik Ordian's avatar
Andronik Ordian committed
332
						handle.block_finalized(block.into()).await;
333
334
335
336
337
338
339
					}
					None => break,
				}
			},
			i = imports.next() => {
				match i {
					Some(block) => {
Andronik Ordian's avatar
Andronik Ordian committed
340
						handle.block_imported(block.into()).await;
341
342
343
344
345
346
347
348
349
					}
					None => break,
				}
			},
			complete => break,
		}
	}
}

350
351
352
353
354
355
356
357
358
359
360
/// The `Overseer` itself.
///
/// This declaration is the input of the `#[overlord]` attribute macro, which
/// generates the overseer machinery from it (for instance the `builder()` that
/// `Overseer::new` calls). Each `#[subsystem(..)]` field names one subsystem
/// together with the message type it consumes.
// NOTE(review): `no_dispatch` and `blocking` are flags interpreted by
// `polkadot_overseer_gen`. Presumably `no_dispatch` excludes the subsystem from
// network-event dispatch and `blocking` spawns it as a blocking task — confirm there.
// The impl blocks use `Overseer<S, SupportsParachains>`, so the macro evidently
// adds the spawner type parameter `S`.
#[overlord(
	gen=AllMessages,
	event=Event,
	signal=OverseerSignal,
	error=SubsystemError,
	network=NetworkBridgeEvent<protocol_v1::ValidationProtocol>,
)]
pub struct Overseer<SupportsParachains> {
	#[subsystem(no_dispatch, CandidateValidationMessage)]
	candidate_validation: CandidateValidation,

	#[subsystem(no_dispatch, CandidateBackingMessage)]
	candidate_backing: CandidateBacking,

	#[subsystem(StatementDistributionMessage)]
	statement_distribution: StatementDistribution,

	#[subsystem(no_dispatch, AvailabilityDistributionMessage)]
	availability_distribution: AvailabilityDistribution,

	#[subsystem(no_dispatch, AvailabilityRecoveryMessage)]
	availability_recovery: AvailabilityRecovery,

	#[subsystem(blocking, no_dispatch, BitfieldSigningMessage)]
	bitfield_signing: BitfieldSigning,

	#[subsystem(BitfieldDistributionMessage)]
	bitfield_distribution: BitfieldDistribution,

	#[subsystem(no_dispatch, ProvisionerMessage)]
	provisioner: Provisioner,

	#[subsystem(no_dispatch, blocking, RuntimeApiMessage)]
	runtime_api: RuntimeApi,

	#[subsystem(no_dispatch, blocking, AvailabilityStoreMessage)]
	availability_store: AvailabilityStore,

	#[subsystem(no_dispatch, NetworkBridgeMessage)]
	network_bridge: NetworkBridge,

	#[subsystem(no_dispatch, blocking, ChainApiMessage)]
	chain_api: ChainApi,

	#[subsystem(no_dispatch, CollationGenerationMessage)]
	collation_generation: CollationGeneration,

	#[subsystem(no_dispatch, CollatorProtocolMessage)]
	collator_protocol: CollatorProtocol,

	#[subsystem(ApprovalDistributionMessage)]
	approval_distribution: ApprovalDistribution,

	#[subsystem(no_dispatch, ApprovalVotingMessage)]
	approval_voting: ApprovalVoting,

	#[subsystem(no_dispatch, GossipSupportMessage)]
	gossip_support: GossipSupport,

	#[subsystem(no_dispatch, DisputeCoordinatorMessage)]
	dispute_coordinator: DisputeCoordinator,

	#[subsystem(no_dispatch, DisputeParticipationMessage)]
	dispute_participation: DisputeParticipation,

	#[subsystem(no_dispatch, DisputeDistributionMessage)]
	dispute_distribution: DisputeDistribution,

	#[subsystem(no_dispatch, ChainSelectionMessage)]
	chain_selection: ChainSelection,

	/// External listeners waiting for a hash to be in the active-leaves set.
	pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

	/// Stores the [`jaeger::Span`] per active leaf.
	pub span_per_active_leaf: HashMap<Hash, Arc<jaeger::Span>>,

	/// A set of leaves that `Overseer` starts working with.
	///
	/// Drained at the beginning of `run` and never used again.
	pub leaves: Vec<(Hash, BlockNumber)>,

	/// The set of the "active leaves", mapped to their block numbers.
	pub active_leaves: HashMap<Hash, BlockNumber>,

	/// An implementation for checking whether a header supports parachain consensus.
	pub supports_parachains: SupportsParachains,

	/// An LRU cache for keeping track of relay-chain heads that have already been seen.
	pub known_leaves: LruCache<Hash, ()>,

	/// Various Prometheus metrics.
	pub metrics: Metrics,
}

446
impl<S, SupportsParachains> Overseer<S, SupportsParachains>
Fedor Sakharov's avatar
Fedor Sakharov committed
447
where
448
	SupportsParachains: HeadSupportsParachains,
449
	S: SpawnNamed,
Fedor Sakharov's avatar
Fedor Sakharov committed
450
{
451
452
	/// Create a new instance of the [`Overseer`] with a fixed set of [`Subsystem`]s.
	///
453
	/// This returns the overseer along with an [`OverseerHandle`] which can
454
455
	/// be used to send messages from external parts of the codebase.
	///
Andronik Ordian's avatar
Andronik Ordian committed
456
	/// The [`OverseerHandle`] returned from this function is connected to
457
	/// the returned [`Overseer`].
Fedor Sakharov's avatar
Fedor Sakharov committed
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
	///
	/// ```text
	///                  +------------------------------------+
	///                  |            Overseer                |
	///                  +------------------------------------+
	///                    /            |             |      \
	///      ................. subsystems...................................
	///      . +-----------+    +-----------+   +----------+   +---------+ .
	///      . |           |    |           |   |          |   |         | .
	///      . +-----------+    +-----------+   +----------+   +---------+ .
	///      ...............................................................
	///                              |
	///                        probably `spawn`
	///                            a `job`
	///                              |
	///                              V
	///                         +-----------+
	///                         |           |
	///                         +-----------+
	///
	/// ```
	///
	/// [`Subsystem`]: trait.Subsystem.html
	///
	/// # Example
	///
	/// The [`Subsystems`] may be any type as long as they implement an expected interface.
485
486
	/// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with them.
	/// For the sake of simplicity the termination of the example is done with a timeout.
Fedor Sakharov's avatar
Fedor Sakharov committed
487
488
489
490
	/// ```
	/// # use std::time::Duration;
	/// # use futures::{executor, pin_mut, select, FutureExt};
	/// # use futures_timer::Delay;
491
	/// # use polkadot_primitives::v1::Hash;
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
	/// # use polkadot_overseer::{
	/// # 	self as overseer,
	/// #   OverseerSignal,
	/// # 	SubsystemSender as _,
	/// # 	AllMessages,
	/// # 	AllSubsystems,
	/// # 	HeadSupportsParachains,
	/// # 	Overseer,
	/// # 	SubsystemError,
	/// # 	gen::{
	/// # 		SubsystemContext,
	/// # 		FromOverseer,
	/// # 		SpawnedSubsystem,
	/// # 	},
	/// # };
	/// # use polkadot_node_subsystem_types::messages::{
	/// # 	CandidateValidationMessage, CandidateBackingMessage,
	/// # 	NetworkBridgeMessage,
Fedor Sakharov's avatar
Fedor Sakharov committed
510
511
512
	/// # };
	///
	/// struct ValidationSubsystem;
513
	///
514
515
516
517
518
519
520
521
	/// impl<Ctx> overseer::Subsystem<Ctx, SubsystemError> for ValidationSubsystem
	/// where
	///     Ctx: overseer::SubsystemContext<
	///				Message=CandidateValidationMessage,
	///				AllMessages=AllMessages,
	///				Signal=OverseerSignal,
	///				Error=SubsystemError,
	///			>,
522
	/// {
Fedor Sakharov's avatar
Fedor Sakharov committed
523
	///     fn start(
524
	///         self,
525
526
	///         mut ctx: Ctx,
	///     ) -> SpawnedSubsystem<SubsystemError> {
527
528
529
530
531
532
533
534
	///         SpawnedSubsystem {
	///             name: "validation-subsystem",
	///             future: Box::pin(async move {
	///                 loop {
	///                     Delay::new(Duration::from_secs(1)).await;
	///                 }
	///             }),
	///         }
Fedor Sakharov's avatar
Fedor Sakharov committed
535
536
537
538
	///     }
	/// }
	///
	/// # fn main() { executor::block_on(async move {
539
540
541
542
543
	///
	/// struct AlwaysSupportsParachains;
	/// impl HeadSupportsParachains for AlwaysSupportsParachains {
	///      fn head_supports_parachains(&self, _head: &Hash) -> bool { true }
	/// }
Bastian Köcher's avatar
Bastian Köcher committed
544
	/// let spawner = sp_core::testing::TaskExecutor::new();
545
546
	/// let all_subsystems = AllSubsystems::<()>::dummy()
	///		.replace_candidate_validation(ValidationSubsystem);
Andronik Ordian's avatar
Andronik Ordian committed
547
	/// let (overseer, _handle) = Overseer::new(
548
	///     vec![],
549
	///     all_subsystems,
550
	///     None,
551
	///     AlwaysSupportsParachains,
Fedor Sakharov's avatar
Fedor Sakharov committed
552
553
554
555
556
557
558
559
560
561
562
563
564
565
	///     spawner,
	/// ).unwrap();
	///
	/// let timer = Delay::new(Duration::from_millis(50)).fuse();
	///
	/// let overseer_fut = overseer.run().fuse();
	/// pin_mut!(timer);
	/// pin_mut!(overseer_fut);
	///
	/// select! {
	///     _ = overseer_fut => (),
	///     _ = timer => (),
	/// }
	/// #
566
567
	/// # 	});
	/// # }
Fedor Sakharov's avatar
Fedor Sakharov committed
568
	/// ```
Shawn Tabrizi's avatar
Shawn Tabrizi committed
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
	pub fn new<
		CV,
		CB,
		SD,
		AD,
		AR,
		BS,
		BD,
		P,
		RA,
		AS,
		NB,
		CA,
		CG,
		CP,
		ApD,
		ApV,
		GS,
		DC,
		DP,
		DD,
		CS,
	>(
592
		leaves: impl IntoIterator<Item = BlockInfo>,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
		all_subsystems: AllSubsystems<
			CV,
			CB,
			SD,
			AD,
			AR,
			BS,
			BD,
			P,
			RA,
			AS,
			NB,
			CA,
			CG,
			CP,
			ApD,
			ApV,
			GS,
			DC,
			DP,
			DD,
			CS,
		>,
616
		prometheus_registry: Option<&prometheus::Registry>,
617
		supports_parachains: SupportsParachains,
618
		s: S,
Andronik Ordian's avatar
Andronik Ordian committed
619
	) -> SubsystemResult<(Self, OverseerHandle)>
620
	where
621
622
		CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>, SubsystemError> + Send,
		CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>, SubsystemError> + Send,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
623
624
625
626
		SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>, SubsystemError>
			+ Send,
		AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>, SubsystemError>
			+ Send,
627
628
629
630
631
632
633
634
635
636
		AR: Subsystem<OverseerSubsystemContext<AvailabilityRecoveryMessage>, SubsystemError> + Send,
		BS: Subsystem<OverseerSubsystemContext<BitfieldSigningMessage>, SubsystemError> + Send,
		BD: Subsystem<OverseerSubsystemContext<BitfieldDistributionMessage>, SubsystemError> + Send,
		P: Subsystem<OverseerSubsystemContext<ProvisionerMessage>, SubsystemError> + Send,
		RA: Subsystem<OverseerSubsystemContext<RuntimeApiMessage>, SubsystemError> + Send,
		AS: Subsystem<OverseerSubsystemContext<AvailabilityStoreMessage>, SubsystemError> + Send,
		NB: Subsystem<OverseerSubsystemContext<NetworkBridgeMessage>, SubsystemError> + Send,
		CA: Subsystem<OverseerSubsystemContext<ChainApiMessage>, SubsystemError> + Send,
		CG: Subsystem<OverseerSubsystemContext<CollationGenerationMessage>, SubsystemError> + Send,
		CP: Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>, SubsystemError> + Send,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
637
638
		ApD:
			Subsystem<OverseerSubsystemContext<ApprovalDistributionMessage>, SubsystemError> + Send,
639
640
		ApV: Subsystem<OverseerSubsystemContext<ApprovalVotingMessage>, SubsystemError> + Send,
		GS: Subsystem<OverseerSubsystemContext<GossipSupportMessage>, SubsystemError> + Send,
Andronik Ordian's avatar
Andronik Ordian committed
641
642
643
644
		DC: Subsystem<OverseerSubsystemContext<DisputeCoordinatorMessage>, SubsystemError> + Send,
		DP: Subsystem<OverseerSubsystemContext<DisputeParticipationMessage>, SubsystemError> + Send,
		DD: Subsystem<OverseerSubsystemContext<DisputeDistributionMessage>, SubsystemError> + Send,
		CS: Subsystem<OverseerSubsystemContext<ChainSelectionMessage>, SubsystemError> + Send,
645
		S: SpawnNamed,
646
	{
647
648
		let metrics: Metrics = <Metrics as MetricsTrait>::register(prometheus_registry)?;

Andronik Ordian's avatar
Andronik Ordian committed
649
		let (mut overseer, handle) = Self::builder()
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
			.candidate_validation(all_subsystems.candidate_validation)
			.candidate_backing(all_subsystems.candidate_backing)
			.statement_distribution(all_subsystems.statement_distribution)
			.availability_distribution(all_subsystems.availability_distribution)
			.availability_recovery(all_subsystems.availability_recovery)
			.bitfield_signing(all_subsystems.bitfield_signing)
			.bitfield_distribution(all_subsystems.bitfield_distribution)
			.provisioner(all_subsystems.provisioner)
			.runtime_api(all_subsystems.runtime_api)
			.availability_store(all_subsystems.availability_store)
			.network_bridge(all_subsystems.network_bridge)
			.chain_api(all_subsystems.chain_api)
			.collation_generation(all_subsystems.collation_generation)
			.collator_protocol(all_subsystems.collator_protocol)
			.approval_distribution(all_subsystems.approval_distribution)
			.approval_voting(all_subsystems.approval_voting)
			.gossip_support(all_subsystems.gossip_support)
Andronik Ordian's avatar
Andronik Ordian committed
667
668
669
670
			.dispute_coordinator(all_subsystems.dispute_coordinator)
			.dispute_participation(all_subsystems.dispute_participation)
			.dispute_distribution(all_subsystems.dispute_distribution)
			.chain_selection(all_subsystems.chain_selection)
671
			.leaves(Vec::from_iter(
Shawn Tabrizi's avatar
Shawn Tabrizi committed
672
673
674
				leaves
					.into_iter()
					.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)),
675
676
677
678
679
680
681
682
683
684
685
			))
			.known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE))
			.active_leaves(Default::default())
			.span_per_active_leaf(Default::default())
			.activation_external_listeners(Default::default())
			.supports_parachains(supports_parachains)
			.metrics(metrics.clone())
			.spawner(s)
			.build()?;

		// spawn the metrics metronome task
686
		{
687
			struct ExtractNameAndMeters;
688

689
			impl<'a, T: 'a> MapSubsystem<&'a OverseenSubsystem<T>> for ExtractNameAndMeters {
690
				type Output = Option<(&'static str, SubsystemMeters)>;
691
692

				fn map_subsystem(&self, subsystem: &'a OverseenSubsystem<T>) -> Self::Output {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
693
694
695
696
					subsystem
						.instance
						.as_ref()
						.map(|instance| (instance.name, instance.meters.clone()))
697
698
				}
			}
699
			let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
700

701
702
703
			#[cfg(feature = "memory-stats")]
			let memory_stats = MemoryAllocationTracker::new().expect("Jemalloc is the default allocator. qed");

704
			let metronome_metrics = metrics.clone();
Shawn Tabrizi's avatar
Shawn Tabrizi committed
705
706
			let metronome =
				Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
					#[cfg(feature = "memory-stats")]
					match memory_stats.snapshot() {
						Ok(memory_stats_snapshot) => {
							tracing::trace!(
								target: LOG_TARGET,
								"memory_stats: {:?}",
								&memory_stats_snapshot
							);
							metronome_metrics.memory_stats_snapshot(memory_stats_snapshot);
						},

						Err(e) => tracing::debug!(
							target: LOG_TARGET,
							"Failed to obtain memory stats: {:?}",
							e
						),
					}

725
726
					// We combine the amount of messages from subsystems to the overseer
					// as well as the amount of messages from external sources to the overseer
727
728
					// into one `to_overseer` value.
					metronome_metrics.channel_fill_level_snapshot(
Shawn Tabrizi's avatar
Shawn Tabrizi committed
729
730
						subsystem_meters
							.iter()
731
732
							.cloned()
							.filter_map(|x| x)
Shawn Tabrizi's avatar
Shawn Tabrizi committed
733
							.map(|(name, ref meters)| (name, meters.read())),
734
					);
735

Shawn Tabrizi's avatar
Shawn Tabrizi committed
736
					async move { () }
737
				});
738
			overseer.spawner().spawn("metrics_metronome", Box::pin(metronome));
739
740
		}

Andronik Ordian's avatar
Andronik Ordian committed
741
		Ok((overseer, handle))
Fedor Sakharov's avatar
Fedor Sakharov committed
742
743
	}

744
	/// Stop the overseer.
Fedor Sakharov's avatar
Fedor Sakharov committed
745
	async fn stop(mut self) {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
746
		let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
Fedor Sakharov's avatar
Fedor Sakharov committed
747
748
749
750
	}

	/// Run the `Overseer`.
	///
	/// First, every leaf handed over at construction is taken out of `self.leaves` and
	/// inserted into `active_leaves`. An `ActiveLeaves` signal is broadcast for each leaf
	/// for which `on_head_activated` returns `Some`. Then the main loop processes, until
	/// shutdown:
	/// - external [`Event`]s (e.g. sent through a [`Handle`]),
	/// - `ToOverseer` spawn requests coming from subsystems,
	/// - termination of any subsystem, which is treated as fatal.
	///
	/// # Errors
	///
	/// Propagates errors from broadcasting signals, routing messages and handling block
	/// import/finalization. An [`Event::Stop`] stops the overseer and returns `Ok(())`.
	/// When any subsystem finishes, the overseer is stopped and that subsystem's result
	/// is returned (which may be `Ok`).
	pub async fn run(mut self) -> SubsystemResult<()> {
		// Notify about active leaves on startup before starting the loop
		for (hash, number) in std::mem::take(&mut self.leaves) {
			let _ = self.active_leaves.insert(hash, number);
			if let Some((span, status)) = self.on_head_activated(&hash, None) {
				let update =
					ActiveLeavesUpdate::start_work(ActivatedLeaf { hash, number, status, span });
				self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
			}
		}

		loop {
			select! {
				// Events from outside the overseer.
				msg = self.events_rx.select_next_some() => {
					match msg {
						Event::MsgToSubsystem { msg, origin } => {
							self.route_message(msg.into(), origin).await?;
							self.metrics.on_message_relayed();
						}
						Event::Stop => {
							self.stop().await;
							return Ok(());
						}
						Event::BlockImported(block) => {
							self.block_imported(block).await?;
						}
						Event::BlockFinalized(block) => {
							self.block_finalized(block).await?;
						}
						Event::ExternalRequest(request) => {
							self.handle_external_request(request);
						}
					}
				},
				// Job-spawning requests from subsystems.
				msg = self.to_overseer_rx.select_next_some() => {
					match msg {
						ToOverseer::SpawnJob { name, s } => {
							self.spawn_job(name, s);
						}
						ToOverseer::SpawnBlockingJob { name, s } => {
							self.spawn_blocking_job(name, s);
						}
					}
				},
				// A subsystem future completed: the overseer is stopped and the
				// subsystem's result is returned to the caller.
				res = self.running_subsystems.select_next_some() => {
					tracing::error!(
						target: LOG_TARGET,
						subsystem = ?res,
						"subsystem finished unexpectedly",
					);
					self.stop().await;
					return res;
				},
			}
		}
	}

807
	async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
808
		match self.active_leaves.entry(block.hash) {
809
			hash_map::Entry::Vacant(entry) => entry.insert(block.number),
810
811
			hash_map::Entry::Occupied(entry) => {
				debug_assert_eq!(*entry.get(), block.number);
Shawn Tabrizi's avatar
Shawn Tabrizi committed
812
813
				return Ok(())
			},
814
815
		};

816
		let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)) {
817
			Some((span, status)) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
818
819
				hash: block.hash,
				number: block.number,
820
				status,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
821
				span,
822
823
824
			}),
			None => ActiveLeavesUpdate::default(),
		};
825
826
827
828
829

		if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
			debug_assert_eq!(block.number.saturating_sub(1), number);
			update.deactivated.push(block.parent_hash);
			self.on_head_deactivated(&block.parent_hash);
830
831
		}

832
833
		self.clean_up_external_listeners();

834
		if !update.is_empty() {
835
			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
836
		}
837
		Ok(())
838
839
840
	}

	async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
841
		let mut update = ActiveLeavesUpdate::default();
842

843
		self.active_leaves.retain(|h, n| {
844
			if *n <= block.number {
845
				update.deactivated.push(*h);
846
847
848
849
850
851
				false
			} else {
				true
			}
		});

852
853
854
855
		for deactivated in &update.deactivated {
			self.on_head_deactivated(deactivated)
		}

Shawn Tabrizi's avatar
Shawn Tabrizi committed
856
857
		self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
			.await?;
858
859
860
861
862
863
864

		// If there are no leaves being deactivated, we don't need to send an update.
		//
		// Our peers will be informed about our finalized block the next time we activating/deactivating some leaf.
		if !update.is_empty() {
			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
		}
865

866
867
868
		Ok(())
	}

869
870
	/// Handles a header activation. If the header's state doesn't support the parachains API,
	/// this returns `None`.
Shawn Tabrizi's avatar
Shawn Tabrizi committed
871
872
873
874
875
	fn on_head_activated(
		&mut self,
		hash: &Hash,
		parent_hash: Option<Hash>,
	) -> Option<(Arc<jaeger::Span>, LeafStatus)> {
876
		if !self.supports_parachains.head_supports_parachains(hash) {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
877
			return None
878
879
		}

880
881
882
883
		self.metrics.on_head_activated();
		if let Some(listeners) = self.activation_external_listeners.remove(hash) {
			for listener in listeners {
				// it's fine if the listener is no longer interested
884
				let _ = listener.send(Ok(()));
885
886
			}
		}
887

888
		let mut span = jaeger::Span::new(*hash, "leaf-activated");
889
890
891
892
893
894

		if let Some(parent_span) = parent_hash.and_then(|h| self.span_per_active_leaf.get(&h)) {
			span.add_follows_from(&*parent_span);
		}

		let span = Arc::new(span);
895
		self.span_per_active_leaf.insert(*hash, span.clone());
896
897
898
899
900
901
902
903

		let status = if let Some(_) = self.known_leaves.put(*hash, ()) {
			LeafStatus::Stale
		} else {
			LeafStatus::Fresh
		};

		Some((span, status))
904
905
906
907
	}

	fn on_head_deactivated(&mut self, hash: &Hash) {
		self.metrics.on_head_deactivated();
908
909
		self.activation_external_listeners.remove(hash);
		self.span_per_active_leaf.remove(hash);
910
911
912
913
914
915
916
917
918
919
920
921
922
	}

	/// Discard every activation listener whose channel reports `is_canceled()`.
	/// Any hash left without listeners is forgotten entirely.
	fn clean_up_external_listeners(&mut self) {
		self.activation_external_listeners.retain(|_hash, senders| {
			senders.retain(|sender| !sender.is_canceled());
			!senders.is_empty()
		});
	}

	fn handle_external_request(&mut self, request: ExternalRequest) {
		match request {
			ExternalRequest::WaitForActivation { hash, response_channel } => {
923
924
925
926
927
				// We use known leaves here because the `WaitForActivation` message
				// is primarily concerned about leaves which subsystems have simply
				// not been made aware of yet. Anything in the known leaves set,
				// even if stale, has been activated in the past.
				if self.known_leaves.peek(&hash).is_some() {
928
					// it's fine if the listener is no longer interested
929
					let _ = response_channel.send(Ok(()));
930
				} else {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
931
932
933
934
					self.activation_external_listeners
						.entry(hash)
						.or_default()
						.push(response_channel);
935
				}
Shawn Tabrizi's avatar
Shawn Tabrizi committed
936
			},
937
938
939
		}
	}

940
	/// Spawn the job future `j` under `name` via the overseer's spawner.
	fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
		let spawner = &mut self.spawner;
		spawner.spawn(name, j);
	}
943
944

	fn spawn_blocking_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
945
		self.spawner.spawn_blocking(name, j);
946
	}
Fedor Sakharov's avatar
Fedor Sakharov committed
947
}