lib.rs 24 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Utility module for subsystems
//!
//! Many subsystems have common interests such as canceling a bunch of spawned jobs,
//! or determining what their validator ID is. These common interests are factored into
//! this module.
22
23
24
//!
//! This crate also reexports Prometheus metric types which are expected to be implemented by subsystems.

25
#![warn(missing_docs)]
26

27
use polkadot_node_subsystem::{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
28
	errors::{RuntimeApiError, SubsystemError},
29
	messages::{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
30
		AllMessages, BoundToRelayParent, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender,
31
	},
Shawn Tabrizi's avatar
Shawn Tabrizi committed
32
	overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext,
33
	SubsystemSender,
34
};
35
36

pub use overseer::{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
37
38
	gen::{OverseerError, Timeout},
	Subsystem, TimeoutExt,
39
40
};

Shawn Tabrizi's avatar
Shawn Tabrizi committed
41
pub use polkadot_node_metrics::{metrics, Metronome};
42

Shawn Tabrizi's avatar
Shawn Tabrizi committed
43
44
45
46
47
48
use futures::{
	channel::{mpsc, oneshot},
	prelude::*,
	select,
	stream::{SelectAll, Stream},
};
49
use parity_scale_codec::Encode;
50
use pin_project::pin_project;
Shawn Tabrizi's avatar
Shawn Tabrizi committed
51
use polkadot_node_jaeger as jaeger;
52
use polkadot_primitives::v1::{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
53
54
55
	AuthorityDiscoveryId, CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs,
	GroupIndex, GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption,
	PersistedValidationData, SessionIndex, SessionInfo, Signed, SigningContext, ValidationCode,
56
	ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
57
58
};
use sp_application_crypto::AppKey;
Shawn Tabrizi's avatar
Shawn Tabrizi committed
59
60
use sp_core::{traits::SpawnNamed, Public};
use sp_keystore::{CryptoStore, Error as KeystoreError, SyncCryptoStorePtr};
61
use std::{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
62
63
64
65
66
67
68
69
	collections::{hash_map::Entry, HashMap},
	convert::TryFrom,
	fmt,
	marker::Unpin,
	pin::Pin,
	sync::Arc,
	task::{Context, Poll},
	time::Duration,
70
};
71
use thiserror::Error;
72

73
pub use metered_channel as metered;
74
pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
75

76
pub use determine_new_blocks::determine_new_blocks;
77

78
79
/// These reexports are required so that external crates can use the `delegated_subsystem` macro properly.
pub mod reexports {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
80
	pub use polkadot_overseer::gen::{SpawnNamed, SpawnedSubsystem, Subsystem, SubsystemContext};
81
}
82

83
84
/// A rolling session window cache.
pub mod rolling_session_window;
Shawn Tabrizi's avatar
Shawn Tabrizi committed
85
86
/// Convenient and efficient runtime info access.
pub mod runtime;
87

88
mod determine_new_blocks;
Andronik Ordian's avatar
Andronik Ordian committed
89
90
91
92

#[cfg(test)]
mod tests;

93
94
95
96
97
98
/// Duration a job will wait after sending a stop signal before hard-aborting.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
/// Capacity of channels to and from individual jobs
///
/// Used for both the `ToJob` and `FromJobCommand` channels created in `Jobs::spawn_job`.
pub const JOB_CHANNEL_CAPACITY: usize = 64;

/// Utility errors
///
/// Each variant wraps (via `#[from]`) an error produced by one of the facilities
/// this module builds on, so `?` works directly in code using these utilities.
#[derive(Debug, Error)]
pub enum Error {
	/// Attempted to send or receive on a oneshot channel which had been canceled
	#[error(transparent)]
	Oneshot(#[from] oneshot::Canceled),
	/// Attempted to send on a MPSC channel which has been canceled
	#[error(transparent)]
	Mpsc(#[from] mpsc::SendError),
	/// A subsystem error
	#[error(transparent)]
	Subsystem(#[from] SubsystemError),
	/// An error in the Runtime API.
	#[error(transparent)]
	RuntimeApi(#[from] RuntimeApiError),
	/// The type system wants this even though it doesn't make sense
	#[error(transparent)]
	Infallible(#[from] std::convert::Infallible),
	/// Attempted to convert from an `AllMessages` to a `FromJob`, and failed.
	#[error("AllMessage not relevant to Job")]
	SenderConversion(String),
	/// The local node is not a validator.
	#[error("Node is not a validator")]
	NotAValidator,
	/// Already forwarding errors to another sender
	#[error("AlreadyForwarding")]
	AlreadyForwarding,
}

127
128
129
130
131
132
impl From<OverseerError> for Error {
	fn from(e: OverseerError) -> Self {
		Self::from(SubsystemError::from(e))
	}
}

133
134
135
/// A type alias for Runtime API receivers.
pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;

136
/// Request some data from the `RuntimeApi`.
137
pub async fn request_from_runtime<RequestBuilder, Response, Sender>(
138
	parent: Hash,
139
	sender: &mut Sender,
140
	request_builder: RequestBuilder,
141
) -> RuntimeApiReceiver<Response>
142
where
143
	RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
144
	Sender: SubsystemSender,
145
146
147
{
	let (tx, rx) = oneshot::channel();

Shawn Tabrizi's avatar
Shawn Tabrizi committed
148
149
150
	sender
		.send_message(RuntimeApiMessage::Request(parent, request_builder(tx)).into())
		.await;
151

152
	rx
153
154
}

155
156
157
158
159
160
161
162
163
164
/// Construct specialized request functions for the runtime.
///
/// These would otherwise get pretty repetitive.
macro_rules! specialize_requests {
	// expand return type name for documentation purposes
	(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
		specialize_requests!{
			named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
		}
	};

	// create a single specialized request function
	//
	// Each generated function is a thin wrapper around `request_from_runtime`,
	// building the matching `RuntimeApiRequest::$request_variant`.
	(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
		#[doc = "Request `"]
		#[doc = $doc_name]
		#[doc = "` from the runtime"]
		pub async fn $func_name (
			parent: Hash,
			$(
				$param_name: $param_ty,
			)*
			sender: &mut impl SubsystemSender,
		) -> RuntimeApiReceiver<$return_ty>
		{
			request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
				$( $param_name, )* tx
			)).await
		}
	};

	// recursive decompose
	//
	// Peels off the first `fn ...;` item and recurses on the rest, so the
	// macro can be invoked with any number of request declarations.
	(
		fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
		$(
			fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
		)+
	) => {
		specialize_requests!{
			fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
		}
		specialize_requests!{
			$(
				fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
			)+
		}
	};
}

203
// Generates one `request_*` helper per line; the trailing identifier names the
// `RuntimeApiRequest` variant the helper constructs.
specialize_requests! {
	fn request_authorities() -> Vec<AuthorityDiscoveryId>; Authorities;
	fn request_validators() -> Vec<ValidatorId>; Validators;
	fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
	fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
	fn request_persisted_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
	fn request_assumed_validation_data(para_id: ParaId, expected_persisted_validation_data_hash: Hash) -> Option<(PersistedValidationData, ValidationCodeHash)>; AssumedValidationData;
	fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
	fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
	fn request_validation_code_by_hash(validation_code_hash: ValidationCodeHash) -> Option<ValidationCode>; ValidationCodeByHash;
	fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
	fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
	fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
}

218
/// From the given set of validators, find the first key we can sign with, if any.
Shawn Tabrizi's avatar
Shawn Tabrizi committed
219
220
221
222
pub async fn signing_key(
	validators: &[ValidatorId],
	keystore: &SyncCryptoStorePtr,
) -> Option<ValidatorId> {
223
	signing_key_and_index(validators, keystore).await.map(|(k, _)| k)
224
225
}

226
227
/// From the given set of validators, find the first key we can sign with, if any, and return it
/// along with the validator index.
Shawn Tabrizi's avatar
Shawn Tabrizi committed
228
229
230
231
pub async fn signing_key_and_index(
	validators: &[ValidatorId],
	keystore: &SyncCryptoStorePtr,
) -> Option<(ValidatorId, ValidatorIndex)> {
232
233
	for (i, v) in validators.iter().enumerate() {
		if CryptoStore::has_keys(&**keystore, &[(v.to_raw_vec(), ValidatorId::ID)]).await {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
234
			return Some((v.clone(), ValidatorIndex(i as _)))
235
236
237
		}
	}
	None
238
239
}

240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
/// Sign the given data with the given validator ID.
///
/// Returns `Ok(None)` if the private key that corresponds to that validator ID is not found in the
/// given keystore. Returns an error if the key could not be used for signing.
pub async fn sign(
	keystore: &SyncCryptoStorePtr,
	key: &ValidatorId,
	data: &[u8],
) -> Result<Option<ValidatorSignature>, KeystoreError> {
	use std::convert::TryInto;

	let signature =
		CryptoStore::sign_with(&**keystore, ValidatorId::ID, &key.into(), &data).await?;

	// Convert the raw signature bytes into a `ValidatorSignature`; a conversion
	// failure means the keystore produced a signature of an unexpected kind.
	signature
		.map(|sig| sig.try_into().map_err(|_| KeystoreError::KeyNotSupported(ValidatorId::ID)))
		.transpose()
}

261
/// Find the validator group the given validator index belongs to.
Shawn Tabrizi's avatar
Shawn Tabrizi committed
262
263
264
265
266
267
268
269
270
271
pub fn find_validator_group(
	groups: &[Vec<ValidatorIndex>],
	index: ValidatorIndex,
) -> Option<GroupIndex> {
	groups.iter().enumerate().find_map(|(i, g)| {
		if g.contains(&index) {
			Some(GroupIndex(i as _))
		} else {
			None
		}
272
273
274
	})
}

275
276
/// Choose a random subset of `min` elements.
/// But always include `is_priority` elements.
Shawn Tabrizi's avatar
Shawn Tabrizi committed
277
278
279
280
281
pub fn choose_random_subset<T, F: FnMut(&T) -> bool>(
	is_priority: F,
	mut v: Vec<T>,
	min: usize,
) -> Vec<T> {
282
	use rand::seq::SliceRandom as _;
283
284
285
286
287
288
289

	// partition the elements into priority first
	// the returned index is when non_priority elements start
	let i = itertools::partition(&mut v, is_priority);

	if i >= min || v.len() <= i {
		v.truncate(i);
Shawn Tabrizi's avatar
Shawn Tabrizi committed
290
		return v
291
292
	}

293
	let mut rng = rand::thread_rng();
294
	v[i..].shuffle(&mut rng);
295

296
	v.truncate(min);
297
298
299
	v
}

Denis_P's avatar
Denis_P committed
300
/// Returns a `bool` with a probability of `a / b` of being true.
301
pub fn gen_ratio(a: usize, b: usize) -> bool {
302
303
	use rand::Rng as _;
	let mut rng = rand::thread_rng();
304
	rng.gen_ratio(a as u32, b as u32)
305
306
}

307
308
309
310
/// Local validator information
///
/// It can be created if the local node is a validator in the context of a particular
/// relay chain block.
#[derive(Debug)]
pub struct Validator {
	// Signing context (session index + relay-parent hash) used by `sign`.
	signing_context: SigningContext,
	// This node's validator key within the active validator set.
	key: ValidatorId,
	// Position of `key` in the validator set.
	index: ValidatorIndex,
}

impl Validator {
	/// Get a struct representing this node's validator if this node is in fact a validator in the context of the given block.
320
	pub async fn new(
321
		parent: Hash,
322
		keystore: SyncCryptoStorePtr,
323
		sender: &mut impl SubsystemSender,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
324
	) -> Result<Self, Error> {
325
326
		// Note: request_validators and request_session_index_for_child do not and cannot
		// run concurrently: they both have a mutable handle to the same sender.
327
		// However, each of them returns a oneshot::Receiver, and those are resolved concurrently.
328
		let (validators, session_index) = futures::try_join!(
329
330
			request_validators(parent, sender).await,
			request_session_index_for_child(parent, sender).await,
331
332
		)?;

Shawn Tabrizi's avatar
Shawn Tabrizi committed
333
		let signing_context = SigningContext { session_index: session_index?, parent_hash: parent };
334
335
336

		let validators = validators?;

337
		Self::construct(&validators, signing_context, keystore).await
338
339
340
341
342
	}

	/// Construct a validator instance without performing runtime fetches.
	///
	/// This can be useful if external code also needs the same data.
343
	pub async fn construct(
344
345
		validators: &[ValidatorId],
		signing_context: SigningContext,
346
		keystore: SyncCryptoStorePtr,
347
	) -> Result<Self, Error> {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
348
349
		let (key, index) =
			signing_key_and_index(validators, &keystore).await.ok_or(Error::NotAValidator)?;
350

Shawn Tabrizi's avatar
Shawn Tabrizi committed
351
		Ok(Validator { signing_context, key, index })
352
353
354
355
	}

	/// Get this validator's id.
	pub fn id(&self) -> ValidatorId {
356
		self.key.clone()
357
358
359
360
361
362
363
364
365
366
367
368
369
	}

	/// Get this validator's local index.
	pub fn index(&self) -> ValidatorIndex {
		self.index
	}

	/// Get the current signing context.
	pub fn signing_context(&self) -> &SigningContext {
		&self.signing_context
	}

	/// Sign a payload with this validator
370
	pub async fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
371
		&self,
372
		keystore: SyncCryptoStorePtr,
373
		payload: Payload,
374
	) -> Result<Option<Signed<Payload, RealPayload>>, KeystoreError> {
375
		Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key).await
376
377
378
	}
}

379
380
381
382
383
384
385
386
// Guard that aborts the wrapped abortable future when dropped.
// Gives `JobHandle` drop-based cancellation of its spawned job.
struct AbortOnDrop(future::AbortHandle);

impl Drop for AbortOnDrop {
	fn drop(&mut self) {
		self.0.abort();
	}
}

Denis_P's avatar
Denis_P committed
387
/// A `JobHandle` manages a particular job for a subsystem.
388
struct JobHandle<ToJob> {
389
	_abort_handle: AbortOnDrop,
390
391
392
393
394
	to_job: mpsc::Sender<ToJob>,
}

impl<ToJob> JobHandle<ToJob> {
	/// Send a message to the job.
395
	async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> {
396
397
398
399
		self.to_job.send(msg).await.map_err(Into::into)
	}
}

400
401
402
403
404
405
406
407
/// Commands from a job to the broader subsystem.
pub enum FromJobCommand {
	/// Spawn a child task on the executor.
	Spawn(&'static str, Pin<Box<dyn Future<Output = ()> + Send>>),
	/// Spawn a blocking child task on the executor's dedicated thread pool.
	SpawnBlocking(&'static str, Pin<Box<dyn Future<Output = ()> + Send>>),
}

408
/// A sender for messages from jobs, as well as commands to the overseer.
pub struct JobSender<S: SubsystemSender> {
	// Underlying subsystem sender for regular inter-subsystem messages.
	sender: S,
	// Channel for `FromJobCommand`s, drained by the owning `JobSubsystem`.
	from_job: mpsc::Sender<FromJobCommand>,
}

414
415
416
417
// A custom clone impl, since M does not need to impl `Clone`
// which `#[derive(Clone)]` requires.
impl<S: SubsystemSender> Clone for JobSender<S> {
	fn clone(&self) -> Self {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
418
		Self { sender: self.sender.clone(), from_job: self.from_job.clone() }
419
420
421
	}
}

422
423
424
425
426
427
428
impl<S: SubsystemSender> JobSender<S> {
	/// Get access to the underlying subsystem sender.
	pub fn subsystem_sender(&mut self) -> &mut S {
		&mut self.sender
	}

	/// Send a direct message to some other `Subsystem`, routed based on message type.
429
430
	pub async fn send_message(&mut self, msg: impl Into<AllMessages>) {
		self.sender.send_message(msg.into()).await
431
432
433
	}

	/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
434
435
	pub async fn send_messages<T, M>(&mut self, msgs: T)
	where
Shawn Tabrizi's avatar
Shawn Tabrizi committed
436
437
		T: IntoIterator<Item = M> + Send,
		T::IntoIter: Send,
438
		M: Into<AllMessages>,
439
	{
440
		self.sender.send_messages(msgs.into_iter().map(|m| m.into())).await
441
442
443
444
445
446
447
	}

	/// Send a message onto the unbounded queue of some other `Subsystem`, routed based on message
	/// type.
	///
	/// This function should be used only when there is some other bounding factor on the messages
	/// sent with it. Otherwise, it risks a memory leak.
448
449
	pub fn send_unbounded_message(&mut self, msg: impl Into<AllMessages>) {
		self.sender.send_unbounded_message(msg.into())
450
451
452
453
454
455
456
457
458
	}

	/// Send a command to the subsystem, to be relayed onwards to the overseer.
	pub async fn send_command(&mut self, msg: FromJobCommand) -> Result<(), mpsc::SendError> {
		self.from_job.send(msg).await
	}
}

#[async_trait::async_trait]
459
460
461
462
463
464
465
impl<S, M> overseer::SubsystemSender<M> for JobSender<S>
where
	M: Send + 'static + Into<AllMessages>,
	S: SubsystemSender + Clone,
{
	async fn send_message(&mut self, msg: M) {
		self.sender.send_message(msg.into()).await
466
467
468
	}

	async fn send_messages<T>(&mut self, msgs: T)
Shawn Tabrizi's avatar
Shawn Tabrizi committed
469
470
471
	where
		T: IntoIterator<Item = M> + Send,
		T::IntoIter: Send,
472
	{
473
		self.sender.send_messages(msgs.into_iter().map(|m| m.into())).await
474
475
	}

476
477
	fn send_unbounded_message(&mut self, msg: M) {
		self.sender.send_unbounded_message(msg.into())
478
479
480
	}
}

481
482
483
484
485
486
487
488
489
// Manual `Debug`: the boxed futures inside the variants are not `Debug`,
// so only the variant and the task name are printed.
impl fmt::Debug for FromJobCommand {
	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
		let (variant, task_name) = match self {
			Self::Spawn(name, _) => ("Spawn", name),
			Self::SpawnBlocking(name, _) => ("SpawnBlocking", name),
		};
		write!(fmt, "FromJobCommand::{}({})", variant, task_name)
	}
}

490
491
492
/// This trait governs jobs.
///
/// Jobs are instantiated and killed automatically on appropriate overseer messages.
/// Other messages are passed along to and from the job via the overseer to other subsystems.
pub trait JobTrait: Unpin + Sized {
	/// Message type used to send messages to the job.
	type ToJob: 'static + BoundToRelayParent + Send;
	/// Job runtime error.
	type Error: 'static + std::error::Error + Send;
	/// Extra arguments this job needs to run properly.
	///
	/// If no extra information is needed, it is perfectly acceptable to set it to `()`.
	type RunArgs: 'static + Send;
	/// Subsystem-specific Prometheus metrics.
	///
	/// Jobs spawned by one subsystem should share the same
	/// instance of metrics (use `.clone()`).
	/// The `delegate_subsystem!` macro should take care of this.
	type Metrics: 'static + metrics::Metrics + Send;

	/// Name of the job, i.e. `candidate-backing-job`
	const NAME: &'static str;

	/// Run a job for the given relay `parent`.
	///
	/// The job should be ended when `receiver` returns `None`.
	fn run<S: SubsystemSender>(
		parent: Hash,
		span: Arc<jaeger::Span>,
		run_args: Self::RunArgs,
		metrics: Self::Metrics,
		receiver: mpsc::Receiver<Self::ToJob>,
		sender: JobSender<S>,
	) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
}

526
527
528
/// Error which can be returned by the jobs manager
///
/// Wraps the utility error type and the job-specific error
#[derive(Debug, Error)]
pub enum JobsError<JobError: std::fmt::Debug + std::error::Error + 'static> {
	/// utility error
	#[error("Utility")]
	Utility(#[source] Error),
	/// internal job error
	#[error("Internal")]
	Job(#[source] JobError),
}

539
540
541
542
543
544
/// Jobs manager for a subsystem
///
/// - Spawns new jobs for a given relay-parent on demand.
/// - Closes old jobs for a given relay-parent on demand.
/// - Dispatches messages to the appropriate job for a given relay-parent.
/// - When dropped, aborts all remaining jobs.
/// - implements `Stream<Item=FromJobCommand>`, collecting all messages from subordinate jobs.
#[pin_project]
struct Jobs<Spawner, ToJob> {
	// Executor handle used to spawn each job's task.
	spawner: Spawner,
	// One handle per live job, keyed by relay-parent; dropping a handle aborts its job.
	running: HashMap<Hash, JobHandle<ToJob>>,
	// Merged stream of `FromJobCommand`s from all running jobs.
	outgoing_msgs: SelectAll<mpsc::Receiver<FromJobCommand>>,
}

553
554
555
556
557
impl<Spawner, ToJob> Jobs<Spawner, ToJob>
where
	Spawner: SpawnNamed,
	ToJob: Send + 'static,
{
	/// Create a new Jobs manager which handles spawning appropriate jobs.
	pub fn new(spawner: Spawner) -> Self {
		Self { spawner, running: HashMap::new(), outgoing_msgs: SelectAll::new() }
	}

	/// Spawn a new job for this `parent_hash`, with whatever args are appropriate.
	///
	/// Wires up both channels (`ToJob` in, `FromJobCommand` out), spawns the
	/// job wrapped in an abortable future, and records a handle for it.
	/// Inserting into `running` replaces any previous handle for the same
	/// relay-parent; the old handle's drop aborts the old job.
	fn spawn_job<Job, Sender>(
		&mut self,
		parent_hash: Hash,
		span: Arc<jaeger::Span>,
		run_args: Job::RunArgs,
		metrics: Job::Metrics,
		sender: Sender,
	) where
		Job: JobTrait<ToJob = ToJob>,
		Sender: SubsystemSender,
	{
		let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
		let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);

		let (future, abort_handle) = future::abortable(async move {
			// Run the job to completion; a job error is logged but does not
			// take down the subsystem.
			if let Err(e) = Job::run(
				parent_hash,
				span,
				run_args,
				metrics,
				to_job_rx,
				JobSender { sender, from_job: from_job_tx },
			)
			.await
			{
				tracing::error!(
					job = Job::NAME,
					parent_hash = %parent_hash,
					err = ?e,
					"job finished with an error",
				);

				return Err(e)
			}

			Ok(())
		});

		// The task group name strips the conventional "-job" suffix, if present.
		self.spawner.spawn(
			Job::NAME,
			Some(Job::NAME.strip_suffix("-job").unwrap_or(Job::NAME)),
			future.map(drop).boxed(),
		);
		self.outgoing_msgs.push(from_job_rx);

		let handle = JobHandle { _abort_handle: AbortOnDrop(abort_handle), to_job: to_job_tx };

		self.running.insert(parent_hash, handle);
	}

	/// Stop the job associated with this `parent_hash`.
	///
	/// Dropping the handle aborts the job's task via `AbortOnDrop`.
	pub async fn stop_job(&mut self, parent_hash: Hash) {
		self.running.remove(&parent_hash);
	}

	/// Send a message to the appropriate job for this `parent_hash`.
	///
	/// A send failure means the job's receiver is gone, so the stale handle
	/// is removed. Messages for unknown relay-parents are silently dropped.
	async fn send_msg(&mut self, parent_hash: Hash, msg: ToJob) {
		if let Entry::Occupied(mut job) = self.running.entry(parent_hash) {
			if job.get_mut().send_msg(msg).await.is_err() {
				job.remove();
			}
		}
	}
}

629
impl<Spawner, ToJob> Stream for Jobs<Spawner, ToJob>
630
where
631
	Spawner: SpawnNamed,
632
{
633
	type Item = FromJobCommand;
634

635
	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
636
637
638
639
		match futures::ready!(Pin::new(&mut self.outgoing_msgs).poll_next(cx)) {
			Some(msg) => Poll::Ready(Some(msg)),
			// Don't end if there are no jobs running
			None => Poll::Pending,
640
		}
641
642
643
	}
}

644
// The `Jobs` stream never terminates (see `poll_next`, which reports
// `Pending` even when no jobs are running), so it is trivially fused.
impl<Spawner, ToJob> stream::FusedStream for Jobs<Spawner, ToJob>
where
	Spawner: SpawnNamed,
{
	fn is_terminated(&self) -> bool {
		false
	}
}

653
/// Parameters to a job subsystem.
pub struct JobSubsystemParams<Spawner, RunArgs, Metrics> {
	/// A spawner for sub-tasks.
	spawner: Spawner,
	/// Arguments to each job.
	run_args: RunArgs,
	/// Metrics for the subsystem.
	pub metrics: Metrics,
}
662

663
/// A subsystem which wraps jobs.
///
/// Conceptually, this is very simple: it just loops forever.
///
/// - On incoming overseer messages, it starts or stops jobs as appropriate.
/// - On other incoming messages, if they can be converted into `Job::ToJob` and
///   include a hash, then they're forwarded to the appropriate individual job.
/// - On outgoing messages from the jobs, it forwards them to the overseer.
pub struct JobSubsystem<Job: JobTrait, Spawner> {
	#[allow(missing_docs)]
	pub params: JobSubsystemParams<Spawner, Job::RunArgs, Job::Metrics>,
	// Ties the `Job` type parameter to the struct without storing a `Job`.
	_marker: std::marker::PhantomData<Job>,
}

677
678
impl<Job: JobTrait, Spawner> JobSubsystem<Job, Spawner> {
	/// Create a new `JobSubsystem`.
	pub fn new(spawner: Spawner, run_args: Job::RunArgs, metrics: Job::Metrics) -> Self {
		JobSubsystem {
			params: JobSubsystemParams { spawner, run_args, metrics },
			_marker: std::marker::PhantomData,
		}
	}

	/// Run the subsystem to completion.
	///
	/// Event loop: reacts to overseer signals (spawning/stopping jobs on leaf
	/// activation/deactivation, exiting on `Conclude`), forwards communication
	/// messages to the job for their relay-parent, and services spawn commands
	/// coming back from the jobs.
	pub async fn run<Context>(self, mut ctx: Context)
	where
		Spawner: SpawnNamed + Send + Clone + Unpin + 'static,
		Context: SubsystemContext<Message = <Job as JobTrait>::ToJob, Signal = OverseerSignal>,
		<Context as SubsystemContext>::Sender: SubsystemSender,
		Job: 'static + JobTrait + Send,
		<Job as JobTrait>::RunArgs: Clone + Sync,
		<Job as JobTrait>::ToJob:
			Sync + From<<Context as polkadot_overseer::SubsystemContext>::Message>,
		<Job as JobTrait>::Metrics: Sync,
	{
		let JobSubsystem { params: JobSubsystemParams { spawner, run_args, metrics }, .. } = self;

		let mut jobs = Jobs::<Spawner, Job::ToJob>::new(spawner);

		loop {
			select! {
				incoming = ctx.recv().fuse() => {
					match incoming {
						Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
							activated,
							deactivated,
						}))) => {
							// One job per newly-activated leaf.
							for activated in activated {
								let sender = ctx.sender().clone();
								jobs.spawn_job::<Job, _>(
									activated.hash,
									activated.span,
									run_args.clone(),
									metrics.clone(),
									sender,
								)
							}

							for hash in deactivated {
								jobs.stop_job(hash).await;
							}
						}
						Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => {
							// Dropping the handles aborts all remaining jobs.
							jobs.running.clear();
							break;
						}
						Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(..))) => {}
						Ok(FromOverseer::Communication { msg }) => {
							// Route the message to the job for its relay-parent;
							// non-convertible messages are dropped.
							if let Ok(to_job) = <<Context as SubsystemContext>::Message>::try_from(msg) {
								jobs.send_msg(to_job.relay_parent(), to_job).await;
							}
						}
						Err(err) => {
							tracing::error!(
								job = Job::NAME,
								err = ?err,
								"error receiving message from subsystem context for job",
							);
							break;
						}
					}
				}
				outgoing = jobs.next() => {
					// TODO verify the introduced .await here is not a problem
					// TODO it should only wait for the spawn to complete
					// TODO but not for anything beyond that
					let res = match outgoing.expect("the Jobs stream never ends; qed") {
						FromJobCommand::Spawn(name, task) => ctx.spawn(name, task),
						FromJobCommand::SpawnBlocking(name, task) => ctx.spawn_blocking(name, task),
					};

					if let Err(e) = res {
						tracing::warn!(err = ?e, "failed to handle command from job");
					}
				}
				complete => break,
			}
		}
	}
}

764
impl<Context, Job, Spawner> Subsystem<Context, SubsystemError> for JobSubsystem<Job, Spawner>
765
where
766
	Spawner: SpawnNamed + Send + Clone + Unpin + 'static,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
767
	Context: SubsystemContext<Message = Job::ToJob, Signal = OverseerSignal>,
768
	Job: 'static + JobTrait + Send,
769
	Job::RunArgs: Clone + Sync,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
770
771
	<Job as JobTrait>::ToJob:
		Sync + From<<Context as polkadot_overseer::SubsystemContext>::Message>,
772
	Job::Metrics: Sync,
773
774
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
775
		let future = Box::pin(async move {
776
			self.run(ctx).await;
777
			Ok(())
778
779
		});

780
		SpawnedSubsystem { name: Job::NAME.strip_suffix("-job").unwrap_or(Job::NAME), future }
781
782
	}
}