lib.rs 42.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The Network Bridge Subsystem - protocol multiplexer for Polkadot.

19
#![deny(unused_crate_dependencies)]
20
21
22
#![warn(missing_docs)]


23
24
25
26
use parity_scale_codec::{Encode, Decode};
use futures::prelude::*;

use polkadot_subsystem::{
27
	ActiveLeavesUpdate, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
28
	SubsystemResult, jaeger,
29
};
30
31
32
use polkadot_subsystem::messages::{
	NetworkBridgeMessage, AllMessages, AvailabilityDistributionMessage,
	BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage,
33
	CollatorProtocolMessage, ApprovalDistributionMessage, NetworkBridgeEvent,
34
};
35
use polkadot_primitives::v1::{Hash, BlockNumber};
36
use polkadot_node_network_protocol::{
37
	PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView, UnifiedReputationChange as Rep,
38
};
39

40
41
42
43
44
/// Peer set infos for network initialization.
///
/// To be added to [`NetworkConfiguration::extra_sets`].
pub use polkadot_node_network_protocol::peer_set::peer_sets_info;

45
use std::collections::{HashMap, hash_map};
46
use std::iter::ExactSizeIterator;
47
48
use std::sync::Arc;

49
50
mod validator_discovery;

51
52
53
54
55
/// Internally used `Action` type.
///
/// All requested `NetworkBridgeMessage` user actions and `NetworkEvent` network messages are
/// translated to `Action` before being processed by `run_network`.
mod action;
56
use action::{Action, AbortReason};
57
58
59
60
61
62
63

/// Actual interfacing to the network based on the `Network` trait.
///
/// Defines the `Network` trait with an implementation for an `Arc<NetworkService>`.
mod network;
use network::{Network, send_message};

64
65
66
67
/// Request multiplexer for combining the multiple request sources into a single `Stream` of `AllMessages`.
mod multiplexer;
pub use multiplexer::RequestMultiplexer;

68

69
70
71
72
73
74
/// The maximum amount of heads a peer is allowed to have in their view at any time.
///
/// We use the same limit to compute the view sent to peers locally.
const MAX_VIEW_HEADS: usize = 5;


75
76
77
78
const MALFORMED_MESSAGE_COST: Rep = Rep::CostMajor("Malformed Network-bridge message");
const UNCONNECTED_PEERSET_COST: Rep = Rep::CostMinor("Message sent to un-connected peer-set");
const MALFORMED_VIEW_COST: Rep = Rep::CostMajor("Malformed view");
const EMPTY_VIEW_COST: Rep = Rep::CostMajor("Peer sent us an empty view");
79

80
/// Log target used by every `tracing` call of the network bridge.
const LOG_TARGET: &str = "network_bridge";
82

83
84
85
/// Messages from and to the network.
///
/// As transmitted to and received from subsystems.
86
#[derive(Debug, Encode, Decode, Clone)]
87
pub enum WireMessage<M> {
88
	/// A message from a peer on a specific protocol.
89
	#[codec(index = 1)]
90
	ProtocolMessage(M),
91
	/// A view update from a peer.
92
	#[codec(index = 2)]
93
94
95
96
97
	ViewUpdate(View),
}


/// The network bridge subsystem.
98
pub struct NetworkBridge<N, AD> {
99
	/// `Network` trait implementing type.
100
101
	network_service: N,
	authority_discovery_service: AD,
102
	request_multiplexer: RequestMultiplexer,
103
}
104

105
106
impl<N, AD> NetworkBridge<N, AD> {
	/// Create a new network bridge subsystem with underlying network service and authority discovery service.
107
108
	///
	/// This assumes that the network service has had the notifications protocol for the network
109
	/// bridge already registered. See [`peers_sets_info`](peers_sets_info).
110
	pub fn new(network_service: N, authority_discovery_service: AD, request_multiplexer: RequestMultiplexer) -> Self {
111
112
113
		NetworkBridge {
			network_service,
			authority_discovery_service,
114
			request_multiplexer,
115
		}
116
117
118
	}
}

119
impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD>
120
	where
121
122
		Net: Network + validator_discovery::Network,
		AD: validator_discovery::AuthorityDiscovery,
123
124
		Context: SubsystemContext<Message=NetworkBridgeMessage>,
{
125
126
127
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		// Swallow error because failure is fatal to the node and we log with more precision
		// within `run_network`.
128
		let future = run_network(self, ctx)
129
130
131
132
133
134
135
			.map_err(|e| {
				SubsystemError::with_origin("network-bridge", e)
			})
			.boxed();
		SpawnedSubsystem {
			name: "network-bridge-subsystem",
			future,
136
		}
137
138
139
140
141
142
143
144
	}
}

/// State tracked for a connected peer; one instance per peer and peer-set.
struct PeerData {
	/// The most recent view this peer has sent us.
	view: View,
}

145
/// Main driver, processing network events and messages from other subsystems.
146
#[tracing::instrument(skip(bridge, ctx), fields(subsystem = LOG_TARGET))]
147
async fn run_network<N, AD>(
148
	mut bridge: NetworkBridge<N, AD>,
149
150
151
152
153
154
	mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
) -> SubsystemResult<()>
where
	N: Network + validator_discovery::Network,
	AD: validator_discovery::AuthorityDiscovery,
{
155
	let mut event_stream = bridge.network_service.event_stream().fuse();
156

157
	// Most recent heads are at the back.
158
	let mut live_heads: Vec<(Hash, Arc<jaeger::Span>)> = Vec::with_capacity(MAX_VIEW_HEADS);
159
160
	let mut local_view = View::default();
	let mut finalized_number = 0;
161

162
163
164
165
166
167
168
169
170
171
	let mut validation_peers: HashMap<PeerId, PeerData> = HashMap::new();
	let mut collation_peers: HashMap<PeerId, PeerData> = HashMap::new();

	let mut validator_discovery = validator_discovery::Service::<N, AD>::new();

	loop {

		let action = {
			let subsystem_next = ctx.recv().fuse();
			let mut net_event_next = event_stream.next().fuse();
172
			let mut req_res_event_next = bridge.request_multiplexer.next();
173
174
175
176
177
			futures::pin_mut!(subsystem_next);

			futures::select! {
				subsystem_msg = subsystem_next => Action::from(subsystem_msg),
				net_event = net_event_next => Action::from(net_event),
178
				req_res_event = req_res_event_next  => Action::from(req_res_event),
179
			}
180
181
182
183
		};

		match action {
			Action::Nop => {}
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
			Action::Abort(reason) => match reason {
				AbortReason::SubsystemError(err) => {
					tracing::warn!(
						target: LOG_TARGET,
						err = ?err,
						"Shutting down Network Bridge due to error"
					);
					return Err(SubsystemError::Context(format!(
						"Received SubsystemError from overseer: {:?}",
						err
					)));
				}
				AbortReason::EventStreamConcluded => {
					tracing::info!(
						target: LOG_TARGET,
						"Shutting down Network Bridge: underlying request stream concluded"
					);
					return Err(SubsystemError::Context(
						"Incoming network event stream concluded.".to_string(),
					));
				}
				AbortReason::RequestStreamConcluded => {
					tracing::info!(
						target: LOG_TARGET,
						"Shutting down Network Bridge: underlying request stream concluded"
					);
					return Err(SubsystemError::Context(
						"Incoming network request stream concluded".to_string(),
					));
				}
				AbortReason::OverseerConcluded => return Ok(()),
			}
216
217
218
219

			Action::SendValidationMessages(msgs) => {
				for (peers, msg) in msgs {
					send_message(
220
							&mut bridge.network_service,
221
222
223
224
225
							peers,
							PeerSet::Validation,
							WireMessage::ProtocolMessage(msg),
					).await?
				}
226
			}
227
228
229
230

			Action::SendCollationMessages(msgs) => {
				for (peers, msg) in msgs {
					send_message(
231
							&mut bridge.network_service,
232
233
234
235
236
237
238
							peers,
							PeerSet::Collation,
							WireMessage::ProtocolMessage(msg),
					).await?
				}
			}

239
240
241
242
243
244
			Action::SendRequests(reqs) => {
				for req in reqs {
					bridge.network_service.start_request(req);
				}
			},

245
246
			Action::ConnectToValidators {
				validator_ids,
247
				peer_set,
248
249
				connected,
			} => {
250
251
252
253
254
255
				tracing::debug!(
					target: LOG_TARGET,
					peer_set = ?peer_set,
					ids = ?validator_ids,
					"Received a validator connection request",
				);
256
257
				let (ns, ads) = validator_discovery.on_request(
					validator_ids,
258
					peer_set,
259
					connected,
260
261
					bridge.network_service,
					bridge.authority_discovery_service,
262
				).await;
263
264
				bridge.network_service = ns;
				bridge.authority_discovery_service = ads;
265
266
			},

267
268
269
			Action::ReportPeer(peer, rep) => {
				bridge.network_service.report_peer(peer, rep).await?
			}
270
271
272
273
274
275

			Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
				live_heads.extend(activated);
				live_heads.retain(|h| !deactivated.contains(&h.0));

				update_our_view(
276
					&mut bridge.network_service,
277
278
279
280
281
282
283
					&mut ctx,
					&live_heads,
					&mut local_view,
					finalized_number,
					&validation_peers,
					&collation_peers,
				).await?;
284
			}
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301

			Action::BlockFinalized(number) => {
				debug_assert!(finalized_number < number);

				// we don't send the view updates here, but delay them until the next `Action::ActiveLeaves`
				// otherwise it might break assumptions of some of the subsystems
				// that we never send the same `ActiveLeavesUpdate`
				// this is fine, we will get `Action::ActiveLeaves` on block finalization anyway
				finalized_number = number;
			},

			Action::PeerConnected(peer_set, peer, role) => {
				let peer_map = match peer_set {
					PeerSet::Validation => &mut validation_peers,
					PeerSet::Collation => &mut collation_peers,
				};

302
303
304
305
306
				validator_discovery.on_peer_connected(
					peer.clone(),
					peer_set,
					&mut bridge.authority_discovery_service,
				).await;
307
308
309
310
311
312
313
314
315

				match peer_map.entry(peer.clone()) {
					hash_map::Entry::Occupied(_) => continue,
					hash_map::Entry::Vacant(vacant) => {
						let _ = vacant.insert(PeerData {
							view: View::default(),
						});

						match peer_set {
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
							PeerSet::Validation => {
								dispatch_validation_events_to_all(
									vec![
										NetworkBridgeEvent::PeerConnected(peer.clone(), role),
										NetworkBridgeEvent::PeerViewChange(
											peer.clone(),
											View::default(),
										),
									],
									&mut ctx,
								).await;

								send_message(
									&mut bridge.network_service,
									vec![peer],
									PeerSet::Validation,
									WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(
										local_view.clone()
334
									),
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
								).await?;
							}
							PeerSet::Collation => {
								dispatch_collation_events_to_all(
									vec![
										NetworkBridgeEvent::PeerConnected(peer.clone(), role),
										NetworkBridgeEvent::PeerViewChange(
											peer.clone(),
											View::default(),
										),
									],
									&mut ctx,
								).await;

								send_message(
									&mut bridge.network_service,
									vec![peer],
									PeerSet::Collation,
									WireMessage::<protocol_v1::CollationProtocol>::ViewUpdate(
										local_view.clone()
355
									),
356
357
								).await?;
							}
358
359
360
361
362
363
364
365
366
367
						}
					}
				}
			}
			Action::PeerDisconnected(peer_set, peer) => {
				let peer_map = match peer_set {
					PeerSet::Validation => &mut validation_peers,
					PeerSet::Collation => &mut collation_peers,
				};

368
				validator_discovery.on_peer_disconnected(&peer, peer_set);
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388

				if peer_map.remove(&peer).is_some() {
					match peer_set {
						PeerSet::Validation => dispatch_validation_event_to_all(
							NetworkBridgeEvent::PeerDisconnected(peer),
							&mut ctx,
						).await,
						PeerSet::Collation => dispatch_collation_event_to_all(
							NetworkBridgeEvent::PeerDisconnected(peer),
							&mut ctx,
						).await,
					}
				}
			},
			Action::PeerMessages(peer, v_messages, c_messages) => {
				if !v_messages.is_empty() {
					let events = handle_peer_messages(
						peer.clone(),
						&mut validation_peers,
						v_messages,
389
						&mut bridge.network_service,
390
391
392
393
394
395
396
397
398
399
					).await?;

					dispatch_validation_events_to_all(events, &mut ctx).await;
				}

				if !c_messages.is_empty() {
					let events = handle_peer_messages(
						peer.clone(),
						&mut collation_peers,
						c_messages,
400
						&mut bridge.network_service,
401
402
403
404
405
					).await?;

					dispatch_collation_events_to_all(events, &mut ctx).await;
				}
			},
406
			Action::SendMessage(msg) => ctx.send_message(msg).await,
407
408
409
410
		}
	}
}

411
fn construct_view(live_heads: impl DoubleEndedIterator<Item = Hash>, finalized_number: BlockNumber) -> View {
412
413
	View::new(
		live_heads.rev().take(MAX_VIEW_HEADS),
414
		finalized_number
415
	)
416
417
}

418
#[tracing::instrument(level = "trace", skip(net, ctx, validation_peers, collation_peers), fields(subsystem = LOG_TARGET))]
419
async fn update_our_view(
420
	net: &mut impl Network,
421
	ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
422
	live_heads: &[(Hash, Arc<jaeger::Span>)],
423
	local_view: &mut View,
424
	finalized_number: BlockNumber,
425
426
427
	validation_peers: &HashMap<PeerId, PeerData>,
	collation_peers: &HashMap<PeerId, PeerData>,
) -> SubsystemResult<()> {
428
	let new_view = construct_view(live_heads.iter().map(|v| v.0), finalized_number);
429

430
431
432
	// We only want to send a view update when the heads changed.
	// A change in finalized block number only is _not_ sufficient.
	if local_view.check_heads_eq(&new_view) {
433
434
		return Ok(())
	}
435

436
437
	*local_view = new_view.clone();

438
439
440
441
442
443
444
445
446
	send_validation_message(
		net,
		validation_peers.keys().cloned(),
		WireMessage::ViewUpdate(new_view.clone()),
	).await?;

	send_collation_message(
		net,
		collation_peers.keys().cloned(),
447
		WireMessage::ViewUpdate(new_view),
448
449
	).await?;

450
451
452
	let our_view = OurView::new(live_heads.iter().cloned(), finalized_number);

	dispatch_validation_event_to_all(NetworkBridgeEvent::OurViewChange(our_view.clone()), ctx).await;
453

454
	dispatch_collation_event_to_all(NetworkBridgeEvent::OurViewChange(our_view), ctx).await;
455
456
457
458
459
460

	Ok(())
}

// Handle messages on a specific peer-set. The peer is expected to be connected on that
// peer-set.
461
#[tracing::instrument(level = "trace", skip(peers, messages, net), fields(subsystem = LOG_TARGET))]
462
463
464
465
466
467
468
469
470
async fn handle_peer_messages<M>(
	peer: PeerId,
	peers: &mut HashMap<PeerId, PeerData>,
	messages: Vec<WireMessage<M>>,
	net: &mut impl Network,
) -> SubsystemResult<Vec<NetworkBridgeEvent<M>>> {
	let peer_data = match peers.get_mut(&peer) {
		None => {
			net.report_peer(peer, UNCONNECTED_PEERSET_COST).await?;
471

472
473
474
475
476
477
478
479
480
			return Ok(Vec::new());
		},
		Some(d) => d,
	};

	let mut outgoing_messages = Vec::with_capacity(messages.len());
	for message in messages {
		outgoing_messages.push(match message {
			WireMessage::ViewUpdate(new_view) => {
481
				if new_view.len() > MAX_VIEW_HEADS ||
482
483
					new_view.finalized_number < peer_data.view.finalized_number
				{
484
485
486
487
488
					net.report_peer(
						peer.clone(),
						MALFORMED_VIEW_COST,
					).await?;

489
					continue
490
				} else if new_view.is_empty() {
491
492
493
494
495
					net.report_peer(
						peer.clone(),
						EMPTY_VIEW_COST,
					).await?;

496
497
498
499
500
					continue
				} else if new_view == peer_data.view {
					continue
				} else {
					peer_data.view = new_view;
501

502
503
504
505
506
507
508
509
510
511
512
					NetworkBridgeEvent::PeerViewChange(
						peer.clone(),
						peer_data.view.clone(),
					)
				}
			}
			WireMessage::ProtocolMessage(message) => {
				NetworkBridgeEvent::PeerMessage(peer.clone(), message)
			}
		})
	}
513

514
515
516
	Ok(outgoing_messages)
}

517
#[tracing::instrument(level = "trace", skip(net, peers), fields(subsystem = LOG_TARGET))]
518
519
520
521
522
523
524
525
526
527
528
529
async fn send_validation_message<I>(
	net: &mut impl Network,
	peers: I,
	message: WireMessage<protocol_v1::ValidationProtocol>,
) -> SubsystemResult<()>
	where
		I: IntoIterator<Item=PeerId>,
		I::IntoIter: ExactSizeIterator,
{
	send_message(net, peers, PeerSet::Validation, message).await
}

530
#[tracing::instrument(level = "trace", skip(net, peers), fields(subsystem = LOG_TARGET))]
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
async fn send_collation_message<I>(
	net: &mut impl Network,
	peers: I,
	message: WireMessage<protocol_v1::CollationProtocol>,
) -> SubsystemResult<()>
	where
	I: IntoIterator<Item=PeerId>,
	I::IntoIter: ExactSizeIterator,
{
	send_message(net, peers, PeerSet::Collation, message).await
}


/// Single-event convenience wrapper around `dispatch_validation_events_to_all`.
async fn dispatch_validation_event_to_all(
	event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>,
	ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
) {
	let single_event = std::iter::once(event);
	dispatch_validation_events_to_all(single_event, ctx).await
}

/// Single-event convenience wrapper around `dispatch_collation_events_to_all`.
async fn dispatch_collation_event_to_all(
	event: NetworkBridgeEvent<protocol_v1::CollationProtocol>,
	ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
) {
	let single_event = std::iter::once(event);
	dispatch_collation_events_to_all(single_event, ctx).await
}

558
#[tracing::instrument(level = "trace", skip(events, ctx), fields(subsystem = LOG_TARGET))]
559
560
561
async fn dispatch_validation_events_to_all<I>(
	events: I,
	ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
562
)
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
	where
		I: IntoIterator<Item = NetworkBridgeEvent<protocol_v1::ValidationProtocol>>,
		I::IntoIter: Send,
{
	let messages_for = |event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>| {
		let a = std::iter::once(event.focus().ok().map(|m| AllMessages::AvailabilityDistribution(
			AvailabilityDistributionMessage::NetworkBridgeUpdateV1(m)
		)));

		let b = std::iter::once(event.focus().ok().map(|m| AllMessages::BitfieldDistribution(
			BitfieldDistributionMessage::NetworkBridgeUpdateV1(m)
		)));

		let p = std::iter::once(event.focus().ok().map(|m| AllMessages::PoVDistribution(
			PoVDistributionMessage::NetworkBridgeUpdateV1(m)
		)));

		let s = std::iter::once(event.focus().ok().map(|m| AllMessages::StatementDistribution(
			StatementDistributionMessage::NetworkBridgeUpdateV1(m)
		)));

584
585
586
587
588
		let ap = std::iter::once(event.focus().ok().map(|m| AllMessages::ApprovalDistribution(
			ApprovalDistributionMessage::NetworkBridgeUpdateV1(m)
		)));

		a.chain(b).chain(p).chain(s).chain(ap).filter_map(|x| x)
589
590
591
592
593
	};

	ctx.send_messages(events.into_iter().flat_map(messages_for)).await
}

594
#[tracing::instrument(level = "trace", skip(events, ctx), fields(subsystem = LOG_TARGET))]
595
596
597
async fn dispatch_collation_events_to_all<I>(
	events: I,
	ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
598
)
599
600
601
602
603
604
605
606
607
608
609
	where
		I: IntoIterator<Item = NetworkBridgeEvent<protocol_v1::CollationProtocol>>,
		I::IntoIter: Send,
{
	let messages_for = |event: NetworkBridgeEvent<protocol_v1::CollationProtocol>| {
		event.focus().ok().map(|m| AllMessages::CollatorProtocol(
			CollatorProtocolMessage::NetworkBridgeUpdateV1(m)
		))
	};

	ctx.send_messages(events.into_iter().flat_map(messages_for)).await
610
611
}

612

613

614

615
616
617
#[cfg(test)]
mod tests {
	use super::*;
618
	use futures::executor;
619
620
621
	use futures::stream::BoxStream;
	use std::pin::Pin;
	use std::sync::Arc;
622

623
	use std::borrow::Cow;
624
625
	use std::collections::HashSet;
	use async_trait::async_trait;
626
627
628
	use parking_lot::Mutex;
	use assert_matches::assert_matches;

629
630
631
	use sc_network::Event as NetworkEvent;

	use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal};
632
633
634
635
	use polkadot_subsystem::messages::{
		StatementDistributionMessage, BitfieldDistributionMessage,
		ApprovalDistributionMessage,
	};
636
637
638
	use polkadot_node_subsystem_test_helpers::{
		SingleItemSink, SingleItemStream, TestSubsystemContextHandle,
	};
639
	use polkadot_node_subsystem_util::metered;
640
	use polkadot_node_network_protocol::view;
641
	use sc_network::Multiaddr;
642
	use sc_network::config::RequestResponseConfig;
643
	use sp_keyring::Sr25519Keyring;
644
	use polkadot_primitives::v1::AuthorityDiscoveryId;
645
	use polkadot_node_network_protocol::{ObservedRole, request_response::request::Requests};
646
647

	use crate::network::{Network, NetworkAction};
648
649
650
651

	// The subsystem's view of the network - only supports a single call to `event_stream`.
	struct TestNetwork {
		net_events: Arc<Mutex<Option<SingleItemStream<NetworkEvent>>>>,
652
		action_tx: metered::UnboundedMeteredSender<NetworkAction>,
653
		_req_configs: Vec<RequestResponseConfig>,
654
655
	}

656
657
	struct TestAuthorityDiscovery;

658
659
660
	// The test's view of the network. This receives updates from the subsystem in the form
	// of `NetworkAction`s.
	struct TestNetworkHandle {
661
		action_rx: metered::UnboundedMeteredReceiver<NetworkAction>,
662
663
664
		net_tx: SingleItemSink<NetworkEvent>,
	}

665
	fn new_test_network(req_configs: Vec<RequestResponseConfig>) -> (
666
667
		TestNetwork,
		TestNetworkHandle,
668
		TestAuthorityDiscovery,
669
	) {
670
		let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink();
671
		let (action_tx, action_rx) = metered::unbounded("test_action");
672
673
674
675
676

		(
			TestNetwork {
				net_events: Arc::new(Mutex::new(Some(net_rx))),
				action_tx,
677
				_req_configs: req_configs,
678
679
680
681
682
			},
			TestNetworkHandle {
				action_rx,
				net_tx,
			},
683
			TestAuthorityDiscovery,
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
		)
	}

	impl Network for TestNetwork {
		fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
			self.net_events.lock()
				.take()
				.expect("Subsystem made more than one call to `event_stream`")
				.boxed()
		}

		fn action_sink<'a>(&'a mut self)
			-> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>
		{
			Box::pin((&mut self.action_tx).sink_map_err(Into::into))
		}
700
701
702

		fn start_request(&self, _: Requests) {
		}
703
704
	}

705
	#[async_trait]
706
	impl validator_discovery::Network for TestNetwork {
707
		async fn add_peers_to_reserved_set(&mut self, _protocol: Cow<'static, str>, _: HashSet<Multiaddr>) -> Result<(), String> {
708
709
710
			Ok(())
		}

711
		async fn remove_peers_from_reserved_set(&mut self, _protocol: Cow<'static, str>, _: HashSet<Multiaddr>) -> Result<(), String> {
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
			Ok(())
		}
	}

	#[async_trait]
	impl validator_discovery::AuthorityDiscovery for TestAuthorityDiscovery {
		async fn get_addresses_by_authority_id(&mut self, _authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>> {
			None
		}

		async fn get_authority_id_by_peer_id(&mut self, _peer_id: PeerId) -> Option<AuthorityDiscoveryId> {
			None
		}
	}

727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
	impl TestNetworkHandle {
		// Get the next network action.
		async fn next_network_action(&mut self) -> NetworkAction {
			self.action_rx.next().await.expect("subsystem concluded early")
		}

		// Wait for the next N network actions.
		async fn next_network_actions(&mut self, n: usize) -> Vec<NetworkAction> {
			let mut v = Vec::with_capacity(n);
			for _ in 0..n {
				v.push(self.next_network_action().await);
			}

			v
		}

743
		async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, role: ObservedRole) {
744
745
			self.send_network_event(NetworkEvent::NotificationStreamOpened {
				remote: peer,
746
				protocol: peer_set.into_protocol_name(),
747
				role: role.into(),
748
749
750
			}).await;
		}

751
		async fn disconnect_peer(&mut self, peer: PeerId, peer_set: PeerSet) {
752
753
			self.send_network_event(NetworkEvent::NotificationStreamClosed {
				remote: peer,
754
				protocol: peer_set.into_protocol_name(),
755
756
757
			}).await;
		}

758
		async fn peer_message(&mut self, peer: PeerId, peer_set: PeerSet, message: Vec<u8>) {
759
760
			self.send_network_event(NetworkEvent::NotificationsReceived {
				remote: peer,
761
				messages: vec![(peer_set.into_protocol_name(), message.into())],
762
763
764
765
766
767
768
769
			}).await;
		}

		async fn send_network_event(&mut self, event: NetworkEvent) {
			self.net_tx.send(event).await.expect("subsystem concluded early");
		}
	}

770
771
772
773
774
	/// Assert that the given actions contain the given `action`.
	fn assert_network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) {
		if !actions.iter().any(|x| x == action) {
			panic!("Could not find `{:?}` in `{:?}`", action, actions);
		}
775
776
777
778
	}

	struct TestHarness {
		network_handle: TestNetworkHandle,
779
		virtual_overseer: TestSubsystemContextHandle<NetworkBridgeMessage>,
780
781
782
	}

	fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) {
Bastian Köcher's avatar
Bastian Köcher committed
783
		let pool = sp_core::testing::TaskExecutor::new();
784
785
		let (request_multiplexer, req_configs) = RequestMultiplexer::new();
		let (network, network_handle, discovery) = new_test_network(req_configs);
786
		let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
787

788
789
790
791
792
793
		let bridge = NetworkBridge {
			network_service: network,
			authority_discovery_service: discovery,
			request_multiplexer,
		};

794
		let network_bridge = run_network(
795
			bridge,
796
797
798
799
800
801
802
803
804
805
806
807
808
			context,
		)
			.map_err(|_| panic!("subsystem execution failed"))
			.map(|_| ());

		let test_fut = test(TestHarness {
			network_handle,
			virtual_overseer,
		});

		futures::pin_mut!(test_fut);
		futures::pin_mut!(network_bridge);

809
		let _ = executor::block_on(future::select(test_fut, network_bridge));
810
811
	}

812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
	async fn assert_sends_validation_event_to_all(
		event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>,
		virtual_overseer: &mut TestSubsystemContextHandle<NetworkBridgeMessage>,
	) {
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::AvailabilityDistribution(
				AvailabilityDistributionMessage::NetworkBridgeUpdateV1(e)
			) if e == event.focus().expect("could not focus message")
		);

		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::BitfieldDistribution(
				BitfieldDistributionMessage::NetworkBridgeUpdateV1(e)
			) if e == event.focus().expect("could not focus message")
		);

		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::PoVDistribution(
				PoVDistributionMessage::NetworkBridgeUpdateV1(e)
			) if e == event.focus().expect("could not focus message")
		);

		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::StatementDistribution(
				StatementDistributionMessage::NetworkBridgeUpdateV1(e)
			) if e == event.focus().expect("could not focus message")
		);
843
844
845
846
847
848
849

		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::ApprovalDistribution(
				ApprovalDistributionMessage::NetworkBridgeUpdateV1(e)
			) if e == event.focus().expect("could not focus message")
		);
850
851
852
853
854
855
856
857
858
859
860
861
862
863
	}

	async fn assert_sends_collation_event_to_all(
		event: NetworkBridgeEvent<protocol_v1::CollationProtocol>,
		virtual_overseer: &mut TestSubsystemContextHandle<NetworkBridgeMessage>,
	) {
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::CollatorProtocol(
				CollatorProtocolMessage::NetworkBridgeUpdateV1(e)
			) if e == event.focus().expect("could not focus message")
		)
	}

864
865
866
867
868
869
870
871
872
873
874
875
876
	#[test]
	fn send_our_view_upon_connection() {
		test_harness(|test_harness| async move {
			let TestHarness {
				mut network_handle,
				mut virtual_overseer,
			} = test_harness;

			let peer = PeerId::random();

			let head = Hash::repeat_byte(1);
			virtual_overseer.send(
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(
877
					ActiveLeavesUpdate::start_work(head, Arc::new(jaeger::Span::Disabled)),
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
				))
			).await;

			network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;
			network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await;

			let view = view![head];
			let actions = network_handle.next_network_actions(2).await;
			assert_network_actions_contains(
				&actions,
				&NetworkAction::WriteNotification(
					peer.clone(),
					PeerSet::Validation,
					WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(
						view.clone(),
					).encode(),
				),
			);
			assert_network_actions_contains(
				&actions,
				&NetworkAction::WriteNotification(
					peer.clone(),
					PeerSet::Collation,
					WireMessage::<protocol_v1::CollationProtocol>::ViewUpdate(
						view.clone(),
					).encode(),
				),
			);
		});
	}

909
910
911
912
913
914
915
916
	#[test]
	fn sends_view_updates_to_peers() {
		test_harness(|test_harness| async move {
			let TestHarness { mut network_handle, mut virtual_overseer } = test_harness;

			let peer_a = PeerId::random();
			let peer_b = PeerId::random();

917
918
919
920
921
922
923
924
925
926
			network_handle.connect_peer(
				peer_a.clone(),
				PeerSet::Validation,
				ObservedRole::Full,
			).await;
			network_handle.connect_peer(
				peer_b.clone(),
				PeerSet::Validation,
				ObservedRole::Full,
			).await;
927

928
			let hash_a = Hash::repeat_byte(1);
929

930
			virtual_overseer.send(
931
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(
932
					ActiveLeavesUpdate::start_work(hash_a, Arc::new(jaeger::Span::Disabled)),
933
				))
934
			).await;
935

936
			let actions = network_handle.next_network_actions(4).await;
937
			let wire_message = WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(
938
				view![hash_a]
939
940
			).encode();

941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
			assert_network_actions_contains(
				&actions,
				&NetworkAction::WriteNotification(
					peer_a,
					PeerSet::Validation,
					wire_message.clone(),
				),
			);

			assert_network_actions_contains(
				&actions,
				&NetworkAction::WriteNotification(
					peer_b,
					PeerSet::Validation,
					wire_message.clone(),
				),
			);
		});
	}

	#[test]
	fn do_not_send_view_update_when_only_finalized_block_changed() {
		test_harness(|test_harness| async move {
			let TestHarness { mut network_handle, mut virtual_overseer } = test_harness;

			let peer_a = PeerId::random();
			let peer_b = PeerId::random();

			network_handle.connect_peer(
				peer_a.clone(),
				PeerSet::Validation,
				ObservedRole::Full,
			).await;
			network_handle.connect_peer(
				peer_b.clone(),
				PeerSet::Validation,
				ObservedRole::Full,
			).await;

			let hash_a = Hash::repeat_byte(1);

			virtual_overseer.send(FromOverseer::Signal(OverseerSignal::BlockFinalized(Hash::random(), 5))).await;

			// Send some empty active leaves update
			//
			// This should not trigger a view update to our peers.
			virtual_overseer.send(
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::default()))
			).await;

			// This should trigger the view update to our peers.
			virtual_overseer.send(
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(
994
					ActiveLeavesUpdate::start_work(hash_a, Arc::new(jaeger::Span::Disabled)),
995
996
997
				))
			).await;

998
			let actions = network_handle.next_network_actions(4).await;
999
			let wire_message = WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(
1000
				View::new(vec![hash_a], 5)
For faster browsing, not all history is shown. View entire blame