From 8d033b6dfbfab9ecd9b8d8d6a98596a457fabb8a Mon Sep 17 00:00:00 2001
From: Dmitry Markin <dmitry@markin.tech>
Date: Mon, 20 Feb 2023 15:08:02 +0300
Subject: [PATCH] Use async/await instead of manual polling of `NetworkWorker`
 (#13219)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Convert `NetworkWorker::poll()` into async `next_action()`

* Use `NetworkWorker::next_action` instead of `poll` in `sc-network-test`

* Revert "Use `NetworkWorker::next_action` instead of `poll` in `sc-network-test`"

This reverts commit 4b5d851ec864f78f9d083a18a618fbe117c896d2.

* Fix `sc-network-test` to poll `NetworkWorker::next_action`

* Fix `sc_network::service` tests to poll `NetworkWorker::next_action`

* Fix docs

* kick CI

* Factor out `next_worker_message()` & `next_swarm_event()`

* Error handling: replace `futures::pending!()` with `expect()`

* Simplify stream polling in `select!`

* Replace `NetworkWorker::next_action()` with `run()`

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* minor: comment

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Print debug log when network future is shut down

* Evaluate `NetworkWorker::run()` future once before the loop

* Fix client code to match new `NetworkService` interfaces

* Make clippy happy

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Revert "Apply suggestions from code review"

This reverts commit 9fa646d0ed613e5f8623d3d37d1d59ec0a535850.

* Make `NetworkWorker::run()` consume `self`

* Terminate system RPC future if RPC rx stream has terminated.

* Rewrite with let-else

* Fix comments

* Get `best_seen_block` and call `on_block_finalized` via `ChainSync` instead of `NetworkService`

* rustfmt

* make clippy happy

* Tests: schedule wake if `next_action()` returned true

* minor: comment

* minor: fix `NetworkWorker` rustdoc

* minor: amend the rustdoc

* Fix bug that caused `on_demand_beefy_justification_sync` test to hang

* rustfmt

* Apply review suggestions

---------

Co-authored-by: Bastian Köcher <git@kchr.de>
---
 .../authority-discovery/src/worker/tests.rs   |    4 +
 .../client/network/common/src/service.rs      |   11 +-
 substrate/client/network/src/service.rs       | 1190 +++++++++--------
 .../network/src/service/tests/chain_sync.rs   |   36 +-
 .../client/network/src/service/tests/mod.rs   |    5 +-
 substrate/client/network/sync/src/lib.rs      |    6 +
 .../network/sync/src/service/chain_sync.rs    |   22 +-
 substrate/client/network/test/src/lib.rs      |   17 +-
 substrate/client/offchain/src/api.rs          |    4 +
 substrate/client/offchain/src/lib.rs          |    4 +
 substrate/client/service/src/builder.rs       |   30 +-
 substrate/client/service/src/lib.rs           |  263 ++--
 12 files changed, 853 insertions(+), 739 deletions(-)

diff --git a/substrate/client/authority-discovery/src/worker/tests.rs b/substrate/client/authority-discovery/src/worker/tests.rs
index ce55728a1bf..7f3d113a834 100644
--- a/substrate/client/authority-discovery/src/worker/tests.rs
+++ b/substrate/client/authority-discovery/src/worker/tests.rs
@@ -184,6 +184,10 @@ impl NetworkStateInfo for TestNetwork {
 	fn external_addresses(&self) -> Vec<Multiaddr> {
 		self.external_addresses.clone()
 	}
+
+	fn listen_addresses(&self) -> Vec<Multiaddr> {
+		self.external_addresses.clone()
+	}
 }
 
 struct TestSigner<'a> {
diff --git a/substrate/client/network/common/src/service.rs b/substrate/client/network/common/src/service.rs
index 54d254eac38..f0f30780020 100644
--- a/substrate/client/network/common/src/service.rs
+++ b/substrate/client/network/common/src/service.rs
@@ -180,13 +180,13 @@ pub trait NetworkPeers {
 	/// purposes.
 	fn deny_unreserved_peers(&self);
 
-	/// Adds a `PeerId` and its `Multiaddr` as reserved.
+	/// Adds a `PeerId` and its `Multiaddr` as reserved for the sync protocol (default peer set).
 	///
 	/// Returns an `Err` if the given string is not a valid multiaddress
 	/// or contains an invalid peer ID (which includes the local peer ID).
 	fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String>;
 
-	/// Removes a `PeerId` from the list of reserved peers.
+	/// Removes a `PeerId` from the list of reserved peers for the sync protocol (default peer set).
 	fn remove_reserved_peer(&self, peer_id: PeerId);
 
 	/// Sets the reserved set of a protocol to the given set of peers.
@@ -359,6 +359,9 @@ pub trait NetworkStateInfo {
 	/// Returns the local external addresses.
 	fn external_addresses(&self) -> Vec<Multiaddr>;
 
+	/// Returns the listening addresses (without trailing `/p2p/` with our `PeerId`).
+	fn listen_addresses(&self) -> Vec<Multiaddr>;
+
 	/// Returns the local Peer ID.
 	fn local_peer_id(&self) -> PeerId;
 }
@@ -372,6 +375,10 @@ where
 		T::external_addresses(self)
 	}
 
+	fn listen_addresses(&self) -> Vec<Multiaddr> {
+		T::listen_addresses(self)
+	}
+
 	fn local_peer_id(&self) -> PeerId {
 		T::local_peer_id(self)
 	}
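A sketch of the `/p2p/` caveat documented above: `full_listen_addr` is a hypothetical helper (not part of this patch), and it assumes the libp2p version in use here, where `Protocol::P2p` wraps a multihash, so a `PeerId` is converted via `.into()`.

```rust
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};

/// Hypothetical helper: turn a bare address, as returned by
/// `NetworkStateInfo::listen_addresses`, into a full dialable address by
/// appending the `/p2p/<peer id>` suffix the method deliberately omits.
fn full_listen_addr(addr: Multiaddr, local_peer_id: PeerId) -> Multiaddr {
    addr.with(Protocol::P2p(local_peer_id.into()))
}

fn main() {
    let addr: Multiaddr = "/ip4/127.0.0.1/tcp/30333".parse().unwrap();
    println!("{}", full_listen_addr(addr, PeerId::random()));
}
```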
diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs
index dbe148c8993..3c8856eaf8e 100644
--- a/substrate/client/network/src/service.rs
+++ b/substrate/client/network/src/service.rs
@@ -19,13 +19,13 @@
 //! Main entry point of the sc-network crate.
 //!
 //! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`].
-//! The [`NetworkWorker`] *is* the network and implements the `Future` trait. It must be polled in
-//! order for the network to advance.
+//! The [`NetworkWorker`] *is* the network. It is driven by the [`NetworkWorker::run`] future,
+//! which terminates only when all instances of the [`NetworkService`] control handle are dropped.
 //! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an
 //! `Arc<NetworkService>` by calling [`NetworkWorker::service`].
 //!
 //! The methods of the [`NetworkService`] are implemented by sending a message over a channel,
-//! which is then processed by [`NetworkWorker::poll`].
+//! which is then processed by [`NetworkWorker::next_action`].
 
 use crate::{
 	behaviour::{self, Behaviour, BehaviourOut},
@@ -46,8 +46,9 @@ use libp2p::{
 	multiaddr,
 	ping::Failure as PingFailure,
 	swarm::{
-		AddressScore, ConnectionError, ConnectionLimits, DialError, Executor, NetworkBehaviour,
-		PendingConnectionError, Swarm, SwarmBuilder, SwarmEvent,
+		AddressScore, ConnectionError, ConnectionHandler, ConnectionLimits, DialError, Executor,
+		IntoConnectionHandler, NetworkBehaviour, PendingConnectionError, Swarm, SwarmBuilder,
+		SwarmEvent,
 	},
 	Multiaddr, PeerId,
 };
@@ -87,7 +88,6 @@ use std::{
 		atomic::{AtomicBool, AtomicUsize, Ordering},
 		Arc,
 	},
-	task::Poll,
 };
 
 pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
@@ -100,12 +100,20 @@ mod tests;
 pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey};
 use sc_network_common::service::{NetworkBlock, NetworkRequest};
 
+/// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
+/// Used as a type parameter of [`SwarmEvent`] below.
+type ConnectionHandlerErr<TBehaviour> =
+	<<<TBehaviour as NetworkBehaviour>::ConnectionHandler as IntoConnectionHandler>
+		::Handler as ConnectionHandler>::Error;
+
 /// Substrate network service. Handles network IO and manages connectivity.
 pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
 	/// Number of peers we're connected to.
 	num_connected: Arc<AtomicUsize>,
 	/// The local external addresses.
 	external_addresses: Arc<Mutex<Vec<Multiaddr>>>,
+	/// Listen addresses. Do **NOT** include a trailing `/p2p/` with our `PeerId`.
+	listen_addresses: Arc<Mutex<Vec<Multiaddr>>>,
 	/// Are we actively catching up with the chain?
 	is_major_syncing: Arc<AtomicBool>,
 	/// Local copy of the `PeerId` of the local node.
@@ -434,11 +442,13 @@ where
 		}
 
 		let external_addresses = Arc::new(Mutex::new(Vec::new()));
+		let listen_addresses = Arc::new(Mutex::new(Vec::new()));
 		let peers_notifications_sinks = Arc::new(Mutex::new(HashMap::new()));
 
 		let service = Arc::new(NetworkService {
 			bandwidth,
 			external_addresses: external_addresses.clone(),
+			listen_addresses: listen_addresses.clone(),
 			num_connected: num_connected.clone(),
 			is_major_syncing: is_major_syncing.clone(),
 			peerset: peerset_handle,
@@ -455,6 +465,7 @@ where
 
 		Ok(NetworkWorker {
 			external_addresses,
+			listen_addresses,
 			num_connected,
 			is_major_syncing,
 			network_service: swarm,
@@ -711,6 +722,34 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
 		}
 	}
 
+	/// Get connected peers debug information.
+	///
+	/// Returns an error if the `NetworkWorker` is no longer running.
+	pub async fn peers_debug_info(&self) -> Result<Vec<(PeerId, PeerInfo<B>)>, ()> {
+		let (tx, rx) = oneshot::channel();
+
+		let _ = self
+			.to_worker
+			.unbounded_send(ServiceToWorkerMsg::PeersDebugInfo { pending_response: tx });
+
+		// The channel can only be closed if the network worker no longer exists.
+		rx.await.map_err(|_| ())
+	}
+
+	/// Get the list of reserved peers.
+	///
+	/// Returns an error if the `NetworkWorker` is no longer running.
+	pub async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
+		let (tx, rx) = oneshot::channel();
+
+		let _ = self
+			.to_worker
+			.unbounded_send(ServiceToWorkerMsg::ReservedPeers { pending_response: tx });
+
+		// The channel can only be closed if the network worker no longer exists.
+		rx.await.map_err(|_| ())
+	}
+
 	/// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates.
 	///
 	/// Returns an `Err` if one of the given addresses is invalid or contains an
@@ -774,6 +813,11 @@ where
 		self.external_addresses.lock().clone()
 	}
 
+	/// Returns the listening addresses (without trailing `/p2p/` with our `PeerId`).
+	fn listen_addresses(&self) -> Vec<Multiaddr> {
+		self.listen_addresses.lock().clone()
+	}
+
 	/// Returns the local Peer ID.
 	fn local_peer_id(&self) -> PeerId {
 		self.local_peer_id
@@ -1243,6 +1287,12 @@ enum ServiceToWorkerMsg<B: BlockT> {
 	},
 	DisconnectPeer(PeerId, ProtocolName),
 	NewBestBlockImported(B::Hash, NumberFor<B>),
+	PeersDebugInfo {
+		pending_response: oneshot::Sender<Vec<(PeerId, PeerInfo<B>)>>,
+	},
+	ReservedPeers {
+		pending_response: oneshot::Sender<Vec<PeerId>>,
+	},
 }
 
 /// Main network worker. Must be polled in order for the network to advance.
@@ -1258,6 +1308,8 @@ where
 	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
 	external_addresses: Arc<Mutex<Vec<Multiaddr>>>,
 	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
+	listen_addresses: Arc<Mutex<Vec<Multiaddr>>>,
+	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
 	num_connected: Arc<AtomicUsize>,
 	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
 	is_major_syncing: Arc<AtomicBool>,
@@ -1281,637 +1333,595 @@ where
 	_marker: PhantomData<H>,
 }
 
-impl<B, H, Client> Future for NetworkWorker<B, H, Client>
+impl<B, H, Client> NetworkWorker<B, H, Client>
 where
 	B: BlockT + 'static,
 	H: ExHashT,
 	Client: HeaderBackend<B> + 'static,
 {
-	type Output = ();
-
-	fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
-		let this = &mut *self;
-
-		// At the time of writing of this comment, due to a high volume of messages, the network
-		// worker sometimes takes a long time to process the loop below. When that happens, the
-		// rest of the polling is frozen. In order to avoid negative side-effects caused by this
-		// freeze, a limit to the number of iterations is enforced below. If the limit is reached,
-		// the task is interrupted then scheduled again.
-		//
-		// This allows for a more even distribution in the time taken by each sub-part of the
-		// polling.
-		let mut num_iterations = 0;
-		loop {
-			num_iterations += 1;
-			if num_iterations >= 100 {
-				cx.waker().wake_by_ref();
-				break
-			}
+	/// Run the network.
+	pub async fn run(mut self) {
+		while self.next_action().await {}
+	}
 
-			// Process the next message coming from the `NetworkService`.
-			let msg = match this.from_service.poll_next_unpin(cx) {
-				Poll::Ready(Some(msg)) => msg,
-				Poll::Ready(None) => return Poll::Ready(()),
-				Poll::Pending => break,
-			};
-			match msg {
-				ServiceToWorkerMsg::AnnounceBlock(hash, data) => this
-					.network_service
-					.behaviour_mut()
-					.user_protocol_mut()
-					.announce_block(hash, data),
-				ServiceToWorkerMsg::GetValue(key) =>
-					this.network_service.behaviour_mut().get_value(key),
-				ServiceToWorkerMsg::PutValue(key, value) =>
-					this.network_service.behaviour_mut().put_value(key, value),
-				ServiceToWorkerMsg::SetReservedOnly(reserved_only) => this
-					.network_service
-					.behaviour_mut()
-					.user_protocol_mut()
-					.set_reserved_only(reserved_only),
-				ServiceToWorkerMsg::SetReserved(peers) => this
-					.network_service
-					.behaviour_mut()
-					.user_protocol_mut()
-					.set_reserved_peers(peers),
-				ServiceToWorkerMsg::SetPeersetReserved(protocol, peers) => this
-					.network_service
-					.behaviour_mut()
-					.user_protocol_mut()
-					.set_reserved_peerset_peers(protocol, peers),
-				ServiceToWorkerMsg::AddReserved(peer_id) => this
-					.network_service
-					.behaviour_mut()
-					.user_protocol_mut()
-					.add_reserved_peer(peer_id),
-				ServiceToWorkerMsg::RemoveReserved(peer_id) => this
-					.network_service
-					.behaviour_mut()
-					.user_protocol_mut()
-					.remove_reserved_peer(peer_id),
-				ServiceToWorkerMsg::AddSetReserved(protocol, peer_id) => this
-					.network_service
-					.behaviour_mut()
-					.user_protocol_mut()
-					.add_set_reserved_peer(protocol, peer_id),
-				ServiceToWorkerMsg::RemoveSetReserved(protocol, peer_id) => this
-					.network_service
-					.behaviour_mut()
-					.user_protocol_mut()
-					.remove_set_reserved_peer(protocol, peer_id),
-				ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
-					this.network_service.behaviour_mut().add_known_address(peer_id, addr),
-				ServiceToWorkerMsg::AddToPeersSet(protocol, peer_id) => this
-					.network_service
-					.behaviour_mut()
-					.user_protocol_mut()
-					.add_to_peers_set(protocol, peer_id),
-				ServiceToWorkerMsg::RemoveFromPeersSet(protocol, peer_id) => this
-					.network_service
-					.behaviour_mut()
-					.user_protocol_mut()
-					.remove_from_peers_set(protocol, peer_id),
-				ServiceToWorkerMsg::EventStream(sender) => this.event_streams.push(sender),
-				ServiceToWorkerMsg::Request {
-					target,
-					protocol,
-					request,
-					pending_response,
-					connect,
-				} => {
-					this.network_service.behaviour_mut().send_request(
-						&target,
-						&protocol,
-						request,
-						pending_response,
-						connect,
-					);
-				},
-				ServiceToWorkerMsg::NetworkStatus { pending_response } => {
-					let _ = pending_response.send(Ok(this.status()));
-				},
-				ServiceToWorkerMsg::NetworkState { pending_response } => {
-					let _ = pending_response.send(Ok(this.network_state()));
-				},
-				ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => this
-					.network_service
-					.behaviour_mut()
-					.user_protocol_mut()
-					.disconnect_peer(&who, protocol_name),
-				ServiceToWorkerMsg::NewBestBlockImported(hash, number) => this
-					.network_service
-					.behaviour_mut()
-					.user_protocol_mut()
-					.new_best_block_imported(hash, number),
-			}
+	/// Perform one action on the network.
+	///
+	/// Returns `false` when the worker should be shut down.
+	/// Intended for use in tests only.
+	pub async fn next_action(&mut self) -> bool {
+		futures::select! {
+			// Next message from the service.
+			msg = self.from_service.next() => {
+				if let Some(msg) = msg {
+					self.handle_worker_message(msg);
+				} else {
+					return false
+				}
+			},
+			// Next event from `Swarm` (this stream is guaranteed to never terminate).
+			event = self.network_service.select_next_some() => {
+				self.handle_swarm_event(event);
+			},
+		};
+
+		let num_connected_peers =
+			self.network_service.behaviour_mut().user_protocol_mut().num_connected_peers();
+
+		// Update the variables shared with the `NetworkService`.
+		self.num_connected.store(num_connected_peers, Ordering::Relaxed);
+		{
+			let external_addresses =
+				self.network_service.external_addresses().map(|r| &r.addr).cloned().collect();
+			*self.external_addresses.lock() = external_addresses;
+
+			let listen_addresses =
+				self.network_service.listeners().map(ToOwned::to_owned).collect();
+			*self.listen_addresses.lock() = listen_addresses;
 		}
 
-		// `num_iterations` serves the same purpose as in the previous loop.
-		// See the previous loop for explanations.
-		let mut num_iterations = 0;
-		loop {
-			num_iterations += 1;
-			if num_iterations >= 1000 {
-				cx.waker().wake_by_ref();
-				break
+		let is_major_syncing = self
+			.network_service
+			.behaviour_mut()
+			.user_protocol_mut()
+			.sync_state()
+			.state
+			.is_major_syncing();
+
+		self.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
+
+		if let Some(metrics) = self.metrics.as_ref() {
+			if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() {
+				for (lower_ilog2_bucket_bound, num_entries) in buckets {
+					metrics
+						.kbuckets_num_nodes
+						.with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
+						.set(num_entries as u64);
+				}
+			}
+			if let Some(num_entries) = self.network_service.behaviour_mut().num_kademlia_records() {
+				metrics.kademlia_records_count.set(num_entries as u64);
+			}
+			if let Some(num_entries) =
+				self.network_service.behaviour_mut().kademlia_records_total_size()
+			{
+				metrics.kademlia_records_sizes_total.set(num_entries as u64);
 			}
+			metrics
+				.peerset_num_discovered
+				.set(self.network_service.behaviour_mut().user_protocol().num_discovered_peers()
+					as u64);
+			metrics.pending_connections.set(
+				Swarm::network_info(&self.network_service).connection_counters().num_pending()
+					as u64,
+			);
+		}
 
-			// Process the next action coming from the network.
-			let next_event = this.network_service.select_next_some();
-			futures::pin_mut!(next_event);
-			let poll_value = next_event.poll_unpin(cx);
+		true
+	}
 
-			match poll_value {
-				Poll::Pending => break,
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest {
-					protocol,
-					result,
-					..
-				})) => {
-					if let Some(metrics) = this.metrics.as_ref() {
-						match result {
-							Ok(serve_time) => {
-								metrics
-									.requests_in_success_total
-									.with_label_values(&[&protocol])
-									.observe(serve_time.as_secs_f64());
-							},
-							Err(err) => {
-								let reason = match err {
-									ResponseFailure::Network(InboundFailure::Timeout) => "timeout",
-									ResponseFailure::Network(
-										InboundFailure::UnsupportedProtocols,
-									) =>
-									// `UnsupportedProtocols` is reported for every single
-									// inbound request whenever a request with an unsupported
-									// protocol is received. This is not reported in order to
-									// avoid confusions.
-										continue,
-									ResponseFailure::Network(InboundFailure::ResponseOmission) =>
-										"busy-omitted",
-									ResponseFailure::Network(InboundFailure::ConnectionClosed) =>
-										"connection-closed",
-								};
+	/// Process the next message coming from the `NetworkService`.
+	fn handle_worker_message(&mut self, msg: ServiceToWorkerMsg<B>) {
+		match msg {
+			ServiceToWorkerMsg::AnnounceBlock(hash, data) => self
+				.network_service
+				.behaviour_mut()
+				.user_protocol_mut()
+				.announce_block(hash, data),
+			ServiceToWorkerMsg::GetValue(key) =>
+				self.network_service.behaviour_mut().get_value(key),
+			ServiceToWorkerMsg::PutValue(key, value) =>
+				self.network_service.behaviour_mut().put_value(key, value),
+			ServiceToWorkerMsg::SetReservedOnly(reserved_only) => self
+				.network_service
+				.behaviour_mut()
+				.user_protocol_mut()
+				.set_reserved_only(reserved_only),
+			ServiceToWorkerMsg::SetReserved(peers) => self
+				.network_service
+				.behaviour_mut()
+				.user_protocol_mut()
+				.set_reserved_peers(peers),
+			ServiceToWorkerMsg::SetPeersetReserved(protocol, peers) => self
+				.network_service
+				.behaviour_mut()
+				.user_protocol_mut()
+				.set_reserved_peerset_peers(protocol, peers),
+			ServiceToWorkerMsg::AddReserved(peer_id) => self
+				.network_service
+				.behaviour_mut()
+				.user_protocol_mut()
+				.add_reserved_peer(peer_id),
+			ServiceToWorkerMsg::RemoveReserved(peer_id) => self
+				.network_service
+				.behaviour_mut()
+				.user_protocol_mut()
+				.remove_reserved_peer(peer_id),
+			ServiceToWorkerMsg::AddSetReserved(protocol, peer_id) => self
+				.network_service
+				.behaviour_mut()
+				.user_protocol_mut()
+				.add_set_reserved_peer(protocol, peer_id),
+			ServiceToWorkerMsg::RemoveSetReserved(protocol, peer_id) => self
+				.network_service
+				.behaviour_mut()
+				.user_protocol_mut()
+				.remove_set_reserved_peer(protocol, peer_id),
+			ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
+				self.network_service.behaviour_mut().add_known_address(peer_id, addr),
+			ServiceToWorkerMsg::AddToPeersSet(protocol, peer_id) => self
+				.network_service
+				.behaviour_mut()
+				.user_protocol_mut()
+				.add_to_peers_set(protocol, peer_id),
+			ServiceToWorkerMsg::RemoveFromPeersSet(protocol, peer_id) => self
+				.network_service
+				.behaviour_mut()
+				.user_protocol_mut()
+				.remove_from_peers_set(protocol, peer_id),
+			ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
+			ServiceToWorkerMsg::Request {
+				target,
+				protocol,
+				request,
+				pending_response,
+				connect,
+			} => {
+				self.network_service.behaviour_mut().send_request(
+					&target,
+					&protocol,
+					request,
+					pending_response,
+					connect,
+				);
+			},
+			ServiceToWorkerMsg::NetworkStatus { pending_response } => {
+				let _ = pending_response.send(Ok(self.status()));
+			},
+			ServiceToWorkerMsg::NetworkState { pending_response } => {
+				let _ = pending_response.send(Ok(self.network_state()));
+			},
+			ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => self
+				.network_service
+				.behaviour_mut()
+				.user_protocol_mut()
+				.disconnect_peer(&who, protocol_name),
+			ServiceToWorkerMsg::NewBestBlockImported(hash, number) => self
+				.network_service
+				.behaviour_mut()
+				.user_protocol_mut()
+				.new_best_block_imported(hash, number),
+			ServiceToWorkerMsg::PeersDebugInfo { pending_response } => {
+				let _ = pending_response.send(self.peers_debug_info());
+			},
+			ServiceToWorkerMsg::ReservedPeers { pending_response } => {
+				let _ =
+					pending_response.send(self.reserved_peers().map(ToOwned::to_owned).collect());
+			},
+		}
+	}
 
+	/// Process the next event coming from `Swarm`.
+	fn handle_swarm_event(
+		&mut self,
+		event: SwarmEvent<BehaviourOut, ConnectionHandlerErr<Behaviour<B, Client>>>,
+	) {
+		match event {
+			SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => {
+				if let Some(metrics) = self.metrics.as_ref() {
+					match result {
+						Ok(serve_time) => {
+							metrics
+								.requests_in_success_total
+								.with_label_values(&[&protocol])
+								.observe(serve_time.as_secs_f64());
+						},
+						Err(err) => {
+							let reason = match err {
+								ResponseFailure::Network(InboundFailure::Timeout) =>
+									Some("timeout"),
+								ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
+								// `UnsupportedProtocols` is reported for every single
+								// inbound request whenever a request with an unsupported
+								// protocol is received. This is not reported in order to
+								// avoid confusion.
+									None,
+								ResponseFailure::Network(InboundFailure::ResponseOmission) =>
+									Some("busy-omitted"),
+								ResponseFailure::Network(InboundFailure::ConnectionClosed) =>
+									Some("connection-closed"),
+							};
+
+							if let Some(reason) = reason {
 								metrics
 									.requests_in_failure_total
 									.with_label_values(&[&protocol, reason])
 									.inc();
-							},
-						}
+							}
+						},
 					}
-				},
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
-					protocol,
-					duration,
-					result,
-					..
-				})) =>
-					if let Some(metrics) = this.metrics.as_ref() {
-						match result {
-							Ok(_) => {
-								metrics
-									.requests_out_success_total
-									.with_label_values(&[&protocol])
-									.observe(duration.as_secs_f64());
-							},
-							Err(err) => {
-								let reason = match err {
-									RequestFailure::NotConnected => "not-connected",
-									RequestFailure::UnknownProtocol => "unknown-protocol",
-									RequestFailure::Refused => "refused",
-									RequestFailure::Obsolete => "obsolete",
-									RequestFailure::Network(OutboundFailure::DialFailure) =>
-										"dial-failure",
-									RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
-									RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
-										"connection-closed",
-									RequestFailure::Network(
-										OutboundFailure::UnsupportedProtocols,
-									) => "unsupported",
-								};
+				}
+			},
+			SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
+				protocol,
+				duration,
+				result,
+				..
+			}) =>
+				if let Some(metrics) = self.metrics.as_ref() {
+					match result {
+						Ok(_) => {
+							metrics
+								.requests_out_success_total
+								.with_label_values(&[&protocol])
+								.observe(duration.as_secs_f64());
+						},
+						Err(err) => {
+							let reason = match err {
+								RequestFailure::NotConnected => "not-connected",
+								RequestFailure::UnknownProtocol => "unknown-protocol",
+								RequestFailure::Refused => "refused",
+								RequestFailure::Obsolete => "obsolete",
+								RequestFailure::Network(OutboundFailure::DialFailure) =>
+									"dial-failure",
+								RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
+								RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
+									"connection-closed",
+								RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
+									"unsupported",
+							};
 
-								metrics
-									.requests_out_failure_total
-									.with_label_values(&[&protocol, reason])
-									.inc();
-							},
-						}
-					},
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::ReputationChanges {
-					peer,
-					changes,
-				})) =>
-					for change in changes {
-						this.network_service.behaviour().user_protocol().report_peer(peer, change);
-					},
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
-					peer_id,
-					info:
-						IdentifyInfo {
-							protocol_version,
-							agent_version,
-							mut listen_addrs,
-							protocols,
-							..
+							metrics
+								.requests_out_failure_total
+								.with_label_values(&[&protocol, reason])
+								.inc();
 						},
-				})) => {
-					if listen_addrs.len() > 30 {
-						debug!(
-							target: "sub-libp2p",
-							"Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
-							peer_id, protocol_version, agent_version
-						);
-						listen_addrs.truncate(30);
 					}
-					for addr in listen_addrs {
-						this.network_service
-							.behaviour_mut()
-							.add_self_reported_address_to_dht(&peer_id, &protocols, addr);
-					}
-					this.network_service
-						.behaviour_mut()
-						.user_protocol_mut()
-						.add_default_set_discovered_nodes(iter::once(peer_id));
 				},
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id))) => {
-					this.network_service
-						.behaviour_mut()
-						.user_protocol_mut()
-						.add_default_set_discovered_nodes(iter::once(peer_id));
+			SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) =>
+				for change in changes {
+					self.network_service.behaviour().user_protocol().report_peer(peer, change);
 				},
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted)) =>
-					if let Some(metrics) = this.metrics.as_ref() {
-						metrics.kademlia_random_queries_total.inc();
+			SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
+				peer_id,
+				info:
+					IdentifyInfo {
+						protocol_version, agent_version, mut listen_addrs, protocols, ..
 					},
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
+			}) => {
+				if listen_addrs.len() > 30 {
+					debug!(
+						target: "sub-libp2p",
+						"Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
+						peer_id, protocol_version, agent_version
+					);
+					listen_addrs.truncate(30);
+				}
+				for addr in listen_addrs {
+					self.network_service
+						.behaviour_mut()
+						.add_self_reported_address_to_dht(&peer_id, &protocols, addr);
+				}
+				self.network_service
+					.behaviour_mut()
+					.user_protocol_mut()
+					.add_default_set_discovered_nodes(iter::once(peer_id));
+			},
+			SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
+				self.network_service
+					.behaviour_mut()
+					.user_protocol_mut()
+					.add_default_set_discovered_nodes(iter::once(peer_id));
+			},
+			SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) =>
+				if let Some(metrics) = self.metrics.as_ref() {
+					metrics.kademlia_random_queries_total.inc();
+				},
+			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
+				remote,
+				protocol,
+				negotiated_fallback,
+				notifications_sink,
+				role,
+			}) => {
+				if let Some(metrics) = self.metrics.as_ref() {
+					metrics
+						.notifications_streams_opened_total
+						.with_label_values(&[&protocol])
+						.inc();
+				}
+				{
+					let mut peers_notifications_sinks = self.peers_notifications_sinks.lock();
+					let _previous_value = peers_notifications_sinks
+						.insert((remote, protocol.clone()), notifications_sink);
+					debug_assert!(_previous_value.is_none());
+				}
+				self.event_streams.send(Event::NotificationStreamOpened {
 					remote,
 					protocol,
 					negotiated_fallback,
-					notifications_sink,
 					role,
-				})) => {
-					if let Some(metrics) = this.metrics.as_ref() {
-						metrics
-							.notifications_streams_opened_total
-							.with_label_values(&[&protocol])
-							.inc();
-					}
-					{
-						let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
-						let _previous_value = peers_notifications_sinks
-							.insert((remote, protocol.clone()), notifications_sink);
-						debug_assert!(_previous_value.is_none());
-					}
-					this.event_streams.send(Event::NotificationStreamOpened {
-						remote,
-						protocol,
-						negotiated_fallback,
-						role,
-					});
-				},
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
-					remote,
-					protocol,
-					notifications_sink,
-				})) => {
-					let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
-					if let Some(s) = peers_notifications_sinks.get_mut(&(remote, protocol)) {
-						*s = notifications_sink;
-					} else {
-						error!(
-							target: "sub-libp2p",
-							"NotificationStreamReplaced for non-existing substream"
-						);
-						debug_assert!(false);
-					}
+				});
+			},
+			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
+				remote,
+				protocol,
+				notifications_sink,
+			}) => {
+				let mut peers_notifications_sinks = self.peers_notifications_sinks.lock();
+				if let Some(s) = peers_notifications_sinks.get_mut(&(remote, protocol)) {
+					*s = notifications_sink;
+				} else {
+					error!(
+						target: "sub-libp2p",
+						"NotificationStreamReplaced for non-existing substream"
+					);
+					debug_assert!(false);
+				}
 
-					// TODO: Notifications might have been lost as a result of the previous
-					// connection being dropped, and as a result it would be preferable to notify
-					// the users of this fact by simulating the substream being closed then
-					// reopened.
-					// The code below doesn't compile because `role` is unknown. Propagating the
-					// handshake of the secondary connections is quite an invasive change and
-					// would conflict with https://github.com/paritytech/substrate/issues/6403.
-					// Considering that dropping notifications is generally regarded as
-					// acceptable, this bug is at the moment intentionally left there and is
-					// intended to be fixed at the same time as
-					// https://github.com/paritytech/substrate/issues/6403.
-					// this.event_streams.send(Event::NotificationStreamClosed {
-					// remote,
-					// protocol,
-					// });
-					// this.event_streams.send(Event::NotificationStreamOpened {
-					// remote,
-					// protocol,
-					// role,
-					// });
-				},
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed {
-					remote,
-					protocol,
-				})) => {
-					if let Some(metrics) = this.metrics.as_ref() {
-						metrics
-							.notifications_streams_closed_total
-							.with_label_values(&[&protocol[..]])
-							.inc();
-					}
-					this.event_streams.send(Event::NotificationStreamClosed {
-						remote,
-						protocol: protocol.clone(),
-					});
-					{
-						let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
-						let _previous_value = peers_notifications_sinks.remove(&(remote, protocol));
-						debug_assert!(_previous_value.is_some());
-					}
-				},
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
-					remote,
-					messages,
-				})) => {
-					if let Some(metrics) = this.metrics.as_ref() {
-						for (protocol, message) in &messages {
-							metrics
-								.notifications_sizes
-								.with_label_values(&["in", protocol])
-								.observe(message.len() as f64);
-						}
-					}
-					this.event_streams.send(Event::NotificationsReceived { remote, messages });
-				},
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::SyncConnected(remote))) => {
-					this.event_streams.send(Event::SyncConnected { remote });
-				},
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::SyncDisconnected(remote))) => {
-					this.event_streams.send(Event::SyncDisconnected { remote });
-				},
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration))) => {
-					if let Some(metrics) = this.metrics.as_ref() {
-						let query_type = match event {
-							DhtEvent::ValueFound(_) => "value-found",
-							DhtEvent::ValueNotFound(_) => "value-not-found",
-							DhtEvent::ValuePut(_) => "value-put",
-							DhtEvent::ValuePutFailed(_) => "value-put-failed",
-						};
+				// TODO: Notifications might have been lost as a result of the previous
+				// connection being dropped, and as a result it would be preferable to notify
+				// the users of this fact by simulating the substream being closed then
+				// reopened.
+				// The code below doesn't compile because `role` is unknown. Propagating the
+				// handshake of the secondary connections is quite an invasive change and
+				// would conflict with https://github.com/paritytech/substrate/issues/6403.
+				// Considering that dropping notifications is generally regarded as
+				// acceptable, this bug is at the moment intentionally left there and is
+				// intended to be fixed at the same time as
+				// https://github.com/paritytech/substrate/issues/6403.
+				// self.event_streams.send(Event::NotificationStreamClosed {
+				// remote,
+				// protocol,
+				// });
+				// self.event_streams.send(Event::NotificationStreamOpened {
+				// remote,
+				// protocol,
+				// role,
+				// });
+			},
+			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, protocol }) => {
+				if let Some(metrics) = self.metrics.as_ref() {
+					metrics
+						.notifications_streams_closed_total
+						.with_label_values(&[&protocol[..]])
+						.inc();
+				}
+				self.event_streams
+					.send(Event::NotificationStreamClosed { remote, protocol: protocol.clone() });
+				{
+					let mut peers_notifications_sinks = self.peers_notifications_sinks.lock();
+					let _previous_value = peers_notifications_sinks.remove(&(remote, protocol));
+					debug_assert!(_previous_value.is_some());
+				}
+			},
+			SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived { remote, messages }) => {
+				if let Some(metrics) = self.metrics.as_ref() {
+					for (protocol, message) in &messages {
 						metrics
-							.kademlia_query_duration
-							.with_label_values(&[query_type])
-							.observe(duration.as_secs_f64());
+							.notifications_sizes
+							.with_label_values(&["in", protocol])
+							.observe(message.len() as f64);
 					}
+				}
+				self.event_streams.send(Event::NotificationsReceived { remote, messages });
+			},
+			SwarmEvent::Behaviour(BehaviourOut::SyncConnected(remote)) => {
+				self.event_streams.send(Event::SyncConnected { remote });
+			},
+			SwarmEvent::Behaviour(BehaviourOut::SyncDisconnected(remote)) => {
+				self.event_streams.send(Event::SyncDisconnected { remote });
+			},
+			SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
+				if let Some(metrics) = self.metrics.as_ref() {
+					let query_type = match event {
+						DhtEvent::ValueFound(_) => "value-found",
+						DhtEvent::ValueNotFound(_) => "value-not-found",
+						DhtEvent::ValuePut(_) => "value-put",
+						DhtEvent::ValuePutFailed(_) => "value-put-failed",
+					};
+					metrics
+						.kademlia_query_duration
+						.with_label_values(&[query_type])
+						.observe(duration.as_secs_f64());
+				}
 
-					this.event_streams.send(Event::Dht(event));
-				},
-				Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::None)) => {
-					// Ignored event from lower layers.
-				},
-				Poll::Ready(SwarmEvent::ConnectionEstablished {
-					peer_id,
-					endpoint,
-					num_established,
-					concurrent_dial_errors,
-				}) => {
-					if let Some(errors) = concurrent_dial_errors {
-						debug!(target: "sub-libp2p", "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
-					} else {
-						debug!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
-					}
+				self.event_streams.send(Event::Dht(event));
+			},
+			SwarmEvent::Behaviour(BehaviourOut::None) => {
+				// Ignored event from lower layers.
+			},
+			SwarmEvent::ConnectionEstablished {
+				peer_id,
+				endpoint,
+				num_established,
+				concurrent_dial_errors,
+			} => {
+				if let Some(errors) = concurrent_dial_errors {
+					debug!(target: "sub-libp2p", "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
+				} else {
+					debug!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
+				}
 
-					if let Some(metrics) = this.metrics.as_ref() {
-						let direction = match endpoint {
-							ConnectedPoint::Dialer { .. } => "out",
-							ConnectedPoint::Listener { .. } => "in",
-						};
-						metrics.connections_opened_total.with_label_values(&[direction]).inc();
+				if let Some(metrics) = self.metrics.as_ref() {
+					let direction = match endpoint {
+						ConnectedPoint::Dialer { .. } => "out",
+						ConnectedPoint::Listener { .. } => "in",
+					};
+					metrics.connections_opened_total.with_label_values(&[direction]).inc();
 
-						if num_established.get() == 1 {
-							metrics.distinct_peers_connections_opened_total.inc();
-						}
+					if num_established.get() == 1 {
+						metrics.distinct_peers_connections_opened_total.inc();
 					}
-				},
-				Poll::Ready(SwarmEvent::ConnectionClosed {
-					peer_id,
-					cause,
-					endpoint,
-					num_established,
-				}) => {
-					debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause);
-					if let Some(metrics) = this.metrics.as_ref() {
-						let direction = match endpoint {
-							ConnectedPoint::Dialer { .. } => "out",
-							ConnectedPoint::Listener { .. } => "in",
-						};
-						let reason = match cause {
-							Some(ConnectionError::IO(_)) => "transport-error",
-							Some(ConnectionError::Handler(EitherError::A(EitherError::A(
-								EitherError::B(EitherError::A(PingFailure::Timeout)),
-							)))) => "ping-timeout",
-							Some(ConnectionError::Handler(EitherError::A(EitherError::A(
-								EitherError::A(NotifsHandlerError::SyncNotificationsClogged),
-							)))) => "sync-notifications-clogged",
-							Some(ConnectionError::Handler(_)) => "protocol-error",
-							Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
-							None => "actively-closed",
-						};
-						metrics
-							.connections_closed_total
-							.with_label_values(&[direction, reason])
-							.inc();
+				}
+			},
+			SwarmEvent::ConnectionClosed { peer_id, cause, endpoint, num_established } => {
+				debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause);
+				if let Some(metrics) = self.metrics.as_ref() {
+					let direction = match endpoint {
+						ConnectedPoint::Dialer { .. } => "out",
+						ConnectedPoint::Listener { .. } => "in",
+					};
+					let reason = match cause {
+						Some(ConnectionError::IO(_)) => "transport-error",
+						Some(ConnectionError::Handler(EitherError::A(EitherError::A(
+							EitherError::B(EitherError::A(PingFailure::Timeout)),
+						)))) => "ping-timeout",
+						Some(ConnectionError::Handler(EitherError::A(EitherError::A(
+							EitherError::A(NotifsHandlerError::SyncNotificationsClogged),
+						)))) => "sync-notifications-clogged",
+						Some(ConnectionError::Handler(_)) => "protocol-error",
+						Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
+						None => "actively-closed",
+					};
+					metrics.connections_closed_total.with_label_values(&[direction, reason]).inc();
 
-						// `num_established` represents the number of *remaining* connections.
-						if num_established == 0 {
-							metrics.distinct_peers_connections_closed_total.inc();
-						}
-					}
-				},
-				Poll::Ready(SwarmEvent::NewListenAddr { address, .. }) => {
-					trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", address);
-					if let Some(metrics) = this.metrics.as_ref() {
-						metrics.listeners_local_addresses.inc();
-					}
-				},
-				Poll::Ready(SwarmEvent::ExpiredListenAddr { address, .. }) => {
-					info!(target: "sub-libp2p", "📪 No longer listening on {}", address);
-					if let Some(metrics) = this.metrics.as_ref() {
-						metrics.listeners_local_addresses.dec();
+					// `num_established` represents the number of *remaining* connections.
+					if num_established == 0 {
+						metrics.distinct_peers_connections_closed_total.inc();
 					}
-				},
-				Poll::Ready(SwarmEvent::OutgoingConnectionError { peer_id, error }) => {
-					if let Some(peer_id) = peer_id {
-						trace!(
-							target: "sub-libp2p",
-							"Libp2p => Failed to reach {:?}: {}",
-							peer_id, error,
-						);
-
-						if this.boot_node_ids.contains(&peer_id) {
-							if let DialError::WrongPeerId { obtained, endpoint } = &error {
-								if let ConnectedPoint::Dialer { address, role_override: _ } =
-									endpoint
-								{
-									warn!(
-										"💔 The bootnode you want to connect to at `{}` provided a different peer ID `{}` than the one you expect `{}`.",
-										address,
-										obtained,
-										peer_id,
-									);
-								}
+				}
+			},
+			SwarmEvent::NewListenAddr { address, .. } => {
+				trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", address);
+				if let Some(metrics) = self.metrics.as_ref() {
+					metrics.listeners_local_addresses.inc();
+				}
+			},
+			SwarmEvent::ExpiredListenAddr { address, .. } => {
+				info!(target: "sub-libp2p", "📪 No longer listening on {}", address);
+				if let Some(metrics) = self.metrics.as_ref() {
+					metrics.listeners_local_addresses.dec();
+				}
+			},
+			SwarmEvent::OutgoingConnectionError { peer_id, error } => {
+				if let Some(peer_id) = peer_id {
+					trace!(
+						target: "sub-libp2p",
+						"Libp2p => Failed to reach {:?}: {}",
+						peer_id, error,
+					);
+
+					if self.boot_node_ids.contains(&peer_id) {
+						if let DialError::WrongPeerId { obtained, endpoint } = &error {
+							if let ConnectedPoint::Dialer { address, role_override: _ } = endpoint {
+								warn!(
+									"💔 The bootnode you want to connect to at `{}` provided a different peer ID `{}` than the one you expect `{}`.",
+									address,
+									obtained,
+									peer_id,
+								);
 							}
 						}
 					}
+				}
 
-					if let Some(metrics) = this.metrics.as_ref() {
-						let reason = match error {
-							DialError::ConnectionLimit(_) => Some("limit-reached"),
-							DialError::InvalidPeerId(_) => Some("invalid-peer-id"),
-							DialError::Transport(_) | DialError::ConnectionIo(_) =>
-								Some("transport-error"),
-							DialError::Banned |
-							DialError::LocalPeerId |
-							DialError::NoAddresses |
-							DialError::DialPeerConditionFalse(_) |
-							DialError::WrongPeerId { .. } |
-							DialError::Aborted => None, // ignore them
-						};
-						if let Some(reason) = reason {
-							metrics
-								.pending_connections_errors_total
-								.with_label_values(&[reason])
-								.inc();
-						}
-					}
-				},
-				Poll::Ready(SwarmEvent::Dialing(peer_id)) => {
-					trace!(target: "sub-libp2p", "Libp2p => Dialing({:?})", peer_id)
-				},
-				Poll::Ready(SwarmEvent::IncomingConnection { local_addr, send_back_addr }) => {
-					trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{}))",
-						local_addr, send_back_addr);
-					if let Some(metrics) = this.metrics.as_ref() {
-						metrics.incoming_connections_total.inc();
-					}
-				},
-				Poll::Ready(SwarmEvent::IncomingConnectionError {
-					local_addr,
-					send_back_addr,
-					error,
-				}) => {
-					debug!(
-						target: "sub-libp2p",
-						"Libp2p => IncomingConnectionError({},{}): {}",
-						local_addr, send_back_addr, error,
-					);
-					if let Some(metrics) = this.metrics.as_ref() {
-						let reason = match error {
-							PendingConnectionError::ConnectionLimit(_) => Some("limit-reached"),
-							PendingConnectionError::WrongPeerId { .. } => Some("invalid-peer-id"),
-							PendingConnectionError::Transport(_) |
-							PendingConnectionError::IO(_) => Some("transport-error"),
-							PendingConnectionError::Aborted => None, // ignore it
-						};
-
-						if let Some(reason) = reason {
-							metrics
-								.incoming_connections_errors_total
-								.with_label_values(&[reason])
-								.inc();
-						}
+				if let Some(metrics) = self.metrics.as_ref() {
+					let reason = match error {
+						DialError::ConnectionLimit(_) => Some("limit-reached"),
+						DialError::InvalidPeerId(_) => Some("invalid-peer-id"),
+						DialError::Transport(_) | DialError::ConnectionIo(_) =>
+							Some("transport-error"),
+						DialError::Banned |
+						DialError::LocalPeerId |
+						DialError::NoAddresses |
+						DialError::DialPeerConditionFalse(_) |
+						DialError::WrongPeerId { .. } |
+						DialError::Aborted => None, // ignore them
+					};
+					if let Some(reason) = reason {
+						metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
 					}
-				},
-				Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }) => {
-					debug!(
-						target: "sub-libp2p",
-						"Libp2p => BannedPeer({}). Connected via {:?}.",
-						peer_id, endpoint,
-					);
-					if let Some(metrics) = this.metrics.as_ref() {
+				}
+			},
+			SwarmEvent::Dialing(peer_id) => {
+				trace!(target: "sub-libp2p", "Libp2p => Dialing({:?})", peer_id)
+			},
+			SwarmEvent::IncomingConnection { local_addr, send_back_addr } => {
+				trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{})",
+					local_addr, send_back_addr);
+				if let Some(metrics) = self.metrics.as_ref() {
+					metrics.incoming_connections_total.inc();
+				}
+			},
+			SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error } => {
+				debug!(
+					target: "sub-libp2p",
+					"Libp2p => IncomingConnectionError({},{}): {}",
+					local_addr, send_back_addr, error,
+				);
+				if let Some(metrics) = self.metrics.as_ref() {
+					let reason = match error {
+						PendingConnectionError::ConnectionLimit(_) => Some("limit-reached"),
+						PendingConnectionError::WrongPeerId { .. } => Some("invalid-peer-id"),
+						PendingConnectionError::Transport(_) | PendingConnectionError::IO(_) =>
+							Some("transport-error"),
+						PendingConnectionError::Aborted => None, // ignore it
+					};
+
+					if let Some(reason) = reason {
 						metrics
 							.incoming_connections_errors_total
-							.with_label_values(&["banned"])
+							.with_label_values(&[reason])
 							.inc();
 					}
-				},
-				Poll::Ready(SwarmEvent::ListenerClosed { reason, addresses, .. }) => {
-					if let Some(metrics) = this.metrics.as_ref() {
-						metrics.listeners_local_addresses.sub(addresses.len() as u64);
-					}
-					let addrs =
-						addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
-					match reason {
-						Ok(()) => error!(
-							target: "sub-libp2p",
-							"📪 Libp2p listener ({}) closed gracefully",
-							addrs
-						),
-						Err(e) => error!(
-							target: "sub-libp2p",
-							"📪 Libp2p listener ({}) closed: {}",
-							addrs, e
-						),
-					}
-				},
-				Poll::Ready(SwarmEvent::ListenerError { error, .. }) => {
-					debug!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error);
-					if let Some(metrics) = this.metrics.as_ref() {
-						metrics.listeners_errors_total.inc();
-					}
-				},
-			};
-		}
-
-		let num_connected_peers =
-			this.network_service.behaviour_mut().user_protocol_mut().num_connected_peers();
-
-		// Update the variables shared with the `NetworkService`.
-		this.num_connected.store(num_connected_peers, Ordering::Relaxed);
-		{
-			let external_addresses =
-				Swarm::<Behaviour<B, Client>>::external_addresses(&this.network_service)
-					.map(|r| &r.addr)
-					.cloned()
-					.collect();
-			*this.external_addresses.lock() = external_addresses;
-		}
-
-		let is_major_syncing = this
-			.network_service
-			.behaviour_mut()
-			.user_protocol_mut()
-			.sync_state()
-			.state
-			.is_major_syncing();
-
-		this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
-
-		if let Some(metrics) = this.metrics.as_ref() {
-			if let Some(buckets) = this.network_service.behaviour_mut().num_entries_per_kbucket() {
-				for (lower_ilog2_bucket_bound, num_entries) in buckets {
-					metrics
-						.kbuckets_num_nodes
-						.with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
-						.set(num_entries as u64);
 				}
-			}
-			if let Some(num_entries) = this.network_service.behaviour_mut().num_kademlia_records() {
-				metrics.kademlia_records_count.set(num_entries as u64);
-			}
-			if let Some(num_entries) =
-				this.network_service.behaviour_mut().kademlia_records_total_size()
-			{
-				metrics.kademlia_records_sizes_total.set(num_entries as u64);
-			}
-			metrics
-				.peerset_num_discovered
-				.set(this.network_service.behaviour_mut().user_protocol().num_discovered_peers()
-					as u64);
-			metrics.pending_connections.set(
-				Swarm::network_info(&this.network_service).connection_counters().num_pending()
-					as u64,
-			);
+			},
+			SwarmEvent::BannedPeer { peer_id, endpoint } => {
+				debug!(
+					target: "sub-libp2p",
+					"Libp2p => BannedPeer({}). Connected via {:?}.",
+					peer_id, endpoint,
+				);
+				if let Some(metrics) = self.metrics.as_ref() {
+					metrics.incoming_connections_errors_total.with_label_values(&["banned"]).inc();
+				}
+			},
+			SwarmEvent::ListenerClosed { reason, addresses, .. } => {
+				if let Some(metrics) = self.metrics.as_ref() {
+					metrics.listeners_local_addresses.sub(addresses.len() as u64);
+				}
+				let addrs =
+					addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
+				match reason {
+					Ok(()) => error!(
+						target: "sub-libp2p",
+						"📪 Libp2p listener ({}) closed gracefully",
+						addrs
+					),
+					Err(e) => error!(
+						target: "sub-libp2p",
+						"📪 Libp2p listener ({}) closed: {}",
+						addrs, e
+					),
+				}
+			},
+			SwarmEvent::ListenerError { error, .. } => {
+				debug!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error);
+				if let Some(metrics) = self.metrics.as_ref() {
+					metrics.listeners_errors_total.inc();
+				}
+			},
 		}
-
-		Poll::Pending
 	}
 }
 
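The restructuring above replaces `poll()` and its two bounded loops with an async `next_action()` that awaits exactly one event per call. A minimal sketch of that shape (toy types, not the real `NetworkWorker`; the swarm stand-in, like the real swarm event stream, never terminates):

```rust
use futures::{channel::mpsc, stream, StreamExt};

struct Worker {
    from_service: mpsc::UnboundedReceiver<u32>,
    // Stand-in for the libp2p `Swarm`: a stream that never terminates.
    swarm_events: stream::Pending<u32>,
}

impl Worker {
    /// One iteration of the event loop; `false` means "shut down".
    async fn next_action(&mut self) -> bool {
        futures::select! {
            msg = self.from_service.next() => {
                if let Some(msg) = msg {
                    self.handle_worker_message(msg);
                } else {
                    // All service handles were dropped.
                    return false
                }
            },
            event = self.swarm_events.select_next_some() => {
                self.handle_swarm_event(event);
            },
        };
        true
    }

    // Kept for completeness; the real worker is driven via `run()`.
    async fn run(mut self) {
        while self.next_action().await {}
    }

    fn handle_worker_message(&mut self, msg: u32) {
        println!("service message: {msg}");
    }

    fn handle_swarm_event(&mut self, event: u32) {
        println!("swarm event: {event}");
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded();
    let mut worker = Worker { from_service: rx, swarm_events: stream::pending() };

    tx.unbounded_send(42).unwrap();
    assert!(worker.next_action().await); // handles the message
    drop(tx);
    assert!(!worker.next_action().await); // channel closed => shut down
}
```

The real `next_action()` additionally refreshes the shared counters and Prometheus metrics after every event, as the hunk above shows.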
diff --git a/substrate/client/network/src/service/tests/chain_sync.rs b/substrate/client/network/src/service/tests/chain_sync.rs
index 9d8463ff190..1ae432fd4c2 100644
--- a/substrate/client/network/src/service/tests/chain_sync.rs
+++ b/substrate/client/network/src/service/tests/chain_sync.rs
@@ -75,12 +75,8 @@ async fn normal_network_poll_no_peers() {
 		.with_chain_sync((chain_sync, chain_sync_service))
 		.build();
 
-	// poll the network once
-	futures::future::poll_fn(|cx| {
-		let _ = network.network().poll_unpin(cx);
-		Poll::Ready(())
-	})
-	.await;
+	// perform one action on the network
+	let _ = network.network().next_action().await;
 }
 
 #[tokio::test]
@@ -110,11 +106,8 @@ async fn request_justification() {
	// send "request justification" message and poll the network
 	network.service().request_justification(&hash, number);
 
-	futures::future::poll_fn(|cx| {
-		let _ = network.network().poll_unpin(cx);
-		Poll::Ready(())
-	})
-	.await;
+	// perform one action on the network
+	let _ = network.network().next_action().await;
 }
 
 #[tokio::test]
@@ -141,11 +134,8 @@ async fn clear_justification_requests() {
	// send "request justification" message and poll the network
 	network.service().clear_justification_requests();
 
-	futures::future::poll_fn(|cx| {
-		let _ = network.network().poll_unpin(cx);
-		Poll::Ready(())
-	})
-	.await;
+	// perform one action on the network
+	let _ = network.network().next_action().await;
 }
 
 #[tokio::test]
@@ -180,11 +170,8 @@ async fn set_sync_fork_request() {
 	// send "set sync fork request" message and poll the network
 	network.service().set_sync_fork_request(copy_peers, hash, number);
 
-	futures::future::poll_fn(|cx| {
-		let _ = network.network().poll_unpin(cx);
-		Poll::Ready(())
-	})
-	.await;
+	// perform one action on the network
+	let _ = network.network().next_action().await;
 }
 
 #[tokio::test]
@@ -225,11 +212,8 @@ async fn on_block_finalized() {
	// call `on_block_finalized` and poll the network
 	network.network().on_block_finalized(hash, header);
 
-	futures::future::poll_fn(|cx| {
-		let _ = network.network().poll_unpin(cx);
-		Poll::Ready(())
-	})
-	.await;
+	// perform one action on the network
+	let _ = network.network().next_action().await;
 }
 
 // report from mock import queue that importing a justification was not successful
diff --git a/substrate/client/network/src/service/tests/mod.rs b/substrate/client/network/src/service/tests/mod.rs
index 9c97a7f7383..3ce139ff03a 100644
--- a/substrate/client/network/src/service/tests/mod.rs
+++ b/substrate/client/network/src/service/tests/mod.rs
@@ -80,10 +80,7 @@ impl TestNetwork {
 		let service = worker.service().clone();
 		let event_stream = service.event_stream("test");
 
-		tokio::spawn(async move {
-			futures::pin_mut!(worker);
-			let _ = worker.await;
-		});
+		tokio::spawn(worker.run());
 
 		(service, event_stream)
 	}
diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs
index ffc0edaf31e..f56ef13fc90 100644
--- a/substrate/client/network/sync/src/lib.rs
+++ b/substrate/client/network/sync/src/lib.rs
@@ -1358,6 +1358,12 @@ where
 						);
 					}
 				},
+				ToServiceCommand::BlockFinalized(hash, number) => {
+					self.on_block_finalized(&hash, number);
+				},
+				ToServiceCommand::Status { pending_response } => {
+					let _ = pending_response.send(self.status());
+				},
 			}
 		}
 
diff --git a/substrate/client/network/sync/src/service/chain_sync.rs b/substrate/client/network/sync/src/service/chain_sync.rs
index 50ded5b643d..824303ec09d 100644
--- a/substrate/client/network/sync/src/service/chain_sync.rs
+++ b/substrate/client/network/sync/src/service/chain_sync.rs
@@ -16,9 +16,10 @@
 // You should have received a copy of the GNU General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
+use futures::channel::oneshot;
 use libp2p::PeerId;
 use sc_consensus::{BlockImportError, BlockImportStatus, JustificationSyncLink, Link};
-use sc_network_common::service::NetworkSyncForkRequest;
+use sc_network_common::{service::NetworkSyncForkRequest, sync::SyncStatus};
 use sc_utils::mpsc::TracingUnboundedSender;
 use sp_runtime::traits::{Block as BlockT, NumberFor};
 
@@ -34,6 +35,10 @@ pub enum ToServiceCommand<B: BlockT> {
 		Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
 	),
 	JustificationImported(PeerId, B::Hash, NumberFor<B>, bool),
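+	/// Notify `ChainSync` about a finalized block.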
+	BlockFinalized(B::Hash, NumberFor<B>),
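+	/// Request the current sync status.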
+	Status {
+		pending_response: oneshot::Sender<SyncStatus<B>>,
+	},
 }
 
 /// Handle for communicating with `ChainSync` asynchronously
@@ -47,6 +52,21 @@ impl<B: BlockT> ChainSyncInterfaceHandle<B> {
 	pub fn new(tx: TracingUnboundedSender<ToServiceCommand<B>>) -> Self {
 		Self { tx }
 	}
+
+	/// Notify `ChainSync` about a finalized block.
+	pub fn on_block_finalized(&self, hash: B::Hash, number: NumberFor<B>) {
+		let _ = self.tx.unbounded_send(ToServiceCommand::BlockFinalized(hash, number));
+	}
+
+	/// Get the sync status.
+	///
+	/// Returns an error if `ChainSync` has terminated.
+	pub async fn status(&self) -> Result<SyncStatus<B>, ()> {
+		let (tx, rx) = oneshot::channel();
+		let _ = self.tx.unbounded_send(ToServiceCommand::Status { pending_response: tx });
+
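+		// If `ChainSync` has terminated, `tx` is dropped and `rx` below resolves to an error.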
+		rx.await.map_err(|_| ())
+	}
 }
 
 impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>>
diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs
index ccaebc97613..c47e3c86f5c 100644
--- a/substrate/client/network/test/src/lib.rs
+++ b/substrate/client/network/test/src/lib.rs
@@ -31,7 +31,7 @@ use std::{
 	time::Duration,
 };
 
-use futures::{channel::oneshot, future::BoxFuture, prelude::*};
+use futures::{channel::oneshot, future::BoxFuture, pin_mut, prelude::*};
 use libp2p::{build_multiaddr, PeerId};
 use log::trace;
 use parking_lot::Mutex;
@@ -83,7 +83,7 @@ use sp_runtime::{
 };
 use substrate_test_runtime_client::AccountKeyring;
 pub use substrate_test_runtime_client::{
-	runtime::{Block, Extrinsic, Hash, Transfer},
+	runtime::{Block, Extrinsic, Hash, Header, Transfer},
 	TestClient, TestClientBuilder, TestClientBuilderExt,
 };
 use tokio::time::timeout;
@@ -1078,8 +1078,17 @@ where
 		self.mut_peers(|peers| {
 			for (i, peer) in peers.iter_mut().enumerate() {
 				trace!(target: "sync", "-- Polling {}: {}", i, peer.id());
-				if let Poll::Ready(()) = peer.network.poll_unpin(cx) {
-					panic!("NetworkWorker has terminated unexpectedly.")
+				loop {
+					// Strictly speaking, the code below is not quite correct: every iteration
+					// polls a freshly created instance of the future. However, as long as
+					// `NetworkWorker::next_action()` only polls streams and is not interleaved
+					// with other `.await` points, dropping the future and recreating it behaves
+					// the same as polling a single instance.
+					let net_poll_future = peer.network.next_action();
+					pin_mut!(net_poll_future);
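+					// `Pending` means the worker has no more work to do for now.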
+					if let Poll::Pending = net_poll_future.poll(cx) {
+						break
+					}
 				}
 				trace!(target: "sync", "-- Polling complete {}: {}", i, peer.id());
 
diff --git a/substrate/client/offchain/src/api.rs b/substrate/client/offchain/src/api.rs
index 1301ce9fd96..cd9f5c8f6fe 100644
--- a/substrate/client/offchain/src/api.rs
+++ b/substrate/client/offchain/src/api.rs
@@ -419,6 +419,10 @@ mod tests {
 		fn local_peer_id(&self) -> PeerId {
 			PeerId::random()
 		}
+
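+		// The test network does not listen on any address.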
+		fn listen_addresses(&self) -> Vec<Multiaddr> {
+			Vec::new()
+		}
 	}
 
 	fn offchain_api() -> (Api, AsyncApi) {
diff --git a/substrate/client/offchain/src/lib.rs b/substrate/client/offchain/src/lib.rs
index 6b28d3f8a48..6cf5838a46b 100644
--- a/substrate/client/offchain/src/lib.rs
+++ b/substrate/client/offchain/src/lib.rs
@@ -270,6 +270,10 @@ mod tests {
 		fn local_peer_id(&self) -> PeerId {
 			PeerId::random()
 		}
+
+		fn listen_addresses(&self) -> Vec<Multiaddr> {
+			Vec::new()
+		}
 	}
 
 	impl NetworkPeers for TestNetwork {
diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs
index a737601f71b..fb80753b4cb 100644
--- a/substrate/client/service/src/builder.rs
+++ b/substrate/client/service/src/builder.rs
@@ -17,7 +17,7 @@
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 use crate::{
-	build_network_future,
+	build_network_future, build_system_rpc_future,
 	client::{Client, ClientConfig},
 	config::{Configuration, KeystoreConfig, PrometheusConfig},
 	error::Error,
@@ -963,19 +963,29 @@ where
 		Some("networking"),
 		chain_sync_network_provider.run(network.clone()),
 	);
-	spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(chain_sync_service)));
+	spawn_handle.spawn(
+		"import-queue",
+		None,
+		import_queue.run(Box::new(chain_sync_service.clone())),
+	);
 
 	let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
-
-	let future = build_network_future(
-		config.role.clone(),
-		network_mut,
-		client,
-		system_rpc_rx,
-		has_bootnodes,
-		config.announce_block,
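+	// System RPC requests are now handled by a dedicated task instead of the network future.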
+	spawn_handle.spawn(
+		"system-rpc-handler",
+		Some("networking"),
+		build_system_rpc_future(
+			config.role.clone(),
+			network_mut.service().clone(),
+			chain_sync_service.clone(),
+			client.clone(),
+			system_rpc_rx,
+			has_bootnodes,
+		),
 	);
 
+	let future =
+		build_network_future(network_mut, client, chain_sync_service, config.announce_block);
+
 	// TODO: Normally, one is supposed to pass a list of notifications protocols supported by the
 	// node through the `NetworkConfiguration` struct. But because this function doesn't know in
 	// advance which components, such as GrandPa or Polkadot, will be plugged on top of the
diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs
index e3dcf012892..253479abc3b 100644
--- a/substrate/client/service/src/lib.rs
+++ b/substrate/client/service/src/lib.rs
@@ -37,12 +37,16 @@ mod task_manager;
 use std::{collections::HashMap, net::SocketAddr};
 
 use codec::{Decode, Encode};
-use futures::{channel::mpsc, FutureExt, StreamExt};
+use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt};
 use jsonrpsee::{core::Error as JsonRpseeError, RpcModule};
 use log::{debug, error, warn};
 use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider};
-use sc_network::PeerId;
-use sc_network_common::{config::MultiaddrWithPeerId, service::NetworkBlock};
+use sc_network::{NetworkStateInfo, PeerId};
+use sc_network_common::{
+	config::MultiaddrWithPeerId,
+	service::{NetworkBlock, NetworkPeers},
+};
+use sc_network_sync::service::chain_sync::ChainSyncInterfaceHandle;
 use sc_utils::mpsc::TracingUnboundedReceiver;
 use sp_blockchain::HeaderMetadata;
 use sp_consensus::SyncOracle;
@@ -138,9 +142,7 @@ pub struct PartialComponents<Client, Backend, SelectChain, ImportQueue, Transact
 	pub other: Other,
 }
 
-/// Builds a never-ending future that continuously polls the network.
-///
-/// The `status_sink` contain a list of senders to send a periodic network status to.
+/// Builds a future that continuously drives the network.
 async fn build_network_future<
 	B: BlockT,
 	C: BlockchainEvents<B>
@@ -153,21 +155,21 @@ async fn build_network_future<
 		+ 'static,
 	H: sc_network_common::ExHashT,
 >(
-	role: Role,
-	mut network: sc_network::NetworkWorker<B, H, C>,
+	network: sc_network::NetworkWorker<B, H, C>,
 	client: Arc<C>,
-	mut rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<B>>,
-	should_have_peers: bool,
+	chain_sync_service: ChainSyncInterfaceHandle<B>,
 	announce_imported_blocks: bool,
 ) {
 	let mut imported_blocks_stream = client.import_notification_stream().fuse();
 
-	// Current best block at initialization, to report to the RPC layer.
-	let starting_block = client.info().best_number;
-
 	// Stream of finalized blocks reported by the client.
 	let mut finality_notification_stream = client.finality_notification_stream().fuse();
 
+	let network_service = network.service().clone();
+
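+	// Fuse and pin the worker future so the `select!` below can poll it by reference.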
+	let network_run = network.run().fuse();
+	pin_mut!(network_run);
+
 	loop {
 		futures::select! {
 			// List of blocks that the client has imported.
@@ -176,15 +178,18 @@ async fn build_network_future<
 					Some(n) => n,
 					// If this stream is shut down, that means the client has shut down, and the
 					// most appropriate thing to do for the network future is to shut down too.
-					None => return,
+					None => {
+						debug!("Block import stream has terminated, shutting down the network future.");
+						return
+					},
 				};
 
 				if announce_imported_blocks {
-					network.service().announce_block(notification.hash, None);
+					network_service.announce_block(notification.hash, None);
 				}
 
 				if notification.is_new_best {
-					network.service().new_best_block_imported(
+					network_service.new_best_block_imported(
 						notification.hash,
 						*notification.header.number(),
 					);
@@ -193,106 +198,160 @@ async fn build_network_future<
 
 			// List of blocks that the client has finalized.
 			notification = finality_notification_stream.select_next_some() => {
-				network.on_block_finalized(notification.hash, notification.header);
+				chain_sync_service.on_block_finalized(notification.hash, *notification.header.number());
 			}
 
-			// Answer incoming RPC requests.
-			request = rpc_rx.select_next_some() => {
-				match request {
-					sc_rpc::system::Request::Health(sender) => {
-						let _ = sender.send(sc_rpc::system::Health {
-							peers: network.peers_debug_info().len(),
-							is_syncing: network.service().is_major_syncing(),
-							should_have_peers,
-						});
-					},
-					sc_rpc::system::Request::LocalPeerId(sender) => {
-						let _ = sender.send(network.local_peer_id().to_base58());
-					},
-					sc_rpc::system::Request::LocalListenAddresses(sender) => {
-						let peer_id = (*network.local_peer_id()).into();
-						let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id);
-						let addresses = network.listen_addresses()
-							.map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string())
-							.collect();
-						let _ = sender.send(addresses);
-					},
-					sc_rpc::system::Request::Peers(sender) => {
-						let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)|
-							sc_rpc::system::PeerInfo {
+			// Drive the network. Shut down the network future if `NetworkWorker` has terminated.
+			_ = network_run => {
+				debug!("`NetworkWorker` has terminated, shutting down the network future.");
+				return
+			}
+		}
+	}
+}
+
+/// Builds a future that processes system RPC requests.
+async fn build_system_rpc_future<
+	B: BlockT,
+	C: BlockchainEvents<B>
+		+ HeaderBackend<B>
+		+ BlockBackend<B>
+		+ HeaderMetadata<B, Error = sp_blockchain::Error>
+		+ ProofProvider<B>
+		+ Send
+		+ Sync
+		+ 'static,
+	H: sc_network_common::ExHashT,
+>(
+	role: Role,
+	network_service: Arc<sc_network::NetworkService<B, H>>,
+	chain_sync_service: ChainSyncInterfaceHandle<B>,
+	client: Arc<C>,
+	mut rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<B>>,
+	should_have_peers: bool,
+) {
+	// Current best block at initialization, to report to the RPC layer.
+	let starting_block = client.info().best_number;
+
+	loop {
+		// Answer incoming RPC requests.
+		let Some(req) = rpc_rx.next().await else {
+			debug!("RPC requests stream has terminated, shutting down the system RPC future.");
+			return;
+		};
+
+		match req {
+			sc_rpc::system::Request::Health(sender) => {
+				let peers = network_service.peers_debug_info().await;
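+				// An `Err` means the `NetworkWorker` has terminated; break out and shut down.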
+				if let Ok(peers) = peers {
+					let _ = sender.send(sc_rpc::system::Health {
+						peers: peers.len(),
+						is_syncing: network_service.is_major_syncing(),
+						should_have_peers,
+					});
+				} else {
+					break
+				}
+			},
+			sc_rpc::system::Request::LocalPeerId(sender) => {
+				let _ = sender.send(network_service.local_peer_id().to_base58());
+			},
+			sc_rpc::system::Request::LocalListenAddresses(sender) => {
+				let peer_id = network_service.local_peer_id().into();
+				let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id);
+				let addresses = network_service
+					.listen_addresses()
+					.iter()
+					.map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string())
+					.collect();
+				let _ = sender.send(addresses);
+			},
+			sc_rpc::system::Request::Peers(sender) => {
+				let peers = network_service.peers_debug_info().await;
+				if let Ok(peers) = peers {
+					let _ = sender.send(
+						peers
+							.into_iter()
+							.map(|(peer_id, p)| sc_rpc::system::PeerInfo {
 								peer_id: peer_id.to_base58(),
 								roles: format!("{:?}", p.roles),
 								best_hash: p.best_hash,
 								best_number: p.best_number,
-							}
-						).collect());
-					}
-					sc_rpc::system::Request::NetworkState(sender) => {
-						if let Ok(network_state) = serde_json::to_value(&network.network_state()) {
-							let _ = sender.send(network_state);
-						}
-					}
-					sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => {
-						let result = match MultiaddrWithPeerId::try_from(peer_addr) {
-							Ok(peer) => {
-								network.add_reserved_peer(peer)
-							},
-							Err(err) => {
-								Err(err.to_string())
-							},
-						};
-						let x = result.map_err(sc_rpc::system::error::Error::MalformattedPeerArg);
-						let _ = sender.send(x);
-					}
-					sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => {
-						let _ = match peer_id.parse::<PeerId>() {
-							Ok(peer_id) => {
-								network.remove_reserved_peer(peer_id);
-								sender.send(Ok(()))
-							}
-							Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg(
-								e.to_string(),
-							))),
-						};
-					}
-					sc_rpc::system::Request::NetworkReservedPeers(sender) => {
-						let reserved_peers = network.reserved_peers();
-						let reserved_peers = reserved_peers
-							.map(|peer_id| peer_id.to_base58())
-							.collect();
-
-						let _ = sender.send(reserved_peers);
+							})
+							.collect(),
+					);
+				} else {
+					break
+				}
+			},
+			sc_rpc::system::Request::NetworkState(sender) => {
+				let network_state = network_service.network_state().await;
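+				// As above, an `Err` means the `NetworkWorker` has terminated.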
+				if let Ok(network_state) = network_state {
+					if let Ok(network_state) = serde_json::to_value(network_state) {
+						let _ = sender.send(network_state);
 					}
-					sc_rpc::system::Request::NodeRoles(sender) => {
-						use sc_rpc::system::NodeRole;
+				} else {
+					break
+				}
+			},
+			sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => {
+				let result = match MultiaddrWithPeerId::try_from(peer_addr) {
+					Ok(peer) => network_service.add_reserved_peer(peer),
+					Err(err) => Err(err.to_string()),
+				};
+				let x = result.map_err(sc_rpc::system::error::Error::MalformattedPeerArg);
+				let _ = sender.send(x);
+			},
+			sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => {
+				let _ = match peer_id.parse::<PeerId>() {
+					Ok(peer_id) => {
+						network_service.remove_reserved_peer(peer_id);
+						sender.send(Ok(()))
+					},
+					Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg(
+						e.to_string(),
+					))),
+				};
+			},
+			sc_rpc::system::Request::NetworkReservedPeers(sender) => {
+				let reserved_peers = network_service.reserved_peers().await;
+				if let Ok(reserved_peers) = reserved_peers {
+					let reserved_peers =
+						reserved_peers.iter().map(|peer_id| peer_id.to_base58()).collect();
+					let _ = sender.send(reserved_peers);
+				} else {
+					break
+				}
+			},
+			sc_rpc::system::Request::NodeRoles(sender) => {
+				use sc_rpc::system::NodeRole;
 
-						let node_role = match role {
-							Role::Authority { .. } => NodeRole::Authority,
-							Role::Full => NodeRole::Full,
-						};
+				let node_role = match role {
+					Role::Authority { .. } => NodeRole::Authority,
+					Role::Full => NodeRole::Full,
+				};
 
-						let _ = sender.send(vec![node_role]);
-					}
-					sc_rpc::system::Request::SyncState(sender) => {
-						use sc_rpc::system::SyncState;
+				let _ = sender.send(vec![node_role]);
+			},
+			sc_rpc::system::Request::SyncState(sender) => {
+				use sc_rpc::system::SyncState;
 
-						let best_number = client.info().best_number;
+				let best_number = client.info().best_number;
 
-						let _ = sender.send(SyncState {
-							starting_block,
-							current_block: best_number,
-							highest_block: network.best_seen_block().unwrap_or(best_number),
-						});
-					}
-				}
-			}
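+				// The sync status now comes from `ChainSync` rather than from the network worker.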
+				let Ok(status) = chain_sync_service.status().await else {
+					debug!("`ChainSync` has terminated, shutting down the system RPC future.");
+					return
+				};
 
-			// The network worker has done something. Nothing special to do, but could be
-			// used in the future to perform actions in response of things that happened on
-			// the network.
-			_ = (&mut network).fuse() => {}
+				let _ = sender.send(SyncState {
+					starting_block,
+					current_block: best_number,
+					highest_block: status.best_seen_block.unwrap_or(best_number),
+				});
+			},
 		}
 	}
+	debug!("`NetworkWorker` has terminated, shutting down the system RPC future.");
 }
 
 // Wrapper for HTTP and WS servers that makes sure they are properly shut down.
-- 
GitLab