collator_side.rs 48.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
use std::collections::{HashMap, HashSet};
18

19
use super::{LOG_TARGET,  Result};
20

21
use futures::{select, FutureExt, channel::oneshot};
22

23
use polkadot_primitives::v1::{
24
	CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, CandidateHash,
25
26
};
use polkadot_subsystem::{
27
	jaeger, PerLeafSpan,
28
	FromOverseer, OverseerSignal, SubsystemContext,
29
	messages::{AllMessages, CollatorProtocolMessage, NetworkBridgeMessage, NetworkBridgeEvent},
30
};
31
32
33
use polkadot_node_network_protocol::{
	peer_set::PeerSet, v1 as protocol_v1, View, PeerId, RequestId, OurView,
};
34
use polkadot_node_subsystem_util::{
35
	validator_discovery,
36
37
	request_validators_ctx,
	request_validator_groups_ctx,
38
	request_availability_cores_ctx,
39
	metrics::{self, prometheus},
40
};
41
use polkadot_node_primitives::{SignedFullStatement, Statement};
42

43
#[derive(Clone, Default)]
44
pub struct Metrics(Option<MetricsInner>);
45
46
47
48

impl Metrics {
	fn on_advertisment_made(&self) {
		if let Some(metrics) = &self.0 {
Andronik Ordian's avatar
Andronik Ordian committed
49
			metrics.advertisements_made.inc();
50
51
52
53
54
55
56
57
		}
	}

	fn on_collation_sent(&self) {
		if let Some(metrics) = &self.0 {
			metrics.collations_sent.inc();
		}
	}
58
59
60
61
62
63
64
65
66
67

	/// Provide a timer for handling `ConnectionRequest` which observes on drop.
	fn time_handle_connection_request(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.handle_connection_request.start_timer())
	}

	/// Provide a timer for `process_msg` which observes on drop.
	fn time_process_msg(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
	}
68
69
70
71
}

/// The concrete Prometheus metrics backing [`Metrics`].
///
/// Only present when a registry was available at subsystem start; otherwise
/// `Metrics` holds `None` and all recording calls are no-ops.
#[derive(Clone)]
struct MetricsInner {
	/// Total number of collation advertisements sent to validators.
	advertisements_made: prometheus::Counter<prometheus::U64>,
	/// Total number of collations sent to validators.
	collations_sent: prometheus::Counter<prometheus::U64>,
	/// Time spent handling a single validator `ConnectionRequest`.
	handle_connection_request: prometheus::Histogram,
	/// Time spent inside `process_msg`.
	process_msg: prometheus::Histogram,
}

impl metrics::Metrics for Metrics {
	/// Register all collator-side metrics with the given Prometheus `registry`.
	///
	/// Returns a `Metrics` wrapping the registered counters/histograms, or the
	/// first registration error encountered.
	fn try_register(registry: &prometheus::Registry)
		-> std::result::Result<Self, prometheus::PrometheusError>
	{
		let metrics = MetricsInner {
			advertisements_made: prometheus::register(
				prometheus::Counter::new(
					"parachain_collation_advertisements_made_total",
					"A number of collation advertisements sent to validators.",
				)?,
				registry,
			)?,
			collations_sent: prometheus::register(
				prometheus::Counter::new(
					"parachain_collations_sent_total",
					"A number of collations sent to validators.",
				)?,
				registry,
			)?,
			handle_connection_request: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_collator_protocol_collator_handle_connection_request",
						"Time spent within `collator_protocol_collator::handle_connection_request`",
					)
				)?,
				registry,
			)?,
			process_msg: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_collator_protocol_collator_process_msg",
						"Time spent within `collator_protocol_collator::process_msg`",
					)
				)?,
				registry,
			)?,
		};

		Ok(Metrics(Some(metrics)))
	}
}

121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/// The group of validators that is assigned to our para at a given point of time.
///
/// This structure is responsible for keeping track of which validators belong to a certain group for a para. It also
/// stores a mapping from [`PeerId`] to [`ValidatorId`] as we learn about it over the lifetime of this object. Besides
/// that it also keeps track to which validators we advertised our collation.
struct ValidatorGroup {
	/// All [`ValidatorId`]'s that are assigned to us in this group.
	validator_ids: HashSet<ValidatorId>,
	/// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s from the
	/// authority discovery. It is not ensured that this will contain *all* validators of this group.
	peer_ids: HashMap<PeerId, ValidatorId>,
	/// All [`ValidatorId`]'s of the current group to that we advertised our collation.
	///
	/// Used to make sure each validator is only advertised to once per collation.
	advertised_to: HashSet<ValidatorId>,
}

impl ValidatorGroup {
	/// Returns `true` if we should advertise our collation to the given peer.
	///
	/// This is the case when we know the validator behind `peer` and have not
	/// advertised to that validator yet.
	fn should_advertise_to(&self, peer: &PeerId) -> bool {
		self.peer_ids
			.get(peer)
			.map_or(false, |validator_id| !self.advertised_to.contains(validator_id))
	}

	/// Should be called after we advertised our collation to the given `peer` to keep track of it.
	fn advertised_to_peer(&mut self, peer: &PeerId) {
		if let Some(validator_id) = self.peer_ids.get(peer) {
			self.advertised_to.insert(validator_id.clone());
		}
	}

	/// Add a [`PeerId`] that belongs to the given [`ValidatorId`].
	///
	/// This returns `true` if the given validator belongs to this group and we could insert its [`PeerId`].
	fn add_peer_id_for_validator(&mut self, peer_id: &PeerId, validator_id: &ValidatorId) -> bool {
		if self.validator_ids.contains(validator_id) {
			self.peer_ids.insert(peer_id.clone(), validator_id.clone());
			true
		} else {
			false
		}
	}
}

/// Build a fresh [`ValidatorGroup`] from the set of assigned validator ids,
/// with no known peer ids and no advertisements recorded yet.
impl From<HashSet<ValidatorId>> for ValidatorGroup {
	fn from(validator_ids: HashSet<ValidatorId>) -> Self {
		Self {
			validator_ids,
			peer_ids: HashMap::new(),
			advertised_to: HashSet::new(),
		}
	}
}

175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/// The status of a collation as seen from the collator.
///
/// The status only ever moves forward: `Created` -> `Advertised` -> `Requested`.
enum CollationStatus {
	/// The collation was created, but we did not advertise it to any validator.
	Created,
	/// The collation was advertised to at least one validator.
	Advertised,
	/// The collation was requested by at least one validator.
	Requested,
}

impl CollationStatus {
	/// Advance to the [`Self::Advertised`] status.
	///
	/// This ensures that `self` isn't already [`Self::Requested`].
	fn advance_to_advertised(&mut self) {
		// Never downgrade a collation that was already requested.
		if !matches!(self, Self::Requested) {
			*self = Self::Advertised;
		}
	}

	/// Advance to the [`Self::Requested`] status.
	fn advance_to_requested(&mut self) {
		*self = Self::Requested;
	}
}

/// A collation built by the collator.
struct Collation {
	/// The candidate receipt describing this collation.
	receipt: CandidateReceipt,
	/// The proof-of-validity to ship to validators on request.
	pov: PoV,
	/// How far along the advertise/request lifecycle this collation is.
	status: CollationStatus,
}

208
209
210
211
212
213
214
215
216
217
218
219
220
221
/// All state the collator side of the protocol keeps between messages.
#[derive(Default)]
struct State {
	/// Our id.
	our_id: CollatorId,

	/// The para this collator is collating on.
	/// Starts as `None` and is updated with every `CollateOn` message.
	collating_on: Option<ParaId>,

	/// Track all active peers and their views
	/// to determine what is relevant to them.
	peer_views: HashMap<PeerId, View>,

	/// Our own view.
	view: OurView,

	/// Span per relay parent.
	span_per_relay_parent: HashMap<Hash, PerLeafSpan>,

	/// Possessed collations.
	///
	/// We will keep up to one local collation per relay-parent.
	collations: HashMap<Hash, Collation>,

	/// The result senders per collation.
	///
	/// Resolved with the `Seconded` statement once a validator seconds the candidate.
	collation_result_senders: HashMap<CandidateHash, oneshot::Sender<SignedFullStatement>>,

	/// Our validator groups per active leaf.
	our_validators_groups: HashMap<Hash, ValidatorGroup>,

	/// List of peers where we declared ourself as a collator.
	declared_at: HashSet<PeerId>,

	/// The connection requests to validators per relay parent.
	connection_requests: validator_discovery::ConnectionRequests,

	/// Metrics.
	metrics: Metrics,
}

248
249
250
251
252
253
254
impl State {
	/// Returns `true` if the given `peer` is interested in the leaf that is represented by `relay_parent`.
	///
	/// Unknown peers are treated as not interested.
	fn peer_interested_in_leaf(&self, peer: &PeerId, relay_parent: &Hash) -> bool {
		self.peer_views
			.get(peer)
			.map_or(false, |view| view.contains(relay_parent))
	}
}

255
/// Distribute a collation.
///
/// Figure out the core our para is assigned to and the relevant validators.
/// Issue a connection request to these validators.
/// If the para is not scheduled or next up on any core, at the relay-parent,
/// or the relay-parent isn't in the active-leaves set, we ignore the message
/// as it must be invalid in that case - although this indicates a logic error
/// elsewhere in the node.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
async fn distribute_collation(
	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
	state: &mut State,
	id: ParaId,
	receipt: CandidateReceipt,
	pov: PoV,
	result_sender: Option<oneshot::Sender<SignedFullStatement>>,
) -> Result<()> {
	let relay_parent = receipt.descriptor.relay_parent;

	// This collation is not in the active-leaves set.
	if !state.view.contains(&relay_parent) {
		tracing::warn!(
			target: LOG_TARGET,
			relay_parent = %relay_parent,
			"distribute collation message parent is outside of our view",
		);

		return Ok(());
	}

	// We have already seen collation for this relay parent.
	// Only one collation per relay parent is kept (see `State::collations`).
	if state.collations.contains_key(&relay_parent) {
		return Ok(());
	}

	// Determine which core the para collated-on is assigned to.
	// If it is not scheduled then ignore the message.
	let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
		Some(core) => core,
		None => {
			tracing::warn!(
				target: LOG_TARGET,
				para_id = %id,
				relay_parent = %relay_parent,
				"looks like no core is assigned to {} at {}", id, relay_parent,
			);

			return Ok(())
		}
	};

	// Determine the group on that core and the next group on that core.
	let (current_validators, next_validators) = determine_our_validators(ctx, our_core, num_cores, relay_parent).await?;

	if current_validators.is_empty() && next_validators.is_empty() {
		tracing::warn!(
			target: LOG_TARGET,
			core = ?our_core,
			"there are no validators assigned to core",
		);

		return Ok(());
	}

	// Issue a discovery request for the validators of the current group and the next group.
	connect_to_validators(
		ctx,
		relay_parent,
		id,
		state,
		current_validators.union(&next_validators).cloned().collect(),
	).await?;

	// Only the *current* group is tracked for advertisements.
	state.our_validators_groups.insert(relay_parent, current_validators.into());

	if let Some(result_sender) = result_sender {
		state.collation_result_senders.insert(receipt.hash(), result_sender);
	}

	state.collations.insert(relay_parent, Collation { receipt, pov, status: CollationStatus::Created });

	Ok(())
}

/// Get the Id of the Core that is assigned to the para being collated on if any
/// and the total number of cores.
341
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
342
343
async fn determine_core(
	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
344
345
	para_id: ParaId,
	relay_parent: Hash,
346
) -> Result<Option<(CoreIndex, usize)>> {
347
	let cores = request_availability_cores_ctx(relay_parent, ctx).await?.await??;
348
349
350
351
352
353
354
355
356
357
358
359

	for (idx, core) in cores.iter().enumerate() {
		if let CoreState::Scheduled(occupied) = core {
			if occupied.para_id == para_id {
				return Ok(Some(((idx as u32).into(), cores.len())));
			}
		}
	}

	Ok(None)
}

360
/// Figure out current and next group of validators assigned to the para being collated on.
361
///
362
/// Returns [`ValidatorId`]'s of current and next group as determined based on the `relay_parent`.
363
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
364
365
async fn determine_our_validators(
	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
366
367
368
	core_index: CoreIndex,
	cores: usize,
	relay_parent: Hash,
369
) -> Result<(HashSet<ValidatorId>, HashSet<ValidatorId>)> {
370
371
372
373
374
	let groups = request_validator_groups_ctx(relay_parent, ctx).await?;

	let groups = groups.await??;

	let current_group_index = groups.1.group_for_core(core_index, cores);
375
	let current_validators = groups.0.get(current_group_index.0 as usize).map(|v| v.as_slice()).unwrap_or_default();
376
377

	let next_group_idx = (current_group_index.0 as usize + 1) % groups.0.len();
378
	let next_validators = groups.0.get(next_group_idx).map(|v| v.as_slice()).unwrap_or_default();
379

380
	let validators = request_validators_ctx(relay_parent, ctx).await?.await??;
381

382
383
	let current_validators = current_validators.iter().map(|i| validators[*i as usize].clone()).collect();
	let next_validators = next_validators.iter().map(|i| validators[*i as usize].clone()).collect();
384

385
	Ok((current_validators, next_validators))
386
387
}

388
/// Issue a `Declare` collation message to the given `peer`.
389
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
390
391
async fn declare(
	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
392
	state: &mut State,
393
	peer: PeerId,
394
) {
395
396
397
398
	let wire_message = protocol_v1::CollatorProtocolMessage::Declare(state.our_id.clone());

	ctx.send_message(AllMessages::NetworkBridge(
		NetworkBridgeMessage::SendCollationMessage(
399
			vec![peer],
400
401
			protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
		)
402
	)).await;
403
404
}

405
406
/// Issue a connection request to a set of validators and
/// revoke the previous connection request.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn connect_to_validators(
	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
	relay_parent: Hash,
	para_id: ParaId,
	state: &mut State,
	validators: Vec<ValidatorId>,
) -> Result<()> {
	// Ask validator discovery for connections on the collation peer set.
	let request = validator_discovery::connect_to_validators(
		ctx,
		relay_parent,
		validators,
		PeerSet::Collation,
	).await?;

	// Storing the new request replaces any previous one for this
	// (relay_parent, para_id), which revokes it.
	state.connection_requests.put(relay_parent, para_id, request);

	Ok(())
}

427
428
429
430
/// Advertise collation to the given `peer`.
///
/// This will only advertise a collation if there exists one for the given `relay_parent` and the given `peer` is
/// set as validator for our para at the given `relay_parent`.
431
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
432
433
async fn advertise_collation(
	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
434
435
	state: &mut State,
	relay_parent: Hash,
436
	peer: PeerId,
437
) {
438
	let collating_on = match state.collating_on {
439
		Some(collating_on) => collating_on,
440
		None => return,
441
442
	};

443
444
445
446
447
	let should_advertise = state.our_validators_groups
		.get(&relay_parent)
		.map(|g| g.should_advertise_to(&peer))
		.unwrap_or(false);

448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
	match (state.collations.get_mut(&relay_parent), should_advertise) {
		(None, _) => {
			tracing::trace!(
				target: LOG_TARGET,
				relay_parent = ?relay_parent,
				peer_id = %peer,
				"No collation to advertise.",
			);
			return
		},
		(_, false) => {
			tracing::trace!(
				target: LOG_TARGET,
				relay_parent = ?relay_parent,
				peer_id = %peer,
				"Not advertising collation as we already advertised it to this validator.",
			);
			return
		}
		(Some(collation), true) => collation.status.advance_to_advertised(),
468
469
	}

470
471
472
473
	let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation(relay_parent, collating_on);

	ctx.send_message(AllMessages::NetworkBridge(
		NetworkBridgeMessage::SendCollationMessage(
474
			vec![peer.clone()],
475
476
			protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
		)
477
	)).await;
478

479
480
481
482
	if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) {
		validators.advertised_to_peer(&peer);
	}

483
	state.metrics.on_advertisment_made();
484
485
486
}

/// The main incoming message dispatching switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn process_msg(
	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
	state: &mut State,
	msg: CollatorProtocolMessage,
) -> Result<()> {
	use CollatorProtocolMessage::*;

	// Observes the time spent in this function on drop.
	let _timer = state.metrics.time_process_msg();

	match msg {
		CollateOn(id) => {
			state.collating_on = Some(id);
		}
		DistributeCollation(receipt, pov, result_sender) => {
			let _span1 = state.span_per_relay_parent
				.get(&receipt.descriptor.relay_parent).map(|s| s.child("distributing-collation"));
			let _span2 = jaeger::pov_span(&pov, "distributing-collation");
			match state.collating_on {
				Some(id) if receipt.descriptor.para_id != id => {
					// If the ParaId of a collation requested to be distributed does not match
					// the one we expect, we ignore the message.
					tracing::warn!(
						target: LOG_TARGET,
						para_id = %receipt.descriptor.para_id,
						collating_on = %id,
						"DistributeCollation for unexpected para_id",
					);
				}
				Some(id) => {
					distribute_collation(ctx, state, id, receipt, pov, result_sender).await?;
				}
				None => {
					tracing::warn!(
						target: LOG_TARGET,
						para_id = %receipt.descriptor.para_id,
						"DistributeCollation message while not collating on any",
					);
				}
			}
		}
		// The following messages belong to the validator side; receiving them
		// here indicates a wiring error, so they are logged and dropped.
		FetchCollation(_, _, _, _) => {
			tracing::warn!(
				target: LOG_TARGET,
				"FetchCollation message is not expected on the collator side of the protocol",
			);
		}
		ReportCollator(_) => {
			tracing::warn!(
				target: LOG_TARGET,
				"ReportCollator message is not expected on the collator side of the protocol",
			);
		}
		NoteGoodCollation(_) => {
			tracing::warn!(
				target: LOG_TARGET,
				"NoteGoodCollation message is not expected on the collator side of the protocol",
			);
		}
		NotifyCollationSeconded(_, _) => {
			tracing::warn!(
				target: LOG_TARGET,
				"NotifyCollationSeconded message is not expected on the collator side of the protocol",
			);
		}
		NetworkBridgeUpdateV1(event) => {
			if let Err(e) = handle_network_msg(
				ctx,
				state,
				event,
			).await {
				tracing::warn!(
					target: LOG_TARGET,
					err = ?e,
					"Failed to handle incoming network message",
				);
			}
		},
	}

	Ok(())
}

/// Issue a response to a previously requested collation.
571
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
572
573
async fn send_collation(
	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
574
	state: &mut State,
575
576
577
578
	request_id: RequestId,
	origin: PeerId,
	receipt: CandidateReceipt,
	pov: PoV,
579
) {
580
581
582
583
584
585
586
587
588
589
590
591
592
	let pov = match protocol_v1::CompressedPoV::compress(&pov) {
		Ok(pov) => pov,
		Err(error) => {
			tracing::debug!(
				target: LOG_TARGET,
				error = ?error,
				"Failed to create `CompressedPov`",
			);
			return
		}
	};

	let wire_message = protocol_v1::CollatorProtocolMessage::Collation(request_id, receipt, pov);
593
594
595
596
597
598

	ctx.send_message(AllMessages::NetworkBridge(
		NetworkBridgeMessage::SendCollationMessage(
			vec![origin],
			protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
		)
599
	)).await;
600

601
	state.metrics.on_collation_sent();
602
603
604
}

/// A networking messages switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_incoming_peer_message(
	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
	state: &mut State,
	origin: PeerId,
	msg: protocol_v1::CollatorProtocolMessage,
) -> Result<()> {
	use protocol_v1::CollatorProtocolMessage::*;

	match msg {
		// `Declare`, `AdvertiseCollation` and `Collation` are collator-to-validator
		// messages; receiving them here means the peer is confused or malicious.
		Declare(_) => {
			tracing::warn!(
				target: LOG_TARGET,
				"Declare message is not expected on the collator side of the protocol",
			);
		}
		AdvertiseCollation(_, _) => {
			tracing::warn!(
				target: LOG_TARGET,
				"AdvertiseCollation message is not expected on the collator side of the protocol",
			);
		}
		RequestCollation(request_id, relay_parent, para_id) => {
			let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("request-collation"));
			match state.collating_on {
				Some(our_para_id) => {
					if our_para_id == para_id {
						// Mark the collation as requested and answer with it.
						let (receipt, pov) = if let Some(collation) = state.collations.get_mut(&relay_parent) {
							collation.status.advance_to_requested();
							(collation.receipt.clone(), collation.pov.clone())
						} else {
							tracing::warn!(
								target: LOG_TARGET,
								relay_parent = %relay_parent,
								"received a `RequestCollation` for a relay parent we don't have collation stored.",
							);

							return Ok(());
						};

						let _span = _span.as_ref().map(|s| s.child("sending"));
						send_collation(ctx, state, request_id, origin, receipt, pov).await;
					} else {
						tracing::warn!(
							target: LOG_TARGET,
							for_para_id = %para_id,
							our_para_id = %our_para_id,
							"received a `RequestCollation` for unexpected para_id",
						);
					}
				}
				None => {
					tracing::warn!(
						target: LOG_TARGET,
						for_para_id = %para_id,
						"received a `RequestCollation` while not collating on any para",
					);
				}
			}
		}
		Collation(_, _, _) => {
			tracing::warn!(
				target: LOG_TARGET,
				"Collation message is not expected on the collator side of the protocol",
			);
		}
		CollationSeconded(statement) => {
			// Forward the `Seconded` statement to whoever asked for the result
			// of this collation; any other statement kind is unexpected.
			if !matches!(statement.payload(), Statement::Seconded(_)) {
				tracing::warn!(
					target: LOG_TARGET,
					statement = ?statement,
					"Collation seconded message received with none-seconded statement.",
				);
			} else if let Some(sender) = state.collation_result_senders.remove(&statement.payload().candidate_hash()) {
				let _ = sender.send(statement);
			}
		}
	}

	Ok(())
}

/// Our view has changed.
688
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
689
690
async fn handle_peer_view_change(
	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
691
692
693
	state: &mut State,
	peer_id: PeerId,
	view: View,
694
) {
695
696
697
698
699
700
701
	let current = state.peer_views.entry(peer_id.clone()).or_default();

	let added: Vec<Hash> = view.difference(&*current).cloned().collect();

	*current = view;

	for added in added.into_iter() {
702
		advertise_collation(ctx, state, added, peer_id.clone()).await;
703
704
705
	}
}

706
/// A validator is connected.
707
///
708
/// `Declare` that we are a collator with a given `CollatorId`.
709
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
710
711
async fn handle_validator_connected(
	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
712
713
	state: &mut State,
	peer_id: PeerId,
714
	validator_id: ValidatorId,
715
	relay_parent: Hash,
716
) {
717
718
719
	let not_declared = state.declared_at.insert(peer_id.clone());

	if not_declared {
720
		declare(ctx, state, peer_id.clone()).await;
721
722
723
724
725
726
727
728
729
730
731
732
	}

	// Store the PeerId and find out if we should advertise to this peer.
	//
	// If this peer does not belong to the para validators, we also don't need to try to advertise our collation.
	let advertise = if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) {
		validators.add_peer_id_for_validator(&peer_id, &validator_id)
	} else {
		false
	};

	if advertise && state.peer_interested_in_leaf(&peer_id, &relay_parent) {
733
		advertise_collation(ctx, state, relay_parent, peer_id).await;
734
	}
735
736
737
}

/// Bridge messages switch.
///
/// Dispatches every `NetworkBridgeEvent` to the corresponding handler and
/// maintains per-peer bookkeeping on connect/disconnect.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_network_msg(
	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
	state: &mut State,
	bridge_message: NetworkBridgeEvent<protocol_v1::CollatorProtocolMessage>,
) -> Result<()> {
	use NetworkBridgeEvent::*;

	match bridge_message {
		PeerConnected(_peer_id, _observed_role) => {
			// If it is possible that a disconnected validator would attempt a reconnect
			// it should be handled here.
		}
		PeerViewChange(peer_id, view) => {
			handle_peer_view_change(ctx, state, peer_id, view).await;
		}
		PeerDisconnected(peer_id) => {
			// Forget everything we tracked about this peer so a reconnect
			// starts from a clean slate (including re-declaring ourselves).
			state.peer_views.remove(&peer_id);
			state.declared_at.remove(&peer_id);
		}
		OurViewChange(view) => {
			handle_our_view_change(state, view).await?;
		}
		PeerMessage(remote, msg) => {
			handle_incoming_peer_message(ctx, state, remote, msg).await?;
		}
	}

	Ok(())
}

/// Handles our view changes.
///
/// Drops all per-relay-parent state for heads that left our view and logs the
/// final status of any collation that is discarded this way.
#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))]
async fn handle_our_view_change(
	state: &mut State,
	view: OurView,
) -> Result<()> {
	// Every relay parent in the old view but not the new one is stale.
	for removed in state.view.difference(&view) {
		tracing::debug!(target: LOG_TARGET, relay_parent = ?removed, "Removing relay parent because our view changed.");

		if let Some(collation) = state.collations.remove(removed) {
			state.collation_result_senders.remove(&collation.receipt.hash());

			// Log how far the collation got before being dropped; never being
			// advertised is surprising enough to warrant a warning.
			match collation.status {
				CollationStatus::Created => tracing::warn!(
					target: LOG_TARGET,
					candidate_hash = ?collation.receipt.hash(),
					pov_hash = ?collation.pov.hash(),
					"Collation wasn't advertised to any validator.",
				),
				CollationStatus::Advertised => tracing::debug!(
					target: LOG_TARGET,
					candidate_hash = ?collation.receipt.hash(),
					pov_hash = ?collation.pov.hash(),
					"Collation was advertised but not requested by any validator.",
				),
				CollationStatus::Requested => tracing::debug!(
					target: LOG_TARGET,
					candidate_hash = ?collation.receipt.hash(),
					pov_hash = ?collation.pov.hash(),
					"Collation was requested.",
				)
			}
		}
		state.our_validators_groups.remove(removed);
		state.connection_requests.remove_all(removed);
		state.span_per_relay_parent.remove(removed);
	}

	state.view = view;

	Ok(())
}

/// The collator protocol collator side main loop.
///
/// Runs until a `Conclude` signal is received, multiplexing between validator
/// connection results and overseer messages.
#[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
pub(crate) async fn run(
	mut ctx: impl SubsystemContext<Message = CollatorProtocolMessage>,
	our_id: CollatorId,
	metrics: Metrics,
) -> Result<()> {
	use FromOverseer::*;
	use OverseerSignal::*;

	let mut state = State {
		metrics,
		our_id,
		..Default::default()
	};

	loop {
		select! {
			// A validator we requested a connection to has connected.
			res = state.connection_requests.next().fuse() => {
				let _timer = state.metrics.time_handle_connection_request();

				handle_validator_connected(
					&mut ctx,
					&mut state,
					res.peer_id,
					res.validator_id,
					res.relay_parent,
				).await;
			},
			msg = ctx.recv().fuse() => match msg? {
				Communication { msg } => {
					// Message handling errors are logged, not fatal to the loop.
					if let Err(e) = process_msg(&mut ctx, &mut state, msg).await {
						tracing::warn!(target: LOG_TARGET, err = ?e, "Failed to process message");
					}
				},
				// Leaf tracking happens via `OurViewChange` network events instead.
				Signal(ActiveLeaves(_update)) => {}
				Signal(BlockFinalized(..)) => {}
				Signal(Conclude) => return Ok(()),
			}
		}
	}
}

#[cfg(test)]
mod tests {
	use super::*;

859
	use std::{time::Duration, sync::Arc};
860

861
	use assert_matches::assert_matches;
862
	use futures::{executor, future, Future, channel::mpsc};
863
864
865
866
867
868

	use sp_core::crypto::Pair;
	use sp_keyring::Sr25519Keyring;

	use polkadot_primitives::v1::{
		BlockData, CandidateDescriptor, CollatorPair, ScheduledCore,
869
		ValidatorIndex, GroupRotationInfo, AuthorityDiscoveryId,
870
		SessionIndex, SessionInfo,
871
	};
872
	use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger};
873
	use polkadot_node_subsystem_util::TimeoutExt;
874
	use polkadot_subsystem_testhelpers as test_helpers;
875
	use polkadot_node_network_protocol::{view, our_view};
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900

	/// Convenience builder producing a [`CandidateReceipt`] for tests.
	#[derive(Default)]
	struct TestCandidateBuilder {
		para_id: ParaId,
		pov_hash: Hash,
		relay_parent: Hash,
		commitments_hash: Hash,
	}

	impl TestCandidateBuilder {
		/// Build the receipt, defaulting every descriptor field not set on the builder.
		fn build(self) -> CandidateReceipt {
			CandidateReceipt {
				descriptor: CandidateDescriptor {
					para_id: self.para_id,
					pov_hash: self.pov_hash,
					relay_parent: self.relay_parent,
					..Default::default()
				},
				commitments_hash: self.commitments_hash,
			}
		}
	}

	/// Shared fixture describing a single para, its validators and group
	/// assignment used by the collator-side tests.
	#[derive(Clone)]
	struct TestState {
		para_id: ParaId,
		validators: Vec<Sr25519Keyring>,
		validator_public: Vec<ValidatorId>,
		validator_authority_id: Vec<AuthorityDiscoveryId>,
		validator_peer_id: Vec<PeerId>,
		validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
		relay_parent: Hash,
		availability_core: CoreState,
		our_collator_pair: CollatorPair,
		session_index: SessionIndex,
	}

	/// Map test keyrings to their session validator ids.
	fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
		val_ids.iter().map(|v| v.public().into()).collect()
	}

	/// Map test keyrings to their authority-discovery ids.
	fn validator_authority_id(val_ids: &[Sr25519Keyring]) -> Vec<AuthorityDiscoveryId> {
		val_ids.iter().map(|v| v.public().into()).collect()
	}

921
922
	impl Default for TestState {
		/// Build the standard fixture: para 1, five well-known validators in
		/// two groups, one scheduled availability core and a random relay parent.
		fn default() -> Self {
			let para_id = ParaId::from(1);

			let validators = vec![
				Sr25519Keyring::Alice,
				Sr25519Keyring::Bob,
				Sr25519Keyring::Charlie,
				Sr25519Keyring::Dave,
				Sr25519Keyring::Ferdie,
			];

			let validator_public = validator_pubkeys(&validators);
			let validator_authority_id = validator_authority_id(&validators);

			// One random peer id per validator.
			let validator_peer_id = std::iter::repeat_with(|| PeerId::random())
				.take(validator_public.len())
				.collect();

			// Two groups over the five validators (indices into `validators`).
			let validator_groups = vec![vec![2, 0, 4], vec![3, 2, 4]];
			let group_rotation_info = GroupRotationInfo {
				session_start_block: 0,
				group_rotation_frequency: 100,
				now: 1,
			};
			let validator_groups = (validator_groups, group_rotation_info);

			// A single core, scheduled for our para.
			let availability_core = CoreState::Scheduled(ScheduledCore {
				para_id,
				collator: None,
			});

			let relay_parent = Hash::random();

			let our_collator_pair = CollatorPair::generate().0;

			Self {
				para_id,
				validators,
				validator_public,
				validator_authority_id,
				validator_peer_id,
				validator_groups,
				relay_parent,
				availability_core,
				our_collator_pair,
				session_index: 1,
			}
		}
	}

972
973
974
975
976
	impl TestState {
		fn current_group_validator_indices(&self) -> &[ValidatorIndex] {
			&self.validator_groups.0[0]
		}

977
978
979
980
		fn current_session_index(&self) -> SessionIndex {
			self.session_index
		}

981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
		fn current_group_validator_peer_ids(&self) -> Vec<PeerId> {
			self.current_group_validator_indices().iter().map(|i| self.validator_peer_id[*i as usize].clone()).collect()
		}

		fn current_group_validator_authority_ids(&self) -> Vec<AuthorityDiscoveryId> {
			self.current_group_validator_indices()
				.iter()
				.map(|i| self.validator_authority_id[*i as usize].clone())
				.collect()
		}

		fn current_group_validator_ids(&self) -> Vec<ValidatorId> {
			self.current_group_validator_indices()
				.iter()
				.map(|i| self.validator_public[*i as usize].clone())
				.collect()
		}

		fn next_group_validator_indices(&self) -> &[ValidatorIndex] {
			&self.validator_groups.0[1]
For faster browsing, not all history is shown. View entire blame