// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Utility module for subsystems
//!
//! Many subsystems have common interests such as canceling a bunch of spawned jobs,
//! or determining what their validator ID is. These common interests are factored into
//! this module.
//!
//! This crate also reexports Prometheus metric types and defines the `Metrics` trait which subsystems are expected to implement.

#![deny(unused_results)]
// #![deny(unused_crate_dependencies)] causes false positives
// https://github.com/rust-lang/rust/issues/57274
#![warn(missing_docs)]

use polkadot_node_subsystem::{
	errors::{ChainApiError, RuntimeApiError},
	messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
	FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
};
use futures::{
	channel::{mpsc, oneshot},
	future::Either,
	prelude::*,
	select,
	stream::Stream,
	task,
};
use futures_timer::Delay;
use parity_scale_codec::Encode;
use pin_project::{pin_project, pinned_drop};
use polkadot_primitives::v1::{
	CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData,
	GroupRotationInfo, Hash, Id as ParaId, ValidationData, OccupiedCoreAssumption,
	SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex,
};
use sp_core::{
	traits::SpawnNamed,
	Public
};
use sp_application_crypto::AppKey;
use sp_keystore::{
	CryptoStore,
	SyncCryptoStorePtr,
	Error as KeystoreError,
};
use std::{
	collections::HashMap,
	convert::{TryFrom, TryInto},
	marker::Unpin,
	pin::Pin,
	task::{Poll, Context},
	time::Duration,
};
use streamunordered::{StreamUnordered, StreamYield};
use thiserror::Error;

pub mod validator_discovery;

/// These reexports are required so that external crates can use the `delegated_subsystem` macro properly.
pub mod reexports {
	pub use sp_core::traits::SpawnNamed;
	pub use polkadot_node_subsystem::{
		SpawnedSubsystem,
		Subsystem,
		SubsystemContext,
	};
}

/// Duration a job will wait after sending a stop signal before hard-aborting.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
/// Capacity of channels to and from individual jobs
pub const JOB_CHANNEL_CAPACITY: usize = 64;

/// Utility errors
#[derive(Debug, Error)]
pub enum Error {
	/// Attempted to send or receive on a oneshot channel which had been canceled
	#[error(transparent)]
	Oneshot(#[from] oneshot::Canceled),
	/// Attempted to send on a MPSC channel which has been canceled
	#[error(transparent)]
	Mpsc(#[from] mpsc::SendError),
	/// A subsystem error
	#[error(transparent)]
	Subsystem(#[from] SubsystemError),
	/// An error in the Chain API.
	#[error(transparent)]
	ChainApi(#[from] ChainApiError),
	/// An error in the Runtime API.
	#[error(transparent)]
	RuntimeApi(#[from] RuntimeApiError),
	/// The type system wants this even though it doesn't make sense
	#[error(transparent)]
	Infallible(#[from] std::convert::Infallible),
	/// Attempted to convert from an AllMessages to a FromJob, and failed.
	#[error("AllMessage not relevant to Job")]
	SenderConversion(String),
	/// The local node is not a validator.
	#[error("Node is not a validator")]
	NotAValidator,
	/// The desired job is not present in the jobs list.
	#[error("Relay parent {0} not of interest")]
	JobNotFound(Hash),
	/// Already forwarding errors to another sender
	#[error("AlreadyForwarding")]
	AlreadyForwarding,
}

/// A type alias for Runtime API receivers.
pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;

/// Request some data from the `RuntimeApi`.
pub async fn request_from_runtime<RequestBuilder, Response, FromJob>(
	parent: Hash,
	sender: &mut mpsc::Sender<FromJob>,
	request_builder: RequestBuilder,
) -> Result<RuntimeApiReceiver<Response>, Error>
where
	RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
	FromJob: TryFrom<AllMessages>,
	<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
	let (tx, rx) = oneshot::channel();

	sender
		.send(
			AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
				.try_into()
				.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
		)
		.await?;

	Ok(rx)
}
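
// A minimal usage sketch, assuming the surrounding job defines a `FromJob` enum
// implementing `TryFrom<AllMessages>` and has `relay_parent` and `sender` in scope:
//
// ```ignore
// let rx = request_from_runtime(relay_parent, &mut sender, |tx| {
// 	RuntimeApiRequest::Validators(tx)
// }).await?;
//
// // the outer `?` handles a canceled oneshot, the inner one a `RuntimeApiError`
// let validators: Vec<ValidatorId> = rx.await??;
// ```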

/// Construct specialized request functions for the runtime.
///
/// These would otherwise get pretty repetitive.
macro_rules! specialize_requests {
	// expand return type name for documentation purposes
	(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
		specialize_requests!{
			named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
		}
	};

	// create a single specialized request function
	(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
		#[doc = "Request `"]
		#[doc = $doc_name]
		#[doc = "` from the runtime"]
		pub async fn $func_name<FromJob>(
			parent: Hash,
			$(
				$param_name: $param_ty,
			)*
			sender: &mut mpsc::Sender<FromJob>,
		) -> Result<RuntimeApiReceiver<$return_ty>, Error>
		where
			FromJob: TryFrom<AllMessages>,
			<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
		{
			request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
				$( $param_name, )* tx
			)).await
		}
	};

	// recursive decompose
	(
		fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
		$(
			fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
		)+
	) => {
		specialize_requests!{
			fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
		}
		specialize_requests!{
			$(
				fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
			)+
		}
	};
}

specialize_requests! {
	fn request_validators() -> Vec<ValidatorId>; Validators;
	fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
	fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
	fn request_full_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationData>; FullValidationData;
	fn request_persisted_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
	fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
	fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
	fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
	fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
}
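
// For orientation, the first entry above expands to roughly the following free
// function; the other entries follow the same shape:
//
// ```ignore
// /// Request `Validators` from the runtime
// pub async fn request_validators<FromJob>(
// 	parent: Hash,
// 	sender: &mut mpsc::Sender<FromJob>,
// ) -> Result<RuntimeApiReceiver<Vec<ValidatorId>>, Error>
// where
// 	FromJob: TryFrom<AllMessages>,
// 	<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
// {
// 	request_from_runtime(parent, sender, |tx| RuntimeApiRequest::Validators(tx)).await
// }
// ```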

/// Request some data from the `RuntimeApi` via a SubsystemContext.
async fn request_from_runtime_ctx<RequestBuilder, Context, Response>(
	parent: Hash,
	ctx: &mut Context,
	request_builder: RequestBuilder,
) -> Result<RuntimeApiReceiver<Response>, Error>
where
	RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
	Context: SubsystemContext,
{
	let (tx, rx) = oneshot::channel();

	ctx
		.send_message(
			AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
				.try_into()
				.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
		)
		.await?;

	Ok(rx)
}


/// Construct specialized request functions for the runtime.
///
/// These would otherwise get pretty repetitive.
macro_rules! specialize_requests_ctx {
	// expand return type name for documentation purposes
	(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
		specialize_requests_ctx!{
			named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
		}
	};

	// create a single specialized request function
	(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
		#[doc = "Request `"]
		#[doc = $doc_name]
		#[doc = "` from the runtime via a `SubsystemContext`"]
		pub async fn $func_name<Context: SubsystemContext>(
			parent: Hash,
			$(
				$param_name: $param_ty,
			)*
			ctx: &mut Context,
		) -> Result<RuntimeApiReceiver<$return_ty>, Error> {
			request_from_runtime_ctx(parent, ctx, |tx| RuntimeApiRequest::$request_variant(
				$( $param_name, )* tx
			)).await
		}
	};

	// recursive decompose
	(
		fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
		$(
			fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
		)+
	) => {
		specialize_requests_ctx!{
			fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
		}
		specialize_requests_ctx!{
			$(
				fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
			)+
		}
	};
}

specialize_requests_ctx! {
	fn request_validators_ctx() -> Vec<ValidatorId>; Validators;
	fn request_validator_groups_ctx() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
	fn request_availability_cores_ctx() -> Vec<CoreState>; AvailabilityCores;
	fn request_full_validation_data_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationData>; FullValidationData;
	fn request_persisted_validation_data_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
	fn request_session_index_for_child_ctx() -> SessionIndex; SessionIndexForChild;
	fn request_validation_code_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
	fn request_candidate_pending_availability_ctx(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
	fn request_candidate_events_ctx() -> Vec<CandidateEvent>; CandidateEvents;
}
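
// Usage mirrors the sender-based variants above, except that requests travel via a
// `SubsystemContext`. A sketch, with `ctx` and `relay_parent` assumed from the caller:
//
// ```ignore
// let validators = request_validators_ctx(relay_parent, &mut ctx).await?.await??;
// ```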

/// From the given set of validators, find the first key we can sign with, if any.
pub async fn signing_key(validators: &[ValidatorId], keystore: SyncCryptoStorePtr) -> Option<ValidatorId> {
	for v in validators.iter() {
		if CryptoStore::has_keys(&*keystore, &[(v.to_raw_vec(), ValidatorId::ID)]).await {
			return Some(v.clone());
		}
	}
	None
}
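
// A short sketch of the intended use, assuming `validators` came from
// `request_validators` and `keystore` is this node's `SyncCryptoStorePtr`:
//
// ```ignore
// if let Some(our_id) = signing_key(&validators, keystore.clone()).await {
// 	// this node is a validator in the given set and can sign as `our_id`
// }
// ```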

/// Local validator information
///
/// It can be created if the local node is a validator in the context of a particular
/// relay chain block.
pub struct Validator {
	signing_context: SigningContext,
	key: ValidatorId,
	index: ValidatorIndex,
}

impl Validator {
	/// Get a struct representing this node's validator if this node is in fact a validator in the context of the given block.
	pub async fn new<FromJob>(
		parent: Hash,
		keystore: SyncCryptoStorePtr,
		mut sender: mpsc::Sender<FromJob>,
	) -> Result<Self, Error>
	where
		FromJob: TryFrom<AllMessages>,
		<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
	{
		// Note: request_validators and request_session_index_for_child do not and cannot
		// run concurrently: they both have a mutable handle to the same sender.
		// However, each of them returns a oneshot::Receiver, and those are resolved concurrently.
		let (validators, session_index) = futures::try_join!(
			request_validators(parent, &mut sender).await?,
			request_session_index_for_child(parent, &mut sender).await?,
		)?;

		let signing_context = SigningContext {
			session_index: session_index?,
			parent_hash: parent,
		};

		let validators = validators?;

		Self::construct(&validators, signing_context, keystore).await
	}

	/// Construct a validator instance without performing runtime fetches.
	///
	/// This can be useful if external code also needs the same data.
	pub async fn construct(
		validators: &[ValidatorId],
		signing_context: SigningContext,
		keystore: SyncCryptoStorePtr,
	) -> Result<Self, Error> {
		let key = signing_key(validators, keystore).await.ok_or(Error::NotAValidator)?;
		let index = validators
			.iter()
			.enumerate()
			.find(|(_, k)| k == &&key)
			.map(|(idx, _)| idx as ValidatorIndex)
			.expect("signing_key would have already returned NotAValidator if the item we're searching for isn't in this list; qed");

		Ok(Validator {
			signing_context,
			key,
			index,
		})
	}

	/// Get this validator's id.
	pub fn id(&self) -> ValidatorId {
		self.key.clone()
	}

	/// Get this validator's local index.
	pub fn index(&self) -> ValidatorIndex {
		self.index
	}

	/// Get the current signing context.
	pub fn signing_context(&self) -> &SigningContext {
		&self.signing_context
	}

	/// Sign a payload with this validator
	pub async fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
		&self,
		keystore: SyncCryptoStorePtr,
		payload: Payload,
	) -> Result<Signed<Payload, RealPayload>, KeystoreError> {
		Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key).await
	}

	/// Validate the payload with this validator
	///
	/// Validation can only succeed if `signed.validator_index() == self.index()`.
	/// Normally, this will always be the case for a properly operating program,
	/// but it's double-checked here anyway.
	pub fn check_payload<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
		&self,
		signed: Signed<Payload, RealPayload>,
	) -> Result<(), ()> {
		if signed.validator_index() != self.index {
			return Err(());
		}
		signed.check_signature(&self.signing_context, &self.id())
	}
}
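
// Putting the pieces above together, a hedged end-to-end sketch; `relay_parent`,
// `keystore`, `sender`, and a signable `payload` are assumed to come from the
// surrounding job:
//
// ```ignore
// let validator = Validator::new(relay_parent, keystore.clone(), sender).await?;
// let signed = validator.sign(keystore.clone(), payload).await?;
// validator.check_payload(signed).expect("we just signed this payload; qed");
// ```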

/// ToJob is expected to be an enum declaring the set of messages of interest to a particular job.
///
/// Normally, this will be some subset of `AllMessages`, and a `Stop` variant.
pub trait ToJobTrait: TryFrom<AllMessages> {
	/// The `Stop` variant of the ToJob enum.
	const STOP: Self;

	/// If the message variant contains its relay parent, return it here
	fn relay_parent(&self) -> Option<Hash>;
}
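
// A sketch of a conforming message type. The variants are illustrative, and the
// `TryFrom<AllMessages>` impl required by the trait bound is elided:
//
// ```ignore
// enum ToExampleJob {
// 	DoWork(Hash),
// 	Stop,
// }
//
// impl ToJobTrait for ToExampleJob {
// 	const STOP: Self = Self::Stop;
//
// 	fn relay_parent(&self) -> Option<Hash> {
// 		match self {
// 			Self::DoWork(hash) => Some(*hash),
// 			Self::Stop => None,
// 		}
// 	}
// }
// ```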

/// A JobHandle manages a particular job for a subsystem.
struct JobHandle<ToJob> {
	abort_handle: future::AbortHandle,
	to_job: mpsc::Sender<ToJob>,
	finished: oneshot::Receiver<()>,
	outgoing_msgs_handle: usize,
}

impl<ToJob> JobHandle<ToJob> {
	/// Send a message to the job.
	async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> {
		self.to_job.send(msg).await.map_err(Into::into)
	}
}

impl<ToJob: ToJobTrait> JobHandle<ToJob> {
	/// Stop this job gracefully.
	///
	/// If it hasn't shut itself down after `JOB_GRACEFUL_STOP_DURATION`, abort it.
	async fn stop(mut self) {
		// we don't actually care if the message couldn't be sent
		if let Err(_) = self.to_job.send(ToJob::STOP).await {
			// no need to wait further here: the job is either stalled or
			// disconnected, and in either case, we can just abort it immediately
			self.abort_handle.abort();
			return;
		}
		let stop_timer = Delay::new(JOB_GRACEFUL_STOP_DURATION);

		match future::select(stop_timer, self.finished).await {
			Either::Left((_, _)) => {
				// the grace period elapsed without the job signalling that it
				// finished, so abort it
				self.abort_handle.abort();
			}
			Either::Right((_, _)) => {}
		}
	}
}

/// This module reexports Prometheus types and defines the [`Metrics`] trait.
pub mod metrics {
	/// Reexport Prometheus types.
	pub use substrate_prometheus_endpoint as prometheus;

	/// Subsystem- or job-specific Prometheus metrics.
	///
	/// Usually implemented as a wrapper for `Option<ActualMetrics>`
	/// to ensure `Default` bounds, or as the dummy type `()`.
	/// Prometheus metrics internally hold an `Arc` reference, so cloning them is fine.
	pub trait Metrics: Default + Clone {
		/// Try to register metrics in the Prometheus registry.
		fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError>;

		/// Convenience method to register metrics in the optional Prometheus registry.
		///
		/// If no registry is provided, returns `Default::default()`. Otherwise, returns the same
		/// thing that `try_register` does.
		fn register(registry: Option<&prometheus::Registry>) -> Result<Self, prometheus::PrometheusError> {
			match registry {
				None => Ok(Self::default()),
				Some(registry) => Self::try_register(registry),
			}
		}
	}

	// dummy impl
	impl Metrics for () {
		fn try_register(_registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> {
			Ok(())
		}
	}
}
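
// A sketch of the wrapper pattern described above: the real metrics live behind an
// `Option` so that `Default` (no registry) stays cheap. All names are illustrative:
//
// ```ignore
// #[derive(Default, Clone)]
// struct ExampleMetrics(Option<ExampleMetricsInner>);
//
// #[derive(Clone)]
// struct ExampleMetricsInner {
// 	jobs_started: prometheus::Counter<prometheus::U64>,
// }
//
// impl metrics::Metrics for ExampleMetrics {
// 	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
// 		Ok(Self(Some(ExampleMetricsInner {
// 			jobs_started: prometheus::register(
// 				prometheus::Counter::new("example_jobs_started_total", "Total jobs started.")?,
// 				registry,
// 			)?,
// 		})))
// 	}
// }
// ```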

/// This trait governs jobs.
///
/// Jobs are instantiated and killed automatically on appropriate overseer messages.
/// Other messages are passed along to and from the job via the overseer to other
/// subsystems.
pub trait JobTrait: Unpin {
	/// Message type to the job. Typically a subset of AllMessages.
	type ToJob: 'static + ToJobTrait + Send;
	/// Message type from the job. Typically a subset of AllMessages.
	type FromJob: 'static + Into<AllMessages> + Send;
	/// Job runtime error.
	type Error: 'static + std::error::Error + Send;
	/// Extra arguments this job needs to run properly.
	///
	/// If no extra information is needed, it is perfectly acceptable to set it to `()`.
	type RunArgs: 'static + Send;
	/// Subsystem-specific Prometheus metrics.
	///
	/// Jobs spawned by one subsystem should share the same
	/// instance of metrics (use `.clone()`).
	/// The `delegated_subsystem!` macro should take care of this.
	type Metrics: 'static + metrics::Metrics + Send;

	/// Name of the job, e.g. `CandidateBackingJob`
	const NAME: &'static str;

	/// Run a job for the parent block indicated
	fn run(
		parent: Hash,
		run_args: Self::RunArgs,
		metrics: Self::Metrics,
		receiver: mpsc::Receiver<Self::ToJob>,
		sender: mpsc::Sender<Self::FromJob>,
	) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;

	/// Handle a message which has no relay parent, and therefore can't be dispatched to a particular job
	///
	/// By default, this is implemented with a NOP function. However, if
	/// ToJob occasionally has messages which do not correspond to a particular
	/// parent relay hash, then this function will be spawned as a one-off
	/// task to handle those messages.
	// TODO: the API here is likely not precisely what we want; figure it out more
	// once we're implementing a subsystem which actually needs this feature.
	// In particular, we're quite likely to want this to return a future instead of
	// interrupting the active thread for the duration of the handler.
	fn handle_unanchored_msg(_msg: Self::ToJob) -> Result<(), Self::Error> {
		Ok(())
	}
}
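
// A skeletal impl sketch; every `Example*` type here is a placeholder standing in
// for a subsystem's real types (see `ToExampleJob` and `ExampleMetrics` above):
//
// ```ignore
// struct ExampleJob;
//
// impl JobTrait for ExampleJob {
// 	type ToJob = ToExampleJob;
// 	type FromJob = FromExampleJob;
// 	type Error = ExampleError;
// 	type RunArgs = ();
// 	type Metrics = ExampleMetrics;
//
// 	const NAME: &'static str = "ExampleJob";
//
// 	fn run(
// 		parent: Hash,
// 		_run_args: Self::RunArgs,
// 		_metrics: Self::Metrics,
// 		mut receiver: mpsc::Receiver<Self::ToJob>,
// 		mut sender: mpsc::Sender<Self::FromJob>,
// 	) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
// 		async move {
// 			while let Some(msg) = receiver.next().await {
// 				// handle `msg` for `parent`, replying via `sender`;
// 				// return once `ToJob::STOP` arrives
// 			}
// 			Ok(())
// 		}.boxed()
// 	}
// }
// ```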

/// Error which can be returned by the jobs manager
///
/// Wraps the utility error type and the job-specific error
#[derive(Debug, Error)]
pub enum JobsError<JobError: 'static + std::error::Error> {
	/// utility error
	#[error("Utility")]
	Utility(#[source] Error),
	/// internal job error
	#[error("Internal")]
	Job(#[source] JobError),
}

/// Jobs manager for a subsystem
///
/// - Spawns new jobs for a given relay-parent on demand.
/// - Closes old jobs for a given relay-parent on demand.
/// - Dispatches messages to the appropriate job for a given relay-parent.
/// - When dropped, aborts all remaining jobs.
/// - implements `Stream<Item=Job::FromJob>`, collecting all messages from subordinate jobs.
#[pin_project(PinnedDrop)]
pub struct Jobs<Spawner, Job: JobTrait> {
	spawner: Spawner,
	running: HashMap<Hash, JobHandle<Job::ToJob>>,
	#[pin]
	outgoing_msgs: StreamUnordered<mpsc::Receiver<Job::FromJob>>,
	job: std::marker::PhantomData<Job>,
	errors: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
}

impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
	/// Create a new Jobs manager which handles spawning appropriate jobs.
	pub fn new(spawner: Spawner) -> Self {
		Self {
			spawner,
			running: HashMap::new(),
			outgoing_msgs: StreamUnordered::new(),
			job: std::marker::PhantomData,
			errors: None,
		}
	}

	/// Monitor errors which may occur during handling of a spawned job.
	///
	/// By default, an error in a job is simply logged. Once this is called,
	/// the error is forwarded onto the provided channel.
	///
	/// Errors if the error channel already exists.
	pub fn forward_errors(
		&mut self,
		tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>,
	) -> Result<(), Error> {
		if self.errors.is_some() {
			return Err(Error::AlreadyForwarding);
		}
		self.errors = Some(tx);
		Ok(())
	}

	/// Spawn a new job for this `parent_hash`, with whatever args are appropriate.
	fn spawn_job(&mut self, parent_hash: Hash, run_args: Job::RunArgs, metrics: Job::Metrics) -> Result<(), Error> {
		let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
		let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
		let (finished_tx, finished) = oneshot::channel();

		// clone the error transmitter to move into the future
		let err_tx = self.errors.clone();

		let (future, abort_handle) = future::abortable(async move {
			if let Err(e) = Job::run(parent_hash, run_args, metrics, to_job_rx, from_job_tx).await {
				log::error!(
					"{}({}) finished with an error {:?}",
					Job::NAME,
					parent_hash,
					e,
				);

				if let Some(mut err_tx) = err_tx {
					// if we can't send the notification of error on the error channel, then
					// there's no point trying to propagate this error onto the channel too
					// all we can do is warn that error propagatio has failed
					if let Err(e) = err_tx.send((Some(parent_hash), JobsError::Job(e))).await {
						log::warn!("failed to forward error: {:?}", e);
					}
				}
			}
		});

		// the spawn mechanism requires that the spawned future has no output
		let future = async move {
			// job errors are already handled within the future, meaning
			// that any errors here are due to the abortable mechanism.
			// failure to abort isn't of interest.
			let _ = future.await;
			// transmission failure here is only possible if the receiver is closed,
			// which means the handle is dropped, which means we don't care anymore
			let _ = finished_tx.send(());
		};
		self.spawner.spawn(Job::NAME, future.boxed());

		// this handle lets us remove the appropriate receiver from self.outgoing_msgs
		// when it's time to stop the job.
		let outgoing_msgs_handle = self.outgoing_msgs.push(from_job_rx);

		let handle = JobHandle {
			abort_handle,
			to_job: to_job_tx,
			finished,
			outgoing_msgs_handle,
		};

		let _ = self.running.insert(parent_hash, handle);

		Ok(())
	}

	/// Stop the job associated with this `parent_hash`.
	pub async fn stop_job(&mut self, parent_hash: Hash) -> Result<(), Error> {
		match self.running.remove(&parent_hash) {
			Some(handle) => {
				let _ = Pin::new(&mut self.outgoing_msgs).remove(handle.outgoing_msgs_handle);
				handle.stop().await;
				Ok(())
			}
			None => Err(Error::JobNotFound(parent_hash)),
		}
	}

	/// Send a message to the appropriate job for this `parent_hash`.
	/// Will not return an error if the job is not running.
	async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) -> Result<(), Error> {
		match self.running.get_mut(&parent_hash) {
			Some(job) => job.send_msg(msg).await?,
			None => {
				// don't bring down the subsystem; this can happen due to a race condition
			},
		}
		Ok(())
	}
}

// Note that on drop, we don't have the chance to gracefully spin down each of the remaining handles;
// we just abort them all. Still better than letting them dangle.
#[pinned_drop]
impl<Spawner, Job: JobTrait> PinnedDrop for Jobs<Spawner, Job> {
	fn drop(self: Pin<&mut Self>) {
		for job_handle in self.running.values() {
			job_handle.abort_handle.abort();
		}
	}
}

impl<Spawner, Job> Stream for Jobs<Spawner, Job>
where
	Spawner: SpawnNamed,
	Job: JobTrait,
{
	type Item = Job::FromJob;

	fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> {
		// pin-project the outgoing messages
		let result = self.project().outgoing_msgs.poll_next(cx).map(|opt| {
			opt.and_then(|(stream_yield, _)| match stream_yield {
				StreamYield::Item(msg) => Some(msg),
				StreamYield::Finished(_) => None,
			})
		});
		// we don't want the stream to end if the jobs are empty at some point
		match result {
			task::Poll::Ready(None) => task::Poll::Pending,
			otherwise => otherwise,
		}
	}
}

/// A basic implementation of a subsystem.
///
/// This struct is responsible for handling message traffic between
/// this subsystem and the overseer. It spawns and kills jobs on the
/// appropriate Overseer messages, and dispatches standard traffic to
/// the appropriate job the rest of the time.
pub struct JobManager<Spawner, Context, Job: JobTrait> {
	spawner: Spawner,
	run_args: Job::RunArgs,
	metrics: Job::Metrics,
	context: std::marker::PhantomData<Context>,
	job: std::marker::PhantomData<Job>,
	errors: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
}

impl<Spawner, Context, Job> JobManager<Spawner, Context, Job>
where
	Spawner: SpawnNamed + Clone + Send + Unpin,
	Context: SubsystemContext,
	Job: 'static + JobTrait,
	Job::RunArgs: Clone,
	Job::ToJob: TryFrom<AllMessages> + TryFrom<<Context as SubsystemContext>::Message> + Sync,
{
	/// Creates a new `Subsystem`.
	pub fn new(spawner: Spawner, run_args: Job::RunArgs, metrics: Job::Metrics) -> Self {
		Self {
			spawner,
			run_args,
			metrics,
			context: std::marker::PhantomData,
			job: std::marker::PhantomData,
			errors: None,
		}
	}

	/// Monitor errors which may occur during handling of a spawned job.
	///
	/// By default, an error in a job is simply logged. Once this is called,
	/// the error is forwarded onto the provided channel.
	///
	/// Errors if the error channel already exists.
	pub fn forward_errors(
		&mut self,
		tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>,
	) -> Result<(), Error> {
		if self.errors.is_some() {
			return Err(Error::AlreadyForwarding);
		}
		self.errors = Some(tx);
		Ok(())
	}

	/// Run this subsystem
	///
	/// Conceptually, this is very simple: it just loops forever.
	///
	/// - On incoming overseer messages, it starts or stops jobs as appropriate.
	/// - On other incoming messages, if they can be converted into Job::ToJob and
	///   include a hash, then they're forwarded to the appropriate individual job.
	/// - On outgoing messages from the jobs, it forwards them to the overseer.
	///
	/// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur.
	/// Otherwise, most are logged and then discarded.
	pub async fn run(
		mut ctx: Context,
		run_args: Job::RunArgs,
		metrics: Job::Metrics,
		spawner: Spawner,
		mut err_tx: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
	) {
		let mut jobs = Jobs::new(spawner.clone());
		if let Some(ref err_tx) = err_tx {
			jobs.forward_errors(err_tx.clone())
				.expect("we never call this twice in this context; qed");
		}

		loop {
			select! {
				incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args, &metrics, &mut err_tx).await { break },
				outgoing = jobs.next().fuse() => Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await,
				complete => break,
			}
		}
	}

	// if we have a channel on which to forward errors, do so
	async fn fwd_err(
		hash: Option<Hash>,
		err: JobsError<Job::Error>,
		err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
	) {
		if let Some(err_tx) = err_tx {
			// if we can't send on the error transmission channel, we can't do anything useful about it
			// still, we can at least log the failure
			if let Err(e) = err_tx.send((hash, err)).await {
				log::warn!("failed to forward error: {:?}", e);
			}
		}
	}

	// handle an incoming message. return true if we should break afterwards.
	async fn handle_incoming(
		incoming: SubsystemResult<FromOverseer<Context::Message>>,
		jobs: &mut Jobs<Spawner, Job>,
		run_args: &Job::RunArgs,
		metrics: &Job::Metrics,
		err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
	) -> bool {
		use polkadot_node_subsystem::ActiveLeavesUpdate;
		use polkadot_node_subsystem::FromOverseer::{Communication, Signal};
		use polkadot_node_subsystem::OverseerSignal::{ActiveLeaves, BlockFinalized, Conclude};

		match incoming {
			Ok(Signal(ActiveLeaves(ActiveLeavesUpdate {
				activated,
				deactivated,
			}))) => {
				for hash in activated {
					let metrics = metrics.clone();
					if let Err(e) = jobs.spawn_job(hash, run_args.clone(), metrics) {
						log::error!("Failed to spawn a job: {:?}", e);
						let e = JobsError::Utility(e);
						Self::fwd_err(Some(hash), e, err_tx).await;
						return true;
					}
				}

				for hash in deactivated {
					if let Err(e) = jobs.stop_job(hash).await {
						log::error!("Failed to stop a job: {:?}", e);
						let e = JobsError::Utility(e);
						Self::fwd_err(Some(hash), e, err_tx).await;
						return true;
					}
				}
			}
			Ok(Signal(Conclude)) => {
				// Breaking the loop ends fn run, which drops `jobs`, which immediately drops all ongoing work.
				// We can afford to wait a little while to shut them all down properly before doing that.
				//
				// Forwarding the stream to a drain means we wait until all of the items in the stream
				// have completed. Contrast with `into_future`, which turns it into a future of `(head, rest_stream)`.
				use futures::sink::drain;
				use futures::stream::FuturesUnordered;
				use futures::stream::StreamExt;

				if let Err(e) = jobs
					.running
					.drain()
					.map(|(_, handle)| handle.stop())
					.collect::<FuturesUnordered<_>>()
					.map(Ok)
					.forward(drain())
					.await
				{
					log::error!("failed to stop all jobs on conclude signal: {:?}", e);
					let e = Error::from(e);
					Self::fwd_err(None, JobsError::Utility(e), err_tx).await;
				}

				return true;
			}
			Ok(Communication { msg }) => {
				if let Ok(to_job) = <Job::ToJob>::try_from(msg) {
					match to_job.relay_parent() {
						Some(hash) => {
							if let Err(err) = jobs.send_msg(hash, to_job).await {
								log::error!("Failed to send a message to a job: {:?}", err);
								let e = JobsError::Utility(err);
								Self::fwd_err(Some(hash), e, err_tx).await;
								return true;
							}
						}
						None => {
							if let Err(err) = Job::handle_unanchored_msg(to_job) {
								log::error!("Failed to handle unanchored message: {:?}", err);
								let e = JobsError::Job(err);
								Self::fwd_err(None, e, err_tx).await;
								return true;
							}
						}
					}
				}
			}
			Ok(Signal(BlockFinalized(_))) => {}
			Err(err) => {
				log::error!("error receiving message from subsystem context: {:?}", err);
				let e = JobsError::Utility(Error::from(err));
				Self::fwd_err(None, e, err_tx).await;
				return true;
			}
		}
		false
	}

	// handle an outgoing message.
	async fn handle_outgoing(
		outgoing: Option<Job::FromJob>,
		ctx: &mut Context,
		err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
	) {
		let msg = outgoing.expect("the Jobs stream never ends; qed");
		if let Err(e) = ctx.send_message(msg.into()).await {
			let e = JobsError::Utility(e.into());
			Self::fwd_err(None, e, err_tx).await;
		}
	}
}

impl<Spawner, Context, Job> Subsystem<Context> for JobManager<Spawner, Context, Job>
where
	Spawner: SpawnNamed + Send + Clone + Unpin + 'static,
	Context: SubsystemContext,
	<Context as SubsystemContext>::Message: Into<Job::ToJob>,
	Job: 'static + JobTrait + Send,
	Job::RunArgs: Clone + Sync,
	Job::ToJob: TryFrom<AllMessages> + Sync,
	Job::Metrics: Sync,
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let spawner = self.spawner.clone();
		let run_args = self.run_args.clone();
		let metrics = self.metrics.clone();
		let errors = self.errors;

		let future = Box::pin(async move {
			Self::run(ctx, run_args, metrics, spawner, errors).await;
			Ok(())
		});

		SpawnedSubsystem {
			name: Job::NAME.strip_suffix("Job").unwrap_or(Job::NAME),
			future,
		}
	}
}

/// Create a delegated subsystem
///
/// It is possible to create a type which implements `Subsystem` by simply doing:
///
/// ```ignore
/// pub type ExampleSubsystem<Spawner, Context> = JobManager<Spawner, Context, ExampleJob>;
/// ```
///
/// However, doing this requires that the job itself and all types which comprise it (i.e. `ToJob`, `FromJob`, `Error`, `RunArgs`)
/// are public, to avoid exposing private types in public interfaces. It's possible to delegate instead, which
/// can reduce the total number of public types exposed, i.e.
///
/// ```ignore
/// type Manager<Spawner, Context> = JobManager<Spawner, Context, ExampleJob>;
/// pub struct ExampleSubsystem {
/// 	manager: Manager<Spawner, Context>,
/// }
///
/// impl<Spawner, Context> Subsystem<Context> for ExampleSubsystem<Spawner, Context> { ... }
/// ```
///
/// This dramatically reduces the number of public types in the crate; the only things which must be public are now
///
/// - `struct ExampleSubsystem` (defined by this macro)
/// - `type ToJob` (because it appears in a trait bound)
/// - `type RunArgs` (because it appears in a function signature)
///
/// Implementing this all manually is of course possible, but it's tedious; why bother? This macro exists for
/// the purpose of doing it automatically:
///
/// ```ignore
/// delegated_subsystem!(ExampleJob(ExampleRunArgs, ExampleMetrics) <- ExampleToJob as ExampleSubsystem);
/// ```
#[macro_export]
macro_rules! delegated_subsystem {
	($job:ident($run_args:ty, $metrics:ty) <- $to_job:ty as $subsystem:ident) => {
		delegated_subsystem!($job($run_args, $metrics) <- $to_job as $subsystem; stringify!($subsystem));
	};

	($job:ident($run_args:ty, $metrics:ty) <- $to_job:ty as $subsystem:ident; $subsystem_name:expr) => {
		#[doc = "Manager type for the "]
		#[doc = $subsystem_name]
		type Manager<Spawner, Context> = $crate::JobManager<Spawner, Context, $job>;

		#[doc = "An implementation of the "]
		#[doc = $subsystem_name]