lib.rs 38.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The Network Bridge Subsystem - protocol multiplexer for Polkadot.

19
#![deny(unused_crate_dependencies)]
20
21
22
#![warn(missing_docs)]


23
24
25
26
use parity_scale_codec::{Encode, Decode};
use futures::prelude::*;

use polkadot_subsystem::{
27
	ActiveLeavesUpdate, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
28
	SubsystemResult, JaegerSpan,
29
};
30
31
32
use polkadot_subsystem::messages::{
	NetworkBridgeMessage, AllMessages, AvailabilityDistributionMessage,
	BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage,
33
	CollatorProtocolMessage, ApprovalDistributionMessage,
34
};
35
use polkadot_primitives::v1::{Hash, BlockNumber};
36
use polkadot_node_network_protocol::{
37
	ReputationChange, PeerId, peer_set::PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1, OurView,
38
};
39

40
41
42
43
44
/// Peer set infos for network initialization.
///
/// To be added to [`NetworkConfiguration::extra_sets`].
pub use polkadot_node_network_protocol::peer_set::peer_sets_info;

45
use std::collections::{HashMap, hash_map};
46
use std::iter::ExactSizeIterator;
47
48
use std::sync::Arc;

49
50
mod validator_discovery;

51
52
53
54
55
56
57
58
59
60
61
62
63
64
/// Internally used `Action` type.
///
/// All requested `NetworkBridgeMessage` user actions  and `NetworkEvent` network messages are
/// translated to `Action` before being processed by `run_network`.
mod action;
use action::Action;

/// Actual interfacing to the network based on the `Network` trait.
///
/// Defines the `Network` trait with an implementation for an `Arc<NetworkService>`.
mod network;
use network::{Network, send_message};


65
66
67
68
69
70
/// The maximum number of heads a peer is allowed to have in their view at any time.
///
/// We use the same limit to compute the view sent to peers locally: `construct_view` keeps at
/// most this many heads, and `handle_peer_messages` penalizes a peer whose view update
/// contains more heads than this.
const MAX_VIEW_HEADS: usize = 5;


71
72
73
74
// NOTE(review): `MALFORMED_MESSAGE_COST` is not referenced in the visible part of this file;
// presumably it is applied where wire messages are decoded — confirm.
/// Reputation change for a malformed network-bridge message.
const MALFORMED_MESSAGE_COST: ReputationChange = ReputationChange::new(-500, "Malformed Network-bridge message");
/// Reputation change applied by `handle_peer_messages` when a peer sends messages on a
/// peer-set it is not tracked as connected on.
const UNCONNECTED_PEERSET_COST: ReputationChange = ReputationChange::new(-50, "Message sent to un-connected peer-set");
/// Reputation change applied by `handle_peer_messages` when a view update has more than
/// `MAX_VIEW_HEADS` heads or a finalized number lower than the peer's previous view.
const MALFORMED_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Malformed view");
/// Reputation change applied by `handle_peer_messages` when a view update contains no heads.
const EMPTY_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Peer sent us an empty view");
75

76
/// Log target of the network bridge.
///
/// Recorded as the `subsystem` field on the `tracing` spans created in this file.
const LOG_TARGET: &str = "network_bridge";
78

79
80
81
/// Messages from and to the network.
///
/// As transmitted to and received from subsystems.
82
#[derive(Debug, Encode, Decode, Clone)]
83
pub enum WireMessage<M> {
84
	/// A message from a peer on a specific protocol.
85
	#[codec(index = 1)]
86
	ProtocolMessage(M),
87
	/// A view update from a peer.
88
	#[codec(index = 2)]
89
90
91
92
93
	ViewUpdate(View),
}


/// The network bridge subsystem.
///
/// Holds the two services that are handed to `run_network` when the subsystem is started.
pub struct NetworkBridge<N, AD> {
	/// `Network` trait implementing type.
	network_service: N,
	/// Authority discovery service, passed on to the validator discovery logic.
	authority_discovery_service: AD,
}
99

100
101
impl<N, AD> NetworkBridge<N, AD> {
	/// Create a new network bridge subsystem with underlying network service and authority discovery service.
102
103
	///
	/// This assumes that the network service has had the notifications protocol for the network
104
	/// bridge already registered. See [`peers_sets_info`](peers_sets_info).
105
106
107
108
109
	pub fn new(network_service: N, authority_discovery_service: AD) -> Self {
		NetworkBridge {
			network_service,
			authority_discovery_service,
		}
110
111
112
	}
}

113
impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD>
114
	where
115
116
		Net: Network + validator_discovery::Network,
		AD: validator_discovery::AuthorityDiscovery,
117
118
		Context: SubsystemContext<Message=NetworkBridgeMessage>,
{
119
120
121
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		// Swallow error because failure is fatal to the node and we log with more precision
		// within `run_network`.
122
		let Self { network_service, authority_discovery_service } = self;
123
		let future = run_network(
124
125
126
				network_service,
				authority_discovery_service,
				ctx,
127
128
129
130
131
132
133
134
			)
			.map_err(|e| {
				SubsystemError::with_origin("network-bridge", e)
			})
			.boxed();
		SpawnedSubsystem {
			name: "network-bridge-subsystem",
			future,
135
		}
136
137
138
139
140
141
142
143
	}
}

/// State kept for every connected peer; `run_network` keeps one map of these per peer-set.
struct PeerData {
	/// Latest view sent by the peer.
	view: View,
}

144
145
146
147
148
149
150
151
152
153
154
155
/// Main driver, processing network events and messages from other subsystems.
#[tracing::instrument(skip(network_service, authority_discovery_service, ctx), fields(subsystem = LOG_TARGET))]
async fn run_network<N, AD>(
	mut network_service: N,
	mut authority_discovery_service: AD,
	mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
) -> SubsystemResult<()>
where
	N: Network + validator_discovery::Network,
	AD: validator_discovery::AuthorityDiscovery,
{
	let mut event_stream = network_service.event_stream().fuse();
156

157
158
159
160
	// Most recent heads are at the back.
	let mut live_heads: Vec<(Hash, Arc<JaegerSpan>)> = Vec::with_capacity(MAX_VIEW_HEADS);
	let mut local_view = View::default();
	let mut finalized_number = 0;
161

162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
	let mut validation_peers: HashMap<PeerId, PeerData> = HashMap::new();
	let mut collation_peers: HashMap<PeerId, PeerData> = HashMap::new();

	let mut validator_discovery = validator_discovery::Service::<N, AD>::new();

	loop {

		let action = {
			let subsystem_next = ctx.recv().fuse();
			let mut net_event_next = event_stream.next().fuse();
			futures::pin_mut!(subsystem_next);

			futures::select! {
				subsystem_msg = subsystem_next => Action::from(subsystem_msg),
				net_event = net_event_next => Action::from(net_event),
177
			}
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
		};

		match action {
			Action::Nop => {}
			Action::Abort => return Ok(()),

			Action::SendValidationMessages(msgs) => {
				for (peers, msg) in msgs {
					send_message(
							&mut network_service,
							peers,
							PeerSet::Validation,
							WireMessage::ProtocolMessage(msg),
					).await?
				}
193
			}
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234

			Action::SendCollationMessages(msgs) => {
				for (peers, msg) in msgs {
					send_message(
							&mut network_service,
							peers,
							PeerSet::Collation,
							WireMessage::ProtocolMessage(msg),
					).await?
				}
			}

			Action::ConnectToValidators {
				validator_ids,
				connected,
			} => {
				let (ns, ads) = validator_discovery.on_request(
					validator_ids,
					connected,
					network_service,
					authority_discovery_service,
				).await;
				network_service = ns;
				authority_discovery_service = ads;
			},

			Action::ReportPeer(peer, rep) => network_service.report_peer(peer, rep).await?,

			Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
				live_heads.extend(activated);
				live_heads.retain(|h| !deactivated.contains(&h.0));

				update_our_view(
					&mut network_service,
					&mut ctx,
					&live_heads,
					&mut local_view,
					finalized_number,
					&validation_peers,
					&collation_peers,
				).await?;
235
			}
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330

			Action::BlockFinalized(number) => {
				debug_assert!(finalized_number < number);

				// we don't send the view updates here, but delay them until the next `Action::ActiveLeaves`
				// otherwise it might break assumptions of some of the subsystems
				// that we never send the same `ActiveLeavesUpdate`
				// this is fine, we will get `Action::ActiveLeaves` on block finalization anyway
				finalized_number = number;
			},

			Action::PeerConnected(peer_set, peer, role) => {
				let peer_map = match peer_set {
					PeerSet::Validation => &mut validation_peers,
					PeerSet::Collation => &mut collation_peers,
				};

				validator_discovery.on_peer_connected(&peer, &mut authority_discovery_service).await;

				match peer_map.entry(peer.clone()) {
					hash_map::Entry::Occupied(_) => continue,
					hash_map::Entry::Vacant(vacant) => {
						let _ = vacant.insert(PeerData {
							view: View::default(),
						});

						match peer_set {
							PeerSet::Validation => dispatch_validation_events_to_all(
								vec![
									NetworkBridgeEvent::PeerConnected(peer.clone(), role),
									NetworkBridgeEvent::PeerViewChange(
										peer,
										View::default(),
									),
								],
								&mut ctx,
							).await,
							PeerSet::Collation => dispatch_collation_events_to_all(
								vec![
									NetworkBridgeEvent::PeerConnected(peer.clone(), role),
									NetworkBridgeEvent::PeerViewChange(
										peer,
										View::default(),
									),
								],
								&mut ctx,
							).await,
						}
					}
				}
			}
			Action::PeerDisconnected(peer_set, peer) => {
				let peer_map = match peer_set {
					PeerSet::Validation => &mut validation_peers,
					PeerSet::Collation => &mut collation_peers,
				};

				validator_discovery.on_peer_disconnected(&peer);

				if peer_map.remove(&peer).is_some() {
					match peer_set {
						PeerSet::Validation => dispatch_validation_event_to_all(
							NetworkBridgeEvent::PeerDisconnected(peer),
							&mut ctx,
						).await,
						PeerSet::Collation => dispatch_collation_event_to_all(
							NetworkBridgeEvent::PeerDisconnected(peer),
							&mut ctx,
						).await,
					}
				}
			},
			Action::PeerMessages(peer, v_messages, c_messages) => {
				if !v_messages.is_empty() {
					let events = handle_peer_messages(
						peer.clone(),
						&mut validation_peers,
						v_messages,
						&mut network_service,
					).await?;

					dispatch_validation_events_to_all(events, &mut ctx).await;
				}

				if !c_messages.is_empty() {
					let events = handle_peer_messages(
						peer.clone(),
						&mut collation_peers,
						c_messages,
						&mut network_service,
					).await?;

					dispatch_collation_events_to_all(events, &mut ctx).await;
				}
			},
331
332
333
334
		}
	}
}

335
fn construct_view(live_heads: impl DoubleEndedIterator<Item = Hash>, finalized_number: BlockNumber) -> View {
336
	View {
337
		heads: live_heads.rev().take(MAX_VIEW_HEADS).collect(),
338
339
		finalized_number
	}
340
341
}

342
#[tracing::instrument(level = "trace", skip(net, ctx, validation_peers, collation_peers), fields(subsystem = LOG_TARGET))]
343
async fn update_our_view(
344
	net: &mut impl Network,
345
	ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
346
	live_heads: &[(Hash, Arc<JaegerSpan>)],
347
	local_view: &mut View,
348
	finalized_number: BlockNumber,
349
350
351
	validation_peers: &HashMap<PeerId, PeerData>,
	collation_peers: &HashMap<PeerId, PeerData>,
) -> SubsystemResult<()> {
352
	let new_view = construct_view(live_heads.iter().map(|v| v.0), finalized_number);
353
354
355
356
357

	// We only want to send a view update when the heads changed, not when only the finalized block changed.
	if local_view.heads == new_view.heads {
		return Ok(())
	}
358

359
360
	*local_view = new_view.clone();

361
362
363
364
365
366
367
368
369
	send_validation_message(
		net,
		validation_peers.keys().cloned(),
		WireMessage::ViewUpdate(new_view.clone()),
	).await?;

	send_collation_message(
		net,
		collation_peers.keys().cloned(),
370
		WireMessage::ViewUpdate(new_view),
371
372
	).await?;

373
374
375
	let our_view = OurView::new(live_heads.iter().cloned(), finalized_number);

	dispatch_validation_event_to_all(NetworkBridgeEvent::OurViewChange(our_view.clone()), ctx).await;
376

377
	dispatch_collation_event_to_all(NetworkBridgeEvent::OurViewChange(our_view), ctx).await;
378
379
380
381
382
383

	Ok(())
}

// Handle messages on a specific peer-set. The peer is expected to be connected on that
// peer-set.
384
#[tracing::instrument(level = "trace", skip(peers, messages, net), fields(subsystem = LOG_TARGET))]
385
386
387
388
389
390
391
392
393
async fn handle_peer_messages<M>(
	peer: PeerId,
	peers: &mut HashMap<PeerId, PeerData>,
	messages: Vec<WireMessage<M>>,
	net: &mut impl Network,
) -> SubsystemResult<Vec<NetworkBridgeEvent<M>>> {
	let peer_data = match peers.get_mut(&peer) {
		None => {
			net.report_peer(peer, UNCONNECTED_PEERSET_COST).await?;
394

395
396
397
398
399
400
401
402
403
			return Ok(Vec::new());
		},
		Some(d) => d,
	};

	let mut outgoing_messages = Vec::with_capacity(messages.len());
	for message in messages {
		outgoing_messages.push(match message {
			WireMessage::ViewUpdate(new_view) => {
404
405
406
				if new_view.heads.len() > MAX_VIEW_HEADS ||
					new_view.finalized_number < peer_data.view.finalized_number
				{
407
408
409
410
411
					net.report_peer(
						peer.clone(),
						MALFORMED_VIEW_COST,
					).await?;

412
413
414
415
416
417
418
					continue
				} else if new_view.heads.is_empty() {
					net.report_peer(
						peer.clone(),
						EMPTY_VIEW_COST,
					).await?;

419
420
421
422
423
					continue
				} else if new_view == peer_data.view {
					continue
				} else {
					peer_data.view = new_view;
424

425
426
427
428
429
430
431
432
433
434
435
					NetworkBridgeEvent::PeerViewChange(
						peer.clone(),
						peer_data.view.clone(),
					)
				}
			}
			WireMessage::ProtocolMessage(message) => {
				NetworkBridgeEvent::PeerMessage(peer.clone(), message)
			}
		})
	}
436

437
438
439
	Ok(outgoing_messages)
}

440
#[tracing::instrument(level = "trace", skip(net, peers), fields(subsystem = LOG_TARGET))]
441
442
443
444
445
446
447
448
449
450
451
452
async fn send_validation_message<I>(
	net: &mut impl Network,
	peers: I,
	message: WireMessage<protocol_v1::ValidationProtocol>,
) -> SubsystemResult<()>
	where
		I: IntoIterator<Item=PeerId>,
		I::IntoIter: ExactSizeIterator,
{
	send_message(net, peers, PeerSet::Validation, message).await
}

453
#[tracing::instrument(level = "trace", skip(net, peers), fields(subsystem = LOG_TARGET))]
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
async fn send_collation_message<I>(
	net: &mut impl Network,
	peers: I,
	message: WireMessage<protocol_v1::CollationProtocol>,
) -> SubsystemResult<()>
	where
	I: IntoIterator<Item=PeerId>,
	I::IntoIter: ExactSizeIterator,
{
	send_message(net, peers, PeerSet::Collation, message).await
}


/// Dispatch a single validation event; shorthand for `dispatch_validation_events_to_all`
/// with a one-element iterator.
async fn dispatch_validation_event_to_all(
	event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>,
	ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
) {
	let single = std::iter::once(event);
	dispatch_validation_events_to_all(single, ctx).await
}

/// Dispatch a single collation event; shorthand for `dispatch_collation_events_to_all`
/// with a one-element iterator.
async fn dispatch_collation_event_to_all(
	event: NetworkBridgeEvent<protocol_v1::CollationProtocol>,
	ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
) {
	let single = std::iter::once(event);
	dispatch_collation_events_to_all(single, ctx).await
}

481
#[tracing::instrument(level = "trace", skip(events, ctx), fields(subsystem = LOG_TARGET))]
482
483
484
async fn dispatch_validation_events_to_all<I>(
	events: I,
	ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
485
)
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
	where
		I: IntoIterator<Item = NetworkBridgeEvent<protocol_v1::ValidationProtocol>>,
		I::IntoIter: Send,
{
	let messages_for = |event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>| {
		let a = std::iter::once(event.focus().ok().map(|m| AllMessages::AvailabilityDistribution(
			AvailabilityDistributionMessage::NetworkBridgeUpdateV1(m)
		)));

		let b = std::iter::once(event.focus().ok().map(|m| AllMessages::BitfieldDistribution(
			BitfieldDistributionMessage::NetworkBridgeUpdateV1(m)
		)));

		let p = std::iter::once(event.focus().ok().map(|m| AllMessages::PoVDistribution(
			PoVDistributionMessage::NetworkBridgeUpdateV1(m)
		)));

		let s = std::iter::once(event.focus().ok().map(|m| AllMessages::StatementDistribution(
			StatementDistributionMessage::NetworkBridgeUpdateV1(m)
		)));

507
508
509
510
511
		let ap = std::iter::once(event.focus().ok().map(|m| AllMessages::ApprovalDistribution(
			ApprovalDistributionMessage::NetworkBridgeUpdateV1(m)
		)));

		a.chain(b).chain(p).chain(s).chain(ap).filter_map(|x| x)
512
513
514
515
516
	};

	ctx.send_messages(events.into_iter().flat_map(messages_for)).await
}

517
#[tracing::instrument(level = "trace", skip(events, ctx), fields(subsystem = LOG_TARGET))]
518
519
520
async fn dispatch_collation_events_to_all<I>(
	events: I,
	ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
521
)
522
523
524
525
526
527
528
529
530
531
532
	where
		I: IntoIterator<Item = NetworkBridgeEvent<protocol_v1::CollationProtocol>>,
		I::IntoIter: Send,
{
	let messages_for = |event: NetworkBridgeEvent<protocol_v1::CollationProtocol>| {
		event.focus().ok().map(|m| AllMessages::CollatorProtocol(
			CollatorProtocolMessage::NetworkBridgeUpdateV1(m)
		))
	};

	ctx.send_messages(events.into_iter().flat_map(messages_for)).await
533
534
}

535

536

537

538
539
540
#[cfg(test)]
mod tests {
	use super::*;
541
	use futures::executor;
542
543
544
	use futures::stream::BoxStream;
	use std::pin::Pin;
	use std::sync::Arc;
545

546
	use std::borrow::Cow;
547
548
	use std::collections::HashSet;
	use async_trait::async_trait;
549
550
551
	use parking_lot::Mutex;
	use assert_matches::assert_matches;

552
553
554
	use sc_network::Event as NetworkEvent;

	use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal};
555
556
557
558
	use polkadot_subsystem::messages::{
		StatementDistributionMessage, BitfieldDistributionMessage,
		ApprovalDistributionMessage,
	};
559
560
561
	use polkadot_node_subsystem_test_helpers::{
		SingleItemSink, SingleItemStream, TestSubsystemContextHandle,
	};
562
	use polkadot_node_subsystem_util::metered;
563
	use polkadot_node_network_protocol::view;
564
	use sc_network::Multiaddr;
565
	use sp_keyring::Sr25519Keyring;
566
567
568
569
	use polkadot_primitives::v1::AuthorityDiscoveryId;
	use polkadot_node_network_protocol::ObservedRole;

	use crate::network::{Network, NetworkAction};
570
571
572
573

	// The subsystem's view of the network - only supports a single call to `event_stream`.
	struct TestNetwork {
		net_events: Arc<Mutex<Option<SingleItemStream<NetworkEvent>>>>,
574
		action_tx: metered::UnboundedMeteredSender<NetworkAction>,
575
576
	}

577
578
	// Authority discovery stub: its `AuthorityDiscovery` impl answers every lookup with `None`.
	struct TestAuthorityDiscovery;

579
580
581
	// The test's view of the network. This receives updates from the subsystem in the form
	// of `NetworkAction`s.
	struct TestNetworkHandle {
582
		action_rx: metered::UnboundedMeteredReceiver<NetworkAction>,
583
584
585
586
587
588
		net_tx: SingleItemSink<NetworkEvent>,
	}

	fn new_test_network() -> (
		TestNetwork,
		TestNetworkHandle,
589
		TestAuthorityDiscovery,
590
	) {
591
		let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink();
592
		let (action_tx, action_rx) = metered::unbounded("test_action");
593
594
595
596
597
598
599
600
601
602

		(
			TestNetwork {
				net_events: Arc::new(Mutex::new(Some(net_rx))),
				action_tx,
			},
			TestNetworkHandle {
				action_rx,
				net_tx,
			},
603
			TestAuthorityDiscovery,
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
		)
	}

	impl Network for TestNetwork {
		// Hands out the stored event stream; panics if it was already taken.
		fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
			let taken = self.net_events.lock().take();

			taken
				.expect("Subsystem made more than one call to `event_stream`")
				.boxed()
		}

		// Exposes the action sender as a sink, converting its error via `Into`.
		fn action_sink<'a>(&'a mut self)
			-> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>
		{
			let sender = &mut self.action_tx;
			Box::pin(sender.sink_map_err(Into::into))
		}
	}

622
	#[async_trait]
623
	impl validator_discovery::Network for TestNetwork {
624
		async fn add_peers_to_reserved_set(&mut self, _protocol: Cow<'static, str>, _: HashSet<Multiaddr>) -> Result<(), String> {
625
626
627
			Ok(())
		}

628
		async fn remove_peers_from_reserved_set(&mut self, _protocol: Cow<'static, str>, _: HashSet<Multiaddr>) -> Result<(), String> {
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
			Ok(())
		}
	}

	// Lookup stub: no authority has known addresses and no peer maps to an authority.
	#[async_trait]
	impl validator_discovery::AuthorityDiscovery for TestAuthorityDiscovery {
		async fn get_addresses_by_authority_id(
			&mut self,
			_authority: AuthorityDiscoveryId,
		) -> Option<Vec<Multiaddr>> {
			None
		}

		async fn get_authority_id_by_peer_id(
			&mut self,
			_peer_id: PeerId,
		) -> Option<AuthorityDiscoveryId> {
			None
		}
	}

644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
	impl TestNetworkHandle {
		// Get the next network action.
		async fn next_network_action(&mut self) -> NetworkAction {
			self.action_rx.next().await.expect("subsystem concluded early")
		}

		// Wait for the next N network actions.
		async fn next_network_actions(&mut self, n: usize) -> Vec<NetworkAction> {
			let mut v = Vec::with_capacity(n);
			for _ in 0..n {
				v.push(self.next_network_action().await);
			}

			v
		}

660
		async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, role: ObservedRole) {
661
662
			self.send_network_event(NetworkEvent::NotificationStreamOpened {
				remote: peer,
663
				protocol: peer_set.into_protocol_name(),
664
				role: role.into(),
665
666
667
			}).await;
		}

668
		async fn disconnect_peer(&mut self, peer: PeerId, peer_set: PeerSet) {
669
670
			self.send_network_event(NetworkEvent::NotificationStreamClosed {
				remote: peer,
671
				protocol: peer_set.into_protocol_name(),
672
673
674
			}).await;
		}

675
		async fn peer_message(&mut self, peer: PeerId, peer_set: PeerSet, message: Vec<u8>) {
676
677
			self.send_network_event(NetworkEvent::NotificationsReceived {
				remote: peer,
678
				messages: vec![(peer_set.into_protocol_name(), message.into())],
679
680
681
682
683
684
685
686
			}).await;
		}

		async fn send_network_event(&mut self, event: NetworkEvent) {
			self.net_tx.send(event).await.expect("subsystem concluded early");
		}
	}

687
688
689
690
691
	/// Assert that the given actions contain the given `action`.
	fn assert_network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) {
		if !actions.iter().any(|x| x == action) {
			panic!("Could not find `{:?}` in `{:?}`", action, actions);
		}
692
693
694
695
	}

	struct TestHarness {
		network_handle: TestNetworkHandle,
696
		virtual_overseer: TestSubsystemContextHandle<NetworkBridgeMessage>,
697
698
699
	}

	fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) {
Bastian Köcher's avatar
Bastian Köcher committed
700
		let pool = sp_core::testing::TaskExecutor::new();
701
		let (network, network_handle, discovery) = new_test_network();
702
		let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
703
704
705

		let network_bridge = run_network(
			network,
706
			discovery,
707
708
709
710
711
712
713
714
715
716
717
718
719
			context,
		)
			.map_err(|_| panic!("subsystem execution failed"))
			.map(|_| ());

		let test_fut = test(TestHarness {
			network_handle,
			virtual_overseer,
		});

		futures::pin_mut!(test_fut);
		futures::pin_mut!(network_bridge);

720
		let _ = executor::block_on(future::select(test_fut, network_bridge));
721
722
	}

723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
	async fn assert_sends_validation_event_to_all(
		event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>,
		virtual_overseer: &mut TestSubsystemContextHandle<NetworkBridgeMessage>,
	) {
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::AvailabilityDistribution(
				AvailabilityDistributionMessage::NetworkBridgeUpdateV1(e)
			) if e == event.focus().expect("could not focus message")
		);

		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::BitfieldDistribution(
				BitfieldDistributionMessage::NetworkBridgeUpdateV1(e)
			) if e == event.focus().expect("could not focus message")
		);

		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::PoVDistribution(
				PoVDistributionMessage::NetworkBridgeUpdateV1(e)
			) if e == event.focus().expect("could not focus message")
		);

		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::StatementDistribution(
				StatementDistributionMessage::NetworkBridgeUpdateV1(e)
			) if e == event.focus().expect("could not focus message")
		);
754
755
756
757
758
759
760

		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::ApprovalDistribution(
				ApprovalDistributionMessage::NetworkBridgeUpdateV1(e)
			) if e == event.focus().expect("could not focus message")
		);
761
762
763
764
765
766
767
768
769
770
771
772
773
774
	}

	// Expect `event`, focused for the collator protocol, as the next message at the overseer.
	async fn assert_sends_collation_event_to_all(
		event: NetworkBridgeEvent<protocol_v1::CollationProtocol>,
		virtual_overseer: &mut TestSubsystemContextHandle<NetworkBridgeMessage>,
	) {
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::CollatorProtocol(
				CollatorProtocolMessage::NetworkBridgeUpdateV1(received)
			) if received == event.focus().expect("could not focus message")
		)
	}

775
776
777
778
779
780
781
782
	// Activating a leaf must result in a view update written to every connected validation peer.
	#[test]
	fn sends_view_updates_to_peers() {
		test_harness(|test_harness| async move {
			let TestHarness { mut network_handle, mut virtual_overseer } = test_harness;

			let peers = [PeerId::random(), PeerId::random()];
			for peer in peers.iter() {
				network_handle.connect_peer(
					peer.clone(),
					PeerSet::Validation,
					ObservedRole::Full,
				).await;
			}

			let hash_a = Hash::repeat_byte(1);
			let leaves = ActiveLeavesUpdate::start_work(hash_a, Arc::new(JaegerSpan::Disabled));

			virtual_overseer.send(
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(leaves))
			).await;

			// One notification per connected peer is expected.
			let actions = network_handle.next_network_actions(2).await;
			let wire_message = WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(
				view![hash_a]
			).encode();

			for peer in peers.iter() {
				assert_network_actions_contains(
					&actions,
					&NetworkAction::WriteNotification(
						peer.clone(),
						PeerSet::Validation,
						wire_message.clone(),
					),
				);
			}
		});
	}

	// A finality notification followed by an empty leaves update must not reach the peers;
	// the finalized number only goes out with the next view whose heads changed.
	#[test]
	fn do_not_send_view_update_when_only_finalized_block_changed() {
		test_harness(|test_harness| async move {
			let TestHarness { mut network_handle, mut virtual_overseer } = test_harness;

			let peers = [PeerId::random(), PeerId::random()];
			for peer in peers.iter() {
				network_handle.connect_peer(
					peer.clone(),
					PeerSet::Validation,
					ObservedRole::Full,
				).await;
			}

			let hash_a = Hash::repeat_byte(1);

			let finalized = OverseerSignal::BlockFinalized(Hash::random(), 5);
			virtual_overseer.send(FromOverseer::Signal(finalized)).await;

			// Send some empty active leaves update
			//
			// This should not trigger a view update to our peers.
			let no_leaves = OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::default());
			virtual_overseer.send(FromOverseer::Signal(no_leaves)).await;

			// This should trigger the view update to our peers.
			let new_leaf = OverseerSignal::ActiveLeaves(
				ActiveLeavesUpdate::start_work(hash_a, Arc::new(JaegerSpan::Disabled)),
			);
			virtual_overseer.send(FromOverseer::Signal(new_leaf)).await;

			let actions = network_handle.next_network_actions(2).await;
			let wire_message = WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(
				View { heads: vec![hash_a], finalized_number: 5 }
			).encode();

			for peer in peers.iter() {
				assert_network_actions_contains(
					&actions,
					&NetworkAction::WriteNotification(
						peer.clone(),
						PeerSet::Validation,
						wire_message.clone(),
					),
				);
			}
		});
	}

	// A view update received from a peer is forwarded to all validation subsystems.
	#[test]
	fn peer_view_updates_sent_via_overseer() {
		test_harness(|test_harness| async move {
			let TestHarness { mut network_handle, mut virtual_overseer } = test_harness;

			let peer = PeerId::random();

			network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;

			let view = view![Hash::repeat_byte(1)];

			// bridge will inform about all connected peers.
			let connected = NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full);
			assert_sends_validation_event_to_all(connected, &mut virtual_overseer).await;

			let initial_view = NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default());
			assert_sends_validation_event_to_all(initial_view, &mut virtual_overseer).await;

			let encoded_update = WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(
				view.clone(),
			).encode();
			network_handle.peer_message(peer.clone(), PeerSet::Validation, encoded_update).await;

			let view_changed = NetworkBridgeEvent::PeerViewChange(peer.clone(), view);
			assert_sends_validation_event_to_all(view_changed, &mut virtual_overseer).await;
		});
	}
930

931
932
933
934
935
936
937
	// A protocol message from a peer is routed only to the subsystem it belongs to, while the
	// following disconnect is announced to all validation subsystems.
	#[test]
	fn peer_messages_sent_via_overseer() {
		test_harness(|test_harness| async move {
			let TestHarness { mut network_handle, mut virtual_overseer } = test_harness;

			let peer = PeerId::random();

			network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;

			// bridge will inform about all connected peers.
			let connected = NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full);
			assert_sends_validation_event_to_all(connected, &mut virtual_overseer).await;

			let initial_view = NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default());
			assert_sends_validation_event_to_all(initial_view, &mut virtual_overseer).await;

			let pov_distribution_message = protocol_v1::PoVDistributionMessage::Awaiting(
				[0; 32].into(),
				vec![[1; 32].into()],
			);

			let message = protocol_v1::ValidationProtocol::PoVDistribution(
				pov_distribution_message.clone(),
			);

			let encoded = WireMessage::ProtocolMessage(message.clone()).encode();
			network_handle.peer_message(peer.clone(), PeerSet::Validation, encoded).await;

			network_handle.disconnect_peer(peer.clone(), PeerSet::Validation).await;

			// PoV distribution message comes first, and the message is only sent to that subsystem.
			// then a disconnection event arises that is sent to all validation networking subsystems.

			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::PoVDistribution(
					PoVDistributionMessage::NetworkBridgeUpdateV1(
						NetworkBridgeEvent::PeerMessage(sender, received)
					)
				) => {
					assert_eq!(sender, peer);
					assert_eq!(received, pov_distribution_message);
				}
			);

			let disconnected = NetworkBridgeEvent::PeerDisconnected(peer);
			assert_sends_validation_event_to_all(disconnected, &mut virtual_overseer).await;
		});
	}

	#[test]
1000
	fn peer_disconnect_from_just_one_peerset() {
For faster browsing, not all history is shown. View entire blame