// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The Network Bridge Subsystem - protocol multiplexer for Polkadot.

#![warn(missing_docs)]


use parity_scale_codec::{Encode, Decode};
use parking_lot::Mutex;
use futures::prelude::*;
use futures::stream::BoxStream;
use sc_network::Event as NetworkEvent;
use sp_consensus::SyncOracle;

use polkadot_subsystem::{
	ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem,
	Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SubsystemSender,
	messages::StatementDistributionMessage,
};
use polkadot_subsystem::messages::{
	NetworkBridgeMessage, AllMessages,
	CollatorProtocolMessage, NetworkBridgeEvent,
};
use polkadot_primitives::v1::{Hash, BlockNumber};
use polkadot_node_network_protocol::{
	PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView, UnifiedReputationChange as Rep,
	ObservedRole,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};

/// Peer set infos for network initialization.
///
/// To be added to [`NetworkConfiguration::extra_sets`].
pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
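// Illustrative sketch only (assuming a mutable `sc_network::config::NetworkConfiguration`
// named `net_config` and an `is_authority: IsAuthority` value from the service-building
// code, neither of which lives in this crate): the returned set configs are appended to
// the extra peer sets before the network service is built, e.g.
//
// ```ignore
// net_config.extra_sets.extend(peer_sets_info(is_authority));
// ```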
use std::collections::{HashMap, hash_map};
use std::iter::ExactSizeIterator;
use std::sync::Arc;

mod validator_discovery;

/// Actual interfacing to the network based on the `Network` trait.
///
/// Defines the `Network` trait with an implementation for an `Arc<NetworkService>`.
mod network;
use network::{Network, send_message};

/// Request multiplexer for combining the multiple request sources into a single `Stream` of `AllMessages`.
mod multiplexer;
pub use multiplexer::RequestMultiplexer;

/// The maximum amount of heads a peer is allowed to have in their view at any time.
///
/// We use the same limit to compute the view sent to peers locally.
const MAX_VIEW_HEADS: usize = 5;

const MALFORMED_MESSAGE_COST: Rep = Rep::CostMajor("Malformed Network-bridge message");
const UNCONNECTED_PEERSET_COST: Rep = Rep::CostMinor("Message sent to un-connected peer-set");
const MALFORMED_VIEW_COST: Rep = Rep::CostMajor("Malformed view");
const EMPTY_VIEW_COST: Rep = Rep::CostMajor("Peer sent us an empty view");
const LOG_TARGET: &'static str = "parachain::network-bridge";

/// Metrics for the network bridge.
#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_peer_connected(&self, peer_set: PeerSet) {
		self.0.as_ref().map(|metrics| metrics
			.connected_events
			.with_label_values(&[peer_set.get_protocol_name_static()])
			.inc()
		);
	}

	fn on_peer_disconnected(&self, peer_set: PeerSet) {
		self.0.as_ref().map(|metrics| metrics
			.disconnected_events
			.with_label_values(&[peer_set.get_protocol_name_static()])
			.inc()
		);
	}

	fn note_peer_count(&self, peer_set: PeerSet, count: usize) {
		self.0.as_ref().map(|metrics| metrics
			.peer_count
			.with_label_values(&[peer_set.get_protocol_name_static()])
			.set(count as u64)
		);
	}

	fn on_notification_received(&self, peer_set: PeerSet, size: usize) {
		if let Some(metrics) = self.0.as_ref() {
			metrics.notifications_received
				.with_label_values(&[peer_set.get_protocol_name_static()])
				.inc();

			metrics.bytes_received
				.with_label_values(&[peer_set.get_protocol_name_static()])
				.inc_by(size as u64);
		}
	}

	fn on_notification_sent(&self, peer_set: PeerSet, size: usize, to_peers: usize) {
		if let Some(metrics) = self.0.as_ref() {
			metrics.notifications_sent
				.with_label_values(&[peer_set.get_protocol_name_static()])
				.inc_by(to_peers as u64);

			metrics.bytes_sent
				.with_label_values(&[peer_set.get_protocol_name_static()])
				.inc_by((size * to_peers) as u64);
		}
	}

	fn note_desired_peer_count(&self, peer_set: PeerSet, size: usize) {
		self.0.as_ref().map(|metrics| metrics
			.desired_peer_count
			.with_label_values(&[peer_set.get_protocol_name_static()])
			.set(size as u64)
		);
	}
}

#[derive(Clone)]
struct MetricsInner {
	peer_count: prometheus::GaugeVec<prometheus::U64>,
	connected_events: prometheus::CounterVec<prometheus::U64>,
	disconnected_events: prometheus::CounterVec<prometheus::U64>,
	desired_peer_count: prometheus::GaugeVec<prometheus::U64>,

	notifications_received: prometheus::CounterVec<prometheus::U64>,
	notifications_sent: prometheus::CounterVec<prometheus::U64>,

	bytes_received: prometheus::CounterVec<prometheus::U64>,
	bytes_sent: prometheus::CounterVec<prometheus::U64>,
}

impl metrics::Metrics for Metrics {
	fn try_register(registry: &prometheus::Registry)
		-> std::result::Result<Self, prometheus::PrometheusError>
	{
		let metrics = MetricsInner {
			peer_count: prometheus::register(
				prometheus::GaugeVec::new(
					prometheus::Opts::new(
						"parachain_peer_count",
						"The number of peers on a parachain-related peer-set",
					),
					&["protocol"]
				)?,
				registry,
			)?,
			connected_events: prometheus::register(
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"parachain_peer_connect_events_total",
						"The number of peer connect events on a parachain notifications protocol",
					),
					&["protocol"]
				)?,
				registry,
			)?,
			disconnected_events: prometheus::register(
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"parachain_peer_disconnect_events_total",
						"The number of peer disconnect events on a parachain notifications protocol",
					),
					&["protocol"]
				)?,
				registry,
			)?,
			desired_peer_count: prometheus::register(
				prometheus::GaugeVec::new(
					prometheus::Opts::new(
						"parachain_desired_peer_count",
						"The number of peers that the local node is expected to connect to on a parachain-related peer-set",
					),
					&["protocol"]
				)?,
				registry,
			)?,
			notifications_received: prometheus::register(
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"parachain_notifications_received_total",
						"The number of notifications received on a parachain protocol",
					),
					&["protocol"]
				)?,
				registry,
			)?,
			notifications_sent: prometheus::register(
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"parachain_notifications_sent_total",
						"The number of notifications sent on a parachain protocol",
					),
					&["protocol"]
				)?,
				registry,
			)?,
			bytes_received: prometheus::register(
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"parachain_notification_bytes_received_total",
						"The number of bytes received on a parachain notification protocol",
					),
					&["protocol"]
				)?,
				registry,
			)?,
			bytes_sent: prometheus::register(
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"parachain_notification_bytes_sent_total",
						"The number of bytes sent on a parachain notification protocol",
					),
					&["protocol"]
				)?,
				registry,
			)?,
		};

		Ok(Metrics(Some(metrics)))
	}
}

/// Messages from and to the network.
///
/// As transmitted to and received from subsystems.
#[derive(Debug, Encode, Decode, Clone)]
pub enum WireMessage<M> {
	/// A message from a peer on a specific protocol.
	ProtocolMessage(M),
	/// A view update from a peer.
	ViewUpdate(View),
}
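// Illustrative sketch, not part of the original module: `WireMessage` values are SCALE
// round-trippable, which is what `send_message` relies on when encoding outgoing
// notifications and what `handle_network_messages` relies on when decoding incoming bytes.
#[cfg(test)]
#[test]
fn wire_message_view_update_roundtrip_sketch() {
	// Encode a view update as it would be put on the wire for the validation peer-set.
	let msg = WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(View::default());
	let bytes = msg.encode();

	// Decode it back, exactly as the notification handler does for received bytes.
	let decoded = WireMessage::<protocol_v1::ValidationProtocol>::decode(&mut &bytes[..])
		.expect("an encoded `WireMessage` must decode back");

	// Re-encoding must reproduce the original bytes.
	assert_eq!(bytes, decoded.encode());
}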

/// The network bridge subsystem.
pub struct NetworkBridge<N, AD> {
	network_service: N,
	authority_discovery_service: AD,
	request_multiplexer: RequestMultiplexer,
	sync_oracle: Box<dyn SyncOracle + Send>,
	metrics: Metrics,
}

impl<N, AD> NetworkBridge<N, AD> {
	/// Create a new network bridge subsystem with underlying network service and authority discovery service.
	///
	/// This assumes that the network service has had the notifications protocol for the network
	/// bridge already registered. See [`peer_sets_info`](peer_sets_info).
	pub fn new(
		network_service: N,
		authority_discovery_service: AD,
		request_multiplexer: RequestMultiplexer,
		sync_oracle: Box<dyn SyncOracle + Send>,
		metrics: Metrics,
	) -> Self {
		NetworkBridge {
			network_service,
			authority_discovery_service,
			request_multiplexer,
			sync_oracle,
			metrics,
		}
	}
}

impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD>
	where
		Net: Network + Sync,
		AD: validator_discovery::AuthorityDiscovery,
		Context: SubsystemContext<Message=NetworkBridgeMessage>,
{
	fn start(mut self, ctx: Context) -> SpawnedSubsystem {
		// The stream of networking events has to be created at initialization, otherwise the
		// networking might open connections before the stream of events has been grabbed.
		let network_stream = self.network_service.event_stream();

		// Swallow error because failure is fatal to the node and we log with more precision
		// within `run_network`.
		let future = run_network(self, ctx, network_stream)
			.map_err(|e| {
				SubsystemError::with_origin("network-bridge", e)
			})
			.boxed();
		SpawnedSubsystem {
			name: "network-bridge-subsystem",
			future,
		}
	}
}

struct PeerData {
	/// The latest view sent by the peer.
	view: View,
}

#[derive(Debug)]
enum UnexpectedAbort {
	/// An error was received from the overseer.
	SubsystemError(polkadot_subsystem::SubsystemError),
	/// The stream of incoming events concluded.
	EventStreamConcluded,
	/// The stream of incoming requests concluded.
	RequestStreamConcluded,
}

impl From<SubsystemError> for UnexpectedAbort {
	fn from(e: SubsystemError) -> Self {
		UnexpectedAbort::SubsystemError(e)
	}
}

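/// State shared between the subsystem-message loop and the network-event loop.
///
/// Guarded by a synchronous `parking_lot::Mutex`; see the note on `run_network` below about
/// never holding the guard across an `.await` point.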
#[derive(Default, Clone)]
struct Shared(Arc<Mutex<SharedInner>>);

#[derive(Default)]
struct SharedInner {
	local_view: Option<View>,
	validation_peers: HashMap<PeerId, PeerData>,
	collation_peers: HashMap<PeerId, PeerData>,
}

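/// Whether the node is still major-syncing or is actively keeping its peers up to date.
///
/// While syncing, view updates are withheld; once `SyncOracle::is_major_syncing` reports
/// `false`, the bridge switches to `Active` and starts sending them (see the `ActiveLeaves`
/// handling below).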
enum Mode {
	Syncing(Box<dyn SyncOracle + Send>),
	Active,
}

async fn handle_subsystem_messages<Context, N, AD>(
	mut ctx: Context,
	mut network_service: N,
	mut authority_discovery_service: AD,
	shared: Shared,
	sync_oracle: Box<dyn SyncOracle + Send>,
	metrics: Metrics,
) -> Result<(), UnexpectedAbort>
where
	Context: SubsystemContext<Message = NetworkBridgeMessage>,
	N: Network,
	AD: validator_discovery::AuthorityDiscovery,
{
	// This is kept sorted, descending, by block number.
	let mut live_heads: Vec<ActivatedLeaf> = Vec::with_capacity(MAX_VIEW_HEADS);
	let mut finalized_number = 0;
	let mut validator_discovery = validator_discovery::Service::<N, AD>::new();

	let mut mode = Mode::Syncing(sync_oracle);

	loop {
		futures::select! {
			msg = ctx.recv().fuse() => match msg {
				Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves))) => {
					let ActiveLeavesUpdate { activated, deactivated } = active_leaves;
					tracing::trace!(
						target: LOG_TARGET,
						action = "ActiveLeaves",
						num_activated = %activated.len(),
						num_deactivated = %deactivated.len(),
					);

					for activated in activated {
						let pos = live_heads
							.binary_search_by(|probe| probe.number.cmp(&activated.number).reverse())
							.unwrap_or_else(|i| i);

						live_heads.insert(pos, activated);
					}
					live_heads.retain(|h| !deactivated.contains(&h.hash));

					// if we're done syncing, set the mode to `Mode::Active`.
					// Otherwise, we don't need to send view updates.
					{
						let is_done_syncing = match mode {
							Mode::Active => true,
							Mode::Syncing(ref mut sync_oracle) => !sync_oracle.is_major_syncing(),
						};

						if is_done_syncing {
							mode = Mode::Active;

							update_our_view(
								&mut network_service,
								&mut ctx,
								&live_heads,
								&shared,
								finalized_number,
								&metrics,
							).await?;
						}
					}
				}
				Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number))) => {
					tracing::trace!(
						target: LOG_TARGET,
						action = "BlockFinalized"
					);

					debug_assert!(finalized_number < number);
					// We don't send the view updates here, but delay them until the next `ActiveLeaves`
					// update. Otherwise, it might break the assumption of some subsystems
					// that we never send the same `ActiveLeavesUpdate` twice.
					finalized_number = number;
				}
				Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => {
					return Ok(());
				}
				Ok(FromOverseer::Communication { msg }) => match msg {
					NetworkBridgeMessage::ReportPeer(peer, rep) => {
						if !rep.is_benefit() {
							tracing::debug!(
								target: LOG_TARGET,
								action = "ReportPeer"
							);
						}
						network_service.report_peer(peer, rep).await?
					}
					NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => {
						tracing::trace!(
							target: LOG_TARGET,
							action = "DisconnectPeer",
							?peer,
							peer_set = ?peer_set,
						);
						network_service.disconnect_peer(peer, peer_set).await?;
					}
					NetworkBridgeMessage::SendValidationMessage(peers, msg) => {
						tracing::trace!(
							target: LOG_TARGET,
							action = "SendValidationMessages",
							num_messages = 1,
						);

						send_message(
							&mut network_service,
							peers,
							PeerSet::Validation,
							WireMessage::ProtocolMessage(msg),
							&metrics,
						).await?
					}
					NetworkBridgeMessage::SendValidationMessages(msgs) => {
						tracing::trace!(
							target: LOG_TARGET,
							action = "SendValidationMessages",
							num_messages = %msgs.len(),
						);

						for (peers, msg) in msgs {
							send_message(
								&mut network_service,
								peers,
								PeerSet::Validation,
								WireMessage::ProtocolMessage(msg),
								&metrics,
							).await?
						}
					}
					NetworkBridgeMessage::SendCollationMessage(peers, msg) => {
						tracing::trace!(
							target: LOG_TARGET,
							action = "SendCollationMessages",
							num_messages = 1,
						);

						send_message(
							&mut network_service,
							peers,
							PeerSet::Collation,
							WireMessage::ProtocolMessage(msg),
							&metrics,
						).await?
					}
					NetworkBridgeMessage::SendCollationMessages(msgs) => {
						tracing::trace!(
							target: LOG_TARGET,
							action = "SendCollationMessages",
							num_messages = %msgs.len(),
						);

						for (peers, msg) in msgs {
							send_message(
								&mut network_service,
								peers,
								PeerSet::Collation,
								WireMessage::ProtocolMessage(msg),
								&metrics,
							).await?
						}
					}
					NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => {
						tracing::trace!(
							target: LOG_TARGET,
							action = "SendRequests",
							num_requests = %reqs.len(),
						);

						for req in reqs {
							network_service
								.start_request(&mut authority_discovery_service, req, if_disconnected)
								.await;
						}
					}
					NetworkBridgeMessage::ConnectToValidators {
						validator_ids,
						peer_set,
					} => {
						tracing::trace!(
							target: LOG_TARGET,
							action = "ConnectToValidators",
							peer_set = ?peer_set,
							ids = ?validator_ids,
							"Received a validator connection request",
						);

						metrics.note_desired_peer_count(peer_set, validator_ids.len());

						let (ns, ads) = validator_discovery.on_request(
							validator_ids,
							peer_set,
							network_service,
							authority_discovery_service,
						).await;

						network_service = ns;
						authority_discovery_service = ads;
					}
				}
				Err(e) => return Err(e.into()),
			},
		}
	}
}

async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
	mut sender: impl SubsystemSender,
	mut network_service: impl Network,
	mut network_stream: BoxStream<'static, NetworkEvent>,
	mut authority_discovery_service: AD,
	mut request_multiplexer: RequestMultiplexer,
	metrics: Metrics,
	shared: Shared,
) -> Result<(), UnexpectedAbort> {
	loop {
		futures::select! {
			network_event = network_stream.next().fuse() => match network_event {
				None => return Err(UnexpectedAbort::EventStreamConcluded),
				Some(NetworkEvent::Dht(_))
				| Some(NetworkEvent::SyncConnected { .. })
				| Some(NetworkEvent::SyncDisconnected { .. }) => {}
				Some(NetworkEvent::NotificationStreamOpened { remote: peer, protocol, role, .. }) => {
					let role = ObservedRole::from(role);
					let peer_set = match PeerSet::try_from_protocol_name(&protocol) {
						None => continue,
						Some(peer_set) => peer_set,
					};

					tracing::debug!(
						target: LOG_TARGET,
						action = "PeerConnected",
						peer_set = ?peer_set,
						peer = ?peer,
						role = ?role
					);
					let local_view = {
						let mut shared = shared.0.lock();
						let peer_map = match peer_set {
							PeerSet::Validation => &mut shared.validation_peers,
							PeerSet::Collation => &mut shared.collation_peers,
						};

						match peer_map.entry(peer.clone()) {
							hash_map::Entry::Occupied(_) => continue,
							hash_map::Entry::Vacant(vacant) => {
								vacant.insert(PeerData { view: View::default() });
							}
						}

						metrics.on_peer_connected(peer_set);
						metrics.note_peer_count(peer_set, peer_map.len());

						shared.local_view.clone().unwrap_or(View::default())
					};

					let maybe_authority =
						authority_discovery_service
							.get_authority_id_by_peer_id(peer.clone()).await;

					match peer_set {
						PeerSet::Validation => {
							dispatch_validation_events_to_all(
								vec![
									NetworkBridgeEvent::PeerConnected(peer.clone(), role, maybe_authority),
									NetworkBridgeEvent::PeerViewChange(
										peer.clone(),
										View::default(),
									),
								],
								&mut sender,
							).await;

							send_message(
								&mut network_service,
								vec![peer],
								PeerSet::Validation,
								WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(
									local_view,
								),
								&metrics,
							).await?;
						}
						PeerSet::Collation => {
							dispatch_collation_events_to_all(
								vec![
									NetworkBridgeEvent::PeerConnected(peer.clone(), role, maybe_authority),
									NetworkBridgeEvent::PeerViewChange(
										peer.clone(),
										View::default(),
									),
								],
								&mut sender,
							).await;

							send_message(
								&mut network_service,
								vec![peer],
								PeerSet::Collation,
								WireMessage::<protocol_v1::CollationProtocol>::ViewUpdate(
									local_view,
								),
								&metrics,
							).await?;
						}
					}
				}
				Some(NetworkEvent::NotificationStreamClosed { remote: peer, protocol }) => {
					let peer_set = match PeerSet::try_from_protocol_name(&protocol) {
						None => continue,
						Some(peer_set) => peer_set,
					};

					tracing::debug!(
						target: LOG_TARGET,
						action = "PeerDisconnected",
						peer_set = ?peer_set,
						peer = ?peer
					);
					let was_connected = {
						let mut shared = shared.0.lock();
						let peer_map = match peer_set {
							PeerSet::Validation => &mut shared.validation_peers,
							PeerSet::Collation => &mut shared.collation_peers,
						};
						let w = peer_map.remove(&peer).is_some();

						metrics.on_peer_disconnected(peer_set);
						metrics.note_peer_count(peer_set, peer_map.len());

						w
					};

					if was_connected {
						match peer_set {
							PeerSet::Validation => dispatch_validation_event_to_all(
								NetworkBridgeEvent::PeerDisconnected(peer),
								&mut sender,
							).await,
							PeerSet::Collation => dispatch_collation_event_to_all(
								NetworkBridgeEvent::PeerDisconnected(peer),
								&mut sender,
							).await,
						}
					}
				}
				Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
					let v_messages: Result<Vec<_>, _> = messages
						.iter()
						.filter(|(protocol, _)| {
							protocol == &PeerSet::Validation.into_protocol_name()
						})
						.map(|(_, msg_bytes)| {
							WireMessage::decode(&mut msg_bytes.as_ref())
								.map(|m| (m, msg_bytes.len()))
						})
						.collect();

					let v_messages = match v_messages {
						Err(_) => {
							tracing::debug!(
								target: LOG_TARGET,
								action = "ReportPeer"
							);

							network_service.report_peer(remote, MALFORMED_MESSAGE_COST).await?;
							continue;
						}
						Ok(v) => v,
					};

					let c_messages: Result<Vec<_>, _> = messages
						.iter()
						.filter(|(protocol, _)| {
							protocol == &PeerSet::Collation.into_protocol_name()
						})
						.map(|(_, msg_bytes)| {
							WireMessage::decode(&mut msg_bytes.as_ref())
								.map(|m| (m, msg_bytes.len()))
						})
						.collect();

					match c_messages {
						Err(_) => {
							tracing::debug!(
								target: LOG_TARGET,
								action = "ReportPeer"
							);

							network_service.report_peer(remote, MALFORMED_MESSAGE_COST).await?;
							continue;
						}
						Ok(c_messages) => {
							if v_messages.is_empty() && c_messages.is_empty() {
								continue;
							} else {
								tracing::trace!(
									target: LOG_TARGET,
									action = "PeerMessages",
									peer = ?remote,
									num_validation_messages = %v_messages.len(),
									num_collation_messages = %c_messages.len()
								);

								if !v_messages.is_empty() {
									let (events, reports) = handle_peer_messages(
										remote.clone(),
										PeerSet::Validation,
										&mut shared.0.lock().validation_peers,
										v_messages,
										&metrics,
									);

									for report in reports {
										network_service.report_peer(remote.clone(), report).await?;
									}

									dispatch_validation_events_to_all(events, &mut sender).await;
								}

								if !c_messages.is_empty() {
									let (events, reports) = handle_peer_messages(
										remote.clone(),
										PeerSet::Collation,
										&mut shared.0.lock().collation_peers,
										c_messages,
										&metrics,
									);

									for report in reports {
										network_service.report_peer(remote.clone(), report).await?;
									}

									dispatch_collation_events_to_all(events, &mut sender).await;
								}
							}
						}
					}
				}
			},
			req_res_event = request_multiplexer.next().fuse() => match req_res_event {
				None => return Err(UnexpectedAbort::RequestStreamConcluded),
				Some(Err(err)) => {
					sender.send_message(NetworkBridgeMessage::ReportPeer(
						err.peer,
						MALFORMED_MESSAGE_COST,
					).into()).await;
				}
				Some(Ok(msg)) => {
					sender.send_message(msg).await;
				}
			},
		}
	}
}

/// Main driver, processing network events and messages from other subsystems.
///
/// THIS IS A HACK. We need to ensure we never hold the mutex across an `.await` boundary,
/// and `parking_lot`'s `MutexGuard` is not `Send`, which helps us enforce that.
/// If this breaks, we need to find another way to protect ourselves.
///
/// ```compile_fail
/// # use parking_lot::MutexGuard;
/// # fn is_send<T: Send>() {}
/// # is_send::<MutexGuard<'static, ()>>();
/// ```
#[tracing::instrument(skip(bridge, ctx, network_stream), fields(subsystem = LOG_TARGET))]
async fn run_network<N, AD>(
	bridge: NetworkBridge<N, AD>,
	mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
	network_stream: BoxStream<'static, NetworkEvent>,
) -> SubsystemResult<()>
where
	N: Network,
	AD: validator_discovery::AuthorityDiscovery,
{
	let shared = Shared::default();

	let NetworkBridge {
		network_service,
		authority_discovery_service,
		mut request_multiplexer,
		sync_oracle,
		metrics,
	} = bridge;

	let statement_receiver = request_multiplexer
		.get_statement_fetching()
		.expect("Gets initialized, must be `Some` on startup. qed.");

	let (remote, network_event_handler) = handle_network_messages(
		ctx.sender().clone(),
		network_service.clone(),
		network_stream,
		authority_discovery_service.clone(),
		request_multiplexer,
		metrics.clone(),
		shared.clone(),
	).remote_handle();

	ctx.spawn("network-bridge-network-worker", Box::pin(remote)).await?;

	ctx.send_message(AllMessages::StatementDistribution(
		StatementDistributionMessage::StatementFetchingReceiver(statement_receiver)
	)).await;

	let subsystem_event_handler = handle_subsystem_messages(
		ctx,
		network_service,
		authority_discovery_service,
		shared,
		sync_oracle,
		metrics,
	);

	futures::pin_mut!(subsystem_event_handler);

	match futures::future::select(subsystem_event_handler, network_event_handler)
		.await
		.factor_first()
		.0
	{
		Ok(()) => Ok(()),
		Err(UnexpectedAbort::SubsystemError(err)) => {
			tracing::warn!(
				target: LOG_TARGET,
				err = ?err,
				"Shutting down Network Bridge due to error"
			);

			Err(SubsystemError::Context(format!(
				"Received SubsystemError from overseer: {:?}",
				err
			)))
		}
		Err(UnexpectedAbort::EventStreamConcluded) => {
			tracing::info!(
				target: LOG_TARGET,
				"Shutting down Network Bridge: underlying request stream concluded"
			);
			Err(SubsystemError::Context(
				"Incoming network event stream concluded.".to_string(),
			))
		}
		Err(UnexpectedAbort::RequestStreamConcluded) => {
			tracing::info!(
				target: LOG_TARGET,
				"Shutting down Network Bridge: underlying request stream concluded"
			);
			Err(SubsystemError::Context(
				"Incoming network request stream concluded".to_string(),
			))
		}
	}
}

fn construct_view(live_heads: impl DoubleEndedIterator<Item = Hash>, finalized_number: BlockNumber) -> View {
	View::new(
		live_heads.take(MAX_VIEW_HEADS),
		finalized_number,
	)
}

#[tracing::instrument(level = "trace", skip(net, ctx, shared, metrics), fields(subsystem = LOG_TARGET))]
async fn update_our_view(
	net: &mut impl Network,
	ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
	live_heads: &[ActivatedLeaf],
	shared: &Shared,
	finalized_number: BlockNumber,
	metrics: &Metrics,
) -> SubsystemResult<()> {
	let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number);

	let (validation_peers, collation_peers) = {
		let mut shared = shared.0.lock();
		// We only want to send a view update when the heads changed.
		// A change in finalized block number only is _not_ sufficient.
		//
		// If this is the first view update since becoming active, but our view is empty,
		// there is no need to send anything.
		match shared.local_view {
			Some(ref v) if v.check_heads_eq(&new_view) => {
				return Ok(())
			}
			None if live_heads.is_empty() => {
				shared.local_view = Some(new_view);
				return Ok(())
			}
			_ => {
				shared.local_view = Some(new_view.clone());
			}
		}

		(
			shared.validation_peers.keys().cloned().collect::<Vec<_>>(),
			shared.collation_peers.keys().cloned().collect::<Vec<_>>(),
		)
	};
	send_validation_message(
		net,
		validation_peers,
		WireMessage::ViewUpdate(new_view.clone()),
		metrics,
	).await?;

	send_collation_message(
		net,
		collation_peers,
		WireMessage::ViewUpdate(new_view),
		metrics,
	).await?;

	let our_view = OurView::new(
		live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| (a.hash, a.span)),
		finalized_number,
	);
	dispatch_validation_event_to_all_unbounded(
		NetworkBridgeEvent::OurViewChange(our_view.clone()),
		ctx.sender(),
	);
	dispatch_collation_event_to_all_unbounded(
		NetworkBridgeEvent::OurViewChange(our_view),
		ctx.sender(),
	);

	Ok(())
}

// Handle messages on a specific peer-set. The peer is expected to be connected on that
// peer-set.
#[tracing::instrument(level = "trace", skip(peers, messages, metrics), fields(subsystem = LOG_TARGET))]