From 44dbb73945a20fa16ed004d50827c63c0921cbd0 Mon Sep 17 00:00:00 2001
From: Vsevolod Stakhov <vsevolod.stakhov@parity.io>
Date: Mon, 11 Sep 2023 19:33:51 +0100
Subject: [PATCH] Allow to broadcast network messages in parallel (#1409)

This PR addresses multiple issues pending:

* [x] Update orchestra to the recent version and test how the node
performs
* [x] Add some useful metrics for outbound network bridge
* [x] Try to send incoming network requests to all subsystems without
blocking on some particular subsystem in that loop
* [x] Fix all incompatibilities between orchestra and polkadot code
(e.g. malus node)
---
 Cargo.lock                                    | 41 +++++------
 .../Cargo.toml                                |  2 +-
 polkadot/node/malus/src/interceptor.rs        | 62 ++++++++++++++---
 polkadot/node/malus/src/variants/common.rs    |  7 --
 polkadot/node/metrics/Cargo.toml              |  3 +-
 polkadot/node/network/bridge/src/metrics.rs   | 53 ++++++++++++++
 polkadot/node/network/bridge/src/network.rs   |  1 +
 polkadot/node/network/bridge/src/rx/mod.rs    | 69 ++++++++++++++++---
 polkadot/node/network/bridge/src/tx/mod.rs    | 15 ++++
 polkadot/node/overseer/Cargo.toml             |  9 +--
 polkadot/node/overseer/src/lib.rs             |  2 +-
 polkadot/node/overseer/src/tests.rs           |  1 +
 .../node/subsystem-test-helpers/src/lib.rs    | 10 ++-
 polkadot/node/subsystem-types/Cargo.toml      |  2 +-
 polkadot/node/subsystem-util/Cargo.toml       |  2 +-
 15 files changed, 223 insertions(+), 56 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0623d96ef1c..686ee62add3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4841,18 +4841,6 @@ dependencies = [
  "quote",
 ]
 
-[[package]]
-name = "expander"
-version = "0.0.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3774182a5df13c3d1690311ad32fbe913feef26baba609fa2dd5f72042bd2ab6"
-dependencies = [
- "blake2",
- "fs-err",
- "proc-macro2",
- "quote",
-]
-
 [[package]]
 name = "expander"
 version = "2.0.0"
@@ -6896,6 +6884,15 @@ dependencies = [
  "thiserror",
 ]
 
+[[package]]
+name = "layout-rs"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1164ef87cb9607c2d887216eca79f0fc92895affe1789bba805dd38d829584e0"
+dependencies = [
+ "log",
+]
+
 [[package]]
 name = "lazy_static"
 version = "1.4.0"
@@ -8692,9 +8689,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
 
 [[package]]
 name = "orchestra"
-version = "0.0.5"
+version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "227585216d05ba65c7ab0a0450a3cf2cbd81a98862a54c4df8e14d5ac6adb015"
+checksum = "46d78e1deb2a8d54fc1f063a544130db4da31dfe4d5d3b493186424910222a76"
 dependencies = [
  "async-trait",
  "dyn-clonable",
@@ -8709,12 +8706,16 @@ dependencies = [
 
 [[package]]
 name = "orchestra-proc-macro"
-version = "0.0.5"
+version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2871aadd82a2c216ee68a69837a526dfe788ecbe74c4c5038a6acdbff6653066"
+checksum = "d035b1f968d91a826f2e34a9d6d02cb2af5aa7ca39ebd27922d850ab4b2dd2c6"
 dependencies = [
- "expander 0.0.6",
- "itertools 0.10.5",
+ "anyhow",
+ "expander 2.0.0",
+ "fs-err",
+ "indexmap 2.0.0",
+ "itertools 0.11.0",
+ "layout-rs",
  "petgraph",
  "proc-macro-crate",
  "proc-macro2",
@@ -13310,9 +13311,9 @@ dependencies = [
 
 [[package]]
 name = "prioritized-metered-channel"
-version = "0.2.0"
+version = "0.5.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "382698e48a268c832d0b181ed438374a6bb708a82a8ca273bb0f61c74cf209c4"
+checksum = "e99f0c89bd88f393aab44a4ab949351f7bc7e7e1179d11ecbfe50cbe4c47e342"
 dependencies = [
  "coarsetime",
  "crossbeam-queue",
diff --git a/cumulus/client/relay-chain-inprocess-interface/Cargo.toml b/cumulus/client/relay-chain-inprocess-interface/Cargo.toml
index c198b22cad4..39eda5075e2 100644
--- a/cumulus/client/relay-chain-inprocess-interface/Cargo.toml
+++ b/cumulus/client/relay-chain-inprocess-interface/Cargo.toml
@@ -37,7 +37,7 @@ sp-keyring = { path = "../../../substrate/primitives/keyring" }
 # Polkadot
 polkadot-primitives = { path = "../../../polkadot/primitives" }
 polkadot-test-client = { path = "../../../polkadot/node/test/client" }
-metered = { package = "prioritized-metered-channel", version = "0.2.0" }
+metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features=["futures_channel"] }
 
 # Cumulus
 cumulus-test-service = { path = "../../test/service" }
diff --git a/polkadot/node/malus/src/interceptor.rs b/polkadot/node/malus/src/interceptor.rs
index cbf39bccd16..04ee0905dee 100644
--- a/polkadot/node/malus/src/interceptor.rs
+++ b/polkadot/node/malus/src/interceptor.rs
@@ -47,12 +47,20 @@ where
 		Some(msg)
 	}
 
-	/// Modify outgoing messages.
+	/// Specifies if we need to replace some outgoing message with another (potentially empty)
+	/// message
+	fn need_intercept_outgoing(
+		&self,
+		_msg: &<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
+	) -> bool {
+		false
+	}
+	/// Send modified message instead of the original one
 	fn intercept_outgoing(
 		&self,
-		msg: <Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
+		_msg: &<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages,
 	) -> Option<<Self::Message as overseer::AssociateOutgoing>::OutgoingMessages> {
-		Some(msg)
+		None
 	}
 }
 
@@ -66,7 +74,7 @@ pub struct InterceptedSender<Sender, Fil> {
 #[async_trait::async_trait]
 impl<OutgoingMessage, Sender, Fil> overseer::SubsystemSender<OutgoingMessage> for InterceptedSender<Sender, Fil>
 where
-	OutgoingMessage: overseer::AssociateOutgoing + Send + 'static,
+	OutgoingMessage: overseer::AssociateOutgoing + Send + 'static + TryFrom<overseer::AllMessages>,
 	Sender: overseer::SubsystemSender<OutgoingMessage>
 		+ overseer::SubsystemSender<
 				<
@@ -78,17 +86,48 @@ where
 	<
 		<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
 	>::OutgoingMessages:
-		From<OutgoingMessage>,
+		From<OutgoingMessage> + Send + Sync,
+	<OutgoingMessage as TryFrom<overseer::AllMessages>>::Error: std::fmt::Debug,
 {
 	async fn send_message(&mut self, msg: OutgoingMessage) {
 		let msg = <
 					<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
 				>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
-		if let Some(msg) = self.message_filter.intercept_outgoing(msg) {
+		if self.message_filter.need_intercept_outgoing(&msg) {
+			if let Some(msg) = self.message_filter.intercept_outgoing(&msg) {
+				self.inner.send_message(msg).await;
+			}
+		}
+		else {
 			self.inner.send_message(msg).await;
 		}
 	}
 
+	fn try_send_message(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError<OutgoingMessage>> {
+		let msg = <
+				<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
+			>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
+		if self.message_filter.need_intercept_outgoing(&msg) {
+			if let Some(real_msg) = self.message_filter.intercept_outgoing(&msg) {
+				let orig_msg : OutgoingMessage = msg.into().try_into().expect("must be able to recover the original message");
+				self.inner.try_send_message(real_msg).map_err(|e| {
+					match e {
+						TrySendError::Full(_) => TrySendError::Full(orig_msg),
+						TrySendError::Closed(_) => TrySendError::Closed(orig_msg),
+					}
+				})
+			}
+			else {
+				// No message to send after intercepting
+				Ok(())
+			}
+		}
+		else {
+			let orig_msg : OutgoingMessage = msg.into().try_into().expect("must be able to recover the original message");
+			self.inner.try_send_message(orig_msg)
+		}
+	}
+
 	async fn send_messages<T>(&mut self, msgs: T)
 	where
 		T: IntoIterator<Item = OutgoingMessage> + Send,
@@ -101,9 +140,14 @@ where
 
 	fn send_unbounded_message(&mut self, msg: OutgoingMessage) {
 		let msg = <
-					<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
-				>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
-		if let Some(msg) = self.message_filter.intercept_outgoing(msg) {
+				<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
+			>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
+		if self.message_filter.need_intercept_outgoing(&msg) {
+			if let Some(msg) = self.message_filter.intercept_outgoing(&msg) {
+				self.inner.send_unbounded_message(msg);
+			}
+		}
+		else {
 			self.inner.send_unbounded_message(msg);
 		}
 	}
diff --git a/polkadot/node/malus/src/variants/common.rs b/polkadot/node/malus/src/variants/common.rs
index bc5f6f92aed..365b2f16ac2 100644
--- a/polkadot/node/malus/src/variants/common.rs
+++ b/polkadot/node/malus/src/variants/common.rs
@@ -498,11 +498,4 @@ where
 			msg => Some(msg),
 		}
 	}
-
-	fn intercept_outgoing(
-		&self,
-		msg: overseer::CandidateValidationOutgoingMessages,
-	) -> Option<overseer::CandidateValidationOutgoingMessages> {
-		Some(msg)
-	}
 }
diff --git a/polkadot/node/metrics/Cargo.toml b/polkadot/node/metrics/Cargo.toml
index d497fa7607a..e13ae63199f 100644
--- a/polkadot/node/metrics/Cargo.toml
+++ b/polkadot/node/metrics/Cargo.toml
@@ -11,8 +11,7 @@ futures = "0.3.21"
 futures-timer = "3.0.2"
 gum = { package = "tracing-gum", path = "../gum" }
 
-metered = { package = "prioritized-metered-channel", version = "0.2.0" }
-
+metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features=["futures_channel"] }
 # Both `sc-service` and `sc-cli` are required by runtime metrics `logger_hook()`.
 sc-service = { path = "../../../substrate/client/service" }
 sc-cli = { path = "../../../substrate/client/cli" }
diff --git a/polkadot/node/network/bridge/src/metrics.rs b/polkadot/node/network/bridge/src/metrics.rs
index bb90daad567..083a2a71aa0 100644
--- a/polkadot/node/network/bridge/src/metrics.rs
+++ b/polkadot/node/network/bridge/src/metrics.rs
@@ -105,9 +105,27 @@ impl Metrics {
 
 	pub fn on_report_event(&self) {
 		if let Some(metrics) = self.0.as_ref() {
+			self.on_message("report_peer");
 			metrics.report_events.inc()
 		}
 	}
+
+	pub fn on_message(&self, message_type: &'static str) {
+		if let Some(metrics) = self.0.as_ref() {
+			metrics.messages_sent.with_label_values(&[message_type]).inc()
+		}
+	}
+
+	pub fn on_delayed_rx_queue(&self, queue_size: usize) {
+		if let Some(metrics) = self.0.as_ref() {
+			metrics.rx_delayed_processing.observe(queue_size as f64);
+		}
+	}
+	pub fn time_delayed_rx_events(
+		&self,
+	) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
+		self.0.as_ref().map(|metrics| metrics.rx_delayed_processing_time.start_timer())
+	}
 }
 
 #[derive(Clone)]
@@ -123,6 +141,13 @@ pub(crate) struct MetricsInner {
 
 	bytes_received: prometheus::CounterVec<prometheus::U64>,
 	bytes_sent: prometheus::CounterVec<prometheus::U64>,
+
+	messages_sent: prometheus::CounterVec<prometheus::U64>,
+	// The reason why a `Histogram` is used to track a queue size is that
+	// we need not only an average size of the queue (that will be 0 normally), but
+	// we also need a dynamics for this queue size in case of messages delays.
+	rx_delayed_processing: prometheus::Histogram,
+	rx_delayed_processing_time: prometheus::Histogram,
 }
 
 impl metrics::Metrics for Metrics {
@@ -217,6 +242,34 @@ impl metrics::Metrics for Metrics {
 				)?,
 				registry,
 			)?,
+			messages_sent: prometheus::register(
+				prometheus::CounterVec::new(
+					prometheus::Opts::new(
+						"polkadot_parachain_messages_sent_total",
+						"The number of messages sent via network bridge",
+					),
+					&["type"]
+				)?,
+				registry,
+			)?,
+			rx_delayed_processing: prometheus::register(
+				prometheus::Histogram::with_opts(
+					prometheus::HistogramOpts::new(
+						"polkadot_parachain_network_bridge_rx_delayed",
+						"Number of events being delayed while broadcasting from the network bridge",
+					).buckets(vec![0.0, 1.0, 2.0, 8.0, 16.0]),
+				)?,
+				registry,
+			)?,
+			rx_delayed_processing_time: prometheus::register(
+				prometheus::Histogram::with_opts(
+					prometheus::HistogramOpts::new(
+						"polkadot_parachain_network_bridge_rx_delayed_time",
+						"Time spent for waiting of the delayed events",
+					),
+				)?,
+				registry,
+			)?,
 		};
 
 		Ok(Metrics(Some(metrics)))
diff --git a/polkadot/node/network/bridge/src/network.rs b/polkadot/node/network/bridge/src/network.rs
index 4f21212dcb6..823e1254612 100644
--- a/polkadot/node/network/bridge/src/network.rs
+++ b/polkadot/node/network/bridge/src/network.rs
@@ -61,6 +61,7 @@ pub(crate) fn send_message<M>(
 	let message = {
 		let encoded = message.encode();
 		metrics.on_notification_sent(peer_set, version, encoded.len(), peers.len());
+		metrics.on_message(std::any::type_name::<M>());
 		encoded
 	};
 
diff --git a/polkadot/node/network/bridge/src/rx/mod.rs b/polkadot/node/network/bridge/src/rx/mod.rs
index 51d248ca2d4..e1125ebc904 100644
--- a/polkadot/node/network/bridge/src/rx/mod.rs
+++ b/polkadot/node/network/bridge/src/rx/mod.rs
@@ -20,7 +20,10 @@ use super::*;
 
 use always_assert::never;
 use bytes::Bytes;
-use futures::stream::BoxStream;
+use futures::{
+	future::BoxFuture,
+	stream::{BoxStream, FuturesUnordered, StreamExt},
+};
 use parity_scale_codec::{Decode, DecodeAll};
 
 use sc_network::Event as NetworkEvent;
@@ -244,6 +247,7 @@ where
 								NetworkBridgeEvent::PeerViewChange(peer, View::default()),
 							],
 							&mut sender,
+							&metrics,
 						)
 						.await;
 
@@ -352,6 +356,7 @@ where
 							dispatch_validation_event_to_all(
 								NetworkBridgeEvent::PeerDisconnected(peer),
 								&mut sender,
+								&metrics,
 							)
 							.await,
 						PeerSet::Collation =>
@@ -490,7 +495,7 @@ where
 						network_service.report_peer(remote, report.into());
 					}
 
-					dispatch_validation_events_to_all(events, &mut sender).await;
+					dispatch_validation_events_to_all(events, &mut sender, &metrics).await;
 				}
 
 				if !c_messages.is_empty() {
@@ -992,8 +997,9 @@ fn send_collation_message_vstaging(
 async fn dispatch_validation_event_to_all(
 	event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
 	ctx: &mut impl overseer::NetworkBridgeRxSenderTrait,
+	metrics: &Metrics,
 ) {
-	dispatch_validation_events_to_all(std::iter::once(event), ctx).await
+	dispatch_validation_events_to_all(std::iter::once(event), ctx, metrics).await
 }
 
 async fn dispatch_collation_event_to_all(
@@ -1038,20 +1044,65 @@ fn dispatch_collation_event_to_all_unbounded(
 	}
 }
 
+fn send_or_queue_validation_event<E, Sender>(
+	event: E,
+	sender: &mut Sender,
+	delayed_queue: &FuturesUnordered<BoxFuture<'static, ()>>,
+) where
+	E: Send + 'static,
+	Sender: overseer::NetworkBridgeRxSenderTrait + overseer::SubsystemSender<E>,
+{
+	match sender.try_send_message(event) {
+		Ok(()) => {},
+		Err(overseer::TrySendError::Full(event)) => {
+			let mut sender = sender.clone();
+			delayed_queue.push(Box::pin(async move {
+				sender.send_message(event).await;
+			}));
+		},
+		Err(overseer::TrySendError::Closed(_)) => {
+			panic!(
+				"NetworkBridgeRxSender is closed when trying to send event of type: {}",
+				std::any::type_name::<E>()
+			);
+		},
+	}
+}
+
 async fn dispatch_validation_events_to_all<I>(
 	events: I,
 	sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
+	metrics: &Metrics,
 ) where
 	I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>>,
 	I::IntoIter: Send,
 {
+	let delayed_messages: FuturesUnordered<BoxFuture<'static, ()>> = FuturesUnordered::new();
+
+	// Fast path for sending events to subsystems, if any subsystem's queue is full, we hold
+	// the slow path future in the `delayed_messages` queue.
 	for event in events {
-		sender
-			.send_messages(event.focus().map(StatementDistributionMessage::from))
-			.await;
-		sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await;
-		sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await;
-		sender.send_messages(event.focus().map(GossipSupportMessage::from)).await;
+		if let Ok(msg) = event.focus().map(StatementDistributionMessage::from) {
+			send_or_queue_validation_event(msg, sender, &delayed_messages);
+		}
+		if let Ok(msg) = event.focus().map(BitfieldDistributionMessage::from) {
+			send_or_queue_validation_event(msg, sender, &delayed_messages);
+		}
+		if let Ok(msg) = event.focus().map(ApprovalDistributionMessage::from) {
+			send_or_queue_validation_event(msg, sender, &delayed_messages);
+		}
+		if let Ok(msg) = event.focus().map(GossipSupportMessage::from) {
+			send_or_queue_validation_event(msg, sender, &delayed_messages);
+		}
+	}
+
+	let delayed_messages_count = delayed_messages.len();
+	metrics.on_delayed_rx_queue(delayed_messages_count);
+
+	if delayed_messages_count > 0 {
+		// Here we wait for all the delayed messages to be sent.
+		let _timer = metrics.time_delayed_rx_events(); // Dropped after `await` is completed
+		let _: Vec<()> = delayed_messages.collect().await;
 	}
 }
 
diff --git a/polkadot/node/network/bridge/src/tx/mod.rs b/polkadot/node/network/bridge/src/tx/mod.rs
index 1b386ce1239..7fa1149593c 100644
--- a/polkadot/node/network/bridge/src/tx/mod.rs
+++ b/polkadot/node/network/bridge/src/tx/mod.rs
@@ -33,6 +33,7 @@ use polkadot_node_subsystem::{
 ///
 /// To be passed to [`FullNetworkConfiguration::add_notification_protocol`]().
 pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
+use polkadot_node_network_protocol::request_response::Requests;
 use sc_network::ReputationChange;
 
 use crate::validator_discovery;
@@ -290,6 +291,20 @@ where
 			);
 
 			for req in reqs {
+				match req {
+					Requests::ChunkFetchingV1(_) => metrics.on_message("chunk_fetching_v1"),
+					Requests::AvailableDataFetchingV1(_) =>
+						metrics.on_message("available_data_fetching_v1"),
+					Requests::CollationFetchingV1(_) => metrics.on_message("collation_fetching_v1"),
+					Requests::CollationFetchingVStaging(_) =>
+						metrics.on_message("collation_fetching_vstaging"),
+					Requests::PoVFetchingV1(_) => metrics.on_message("pov_fetching_v1"),
+					Requests::DisputeSendingV1(_) => metrics.on_message("dispute_sending_v1"),
+					Requests::StatementFetchingV1(_) => metrics.on_message("statement_fetching_v1"),
+					Requests::AttestedCandidateVStaging(_) =>
+						metrics.on_message("attested_candidate_vstaging"),
+				}
+
 				network_service
 					.start_request(
 						&mut authority_discovery_service,
diff --git a/polkadot/node/overseer/Cargo.toml b/polkadot/node/overseer/Cargo.toml
index 0efd4d4c6ca..5d41407ef83 100644
--- a/polkadot/node/overseer/Cargo.toml
+++ b/polkadot/node/overseer/Cargo.toml
@@ -16,7 +16,7 @@ polkadot-node-primitives = { path = "../primitives" }
 polkadot-node-subsystem-types = { path = "../subsystem-types" }
 polkadot-node-metrics = { path = "../metrics" }
 polkadot-primitives = { path = "../../primitives" }
-orchestra = "0.0.5"
+orchestra = { version = "0.3.3", default-features = false, features=["futures_channel"] }
 gum = { package = "tracing-gum", path = "../gum" }
 schnellru = "0.2.1"
 sp-core = { path = "../../../substrate/primitives/core" }
@@ -24,7 +24,7 @@ async-trait = "0.1.57"
 tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
 
 [dev-dependencies]
-metered = { package = "prioritized-metered-channel", version = "0.2.0" }
+metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features=["futures_channel"] }
 sp-core = { path = "../../../substrate/primitives/core" }
 futures = { version = "0.3.21", features = ["thread-pool"] }
 femme = "2.2.1"
@@ -36,7 +36,8 @@ node-test-helpers = { package = "polkadot-node-subsystem-test-helpers", path = "
 tikv-jemalloc-ctl = "0.5.0"
 
 [features]
-default = []
-expand = [ "orchestra/expand" ]
+default = [ "futures_channel" ]
 dotgraph = [ "orchestra/dotgraph" ]
+expand = [ "orchestra/expand" ]
+futures_channel = [ "metered/futures_channel", "orchestra/futures_channel" ]
 jemalloc-allocator = [ "dep:tikv-jemalloc-ctl" ]
diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs
index 7337f1e6be7..84d5d19c3b9 100644
--- a/polkadot/node/overseer/src/lib.rs
+++ b/polkadot/node/overseer/src/lib.rs
@@ -107,7 +107,7 @@ pub use orchestra::{
 	contextbounds, orchestra, subsystem, FromOrchestra, MapSubsystem, MessagePacket,
 	OrchestraError as OverseerError, SignalsReceived, Spawner, Subsystem, SubsystemContext,
 	SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts, SubsystemMeters,
-	SubsystemSender, TimeoutExt, ToOrchestra,
+	SubsystemSender, TimeoutExt, ToOrchestra, TrySendError,
 };
 
 /// Store 2 days worth of blocks, not accounting for forks,
diff --git a/polkadot/node/overseer/src/tests.rs b/polkadot/node/overseer/src/tests.rs
index 298783f4180..c17613fb7ea 100644
--- a/polkadot/node/overseer/src/tests.rs
+++ b/polkadot/node/overseer/src/tests.rs
@@ -1074,6 +1074,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() {
 
 #[test]
 fn context_holds_onto_message_until_enough_signals_received() {
+	const CHANNEL_CAPACITY: usize = 64;
 	let (candidate_validation_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
 	let (candidate_backing_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
 	let (statement_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY);
diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs
index fe6b106bf46..3f92513498c 100644
--- a/polkadot/node/subsystem-test-helpers/src/lib.rs
+++ b/polkadot/node/subsystem-test-helpers/src/lib.rs
@@ -20,7 +20,7 @@
 
 use polkadot_node_subsystem::{
 	messages::AllMessages, overseer, FromOrchestra, OverseerSignal, SpawnGlue, SpawnedSubsystem,
-	SubsystemError, SubsystemResult,
+	SubsystemError, SubsystemResult, TrySendError,
 };
 use polkadot_node_subsystem_util::TimeoutExt;
 
@@ -160,6 +160,14 @@ where
 		self.tx.send(msg.into()).await.expect("test overseer no longer live");
 	}
 
+	fn try_send_message(
+		&mut self,
+		msg: OutgoingMessage,
+	) -> Result<(), TrySendError<OutgoingMessage>> {
+		self.tx.unbounded_send(msg.into()).expect("test overseer no longer live");
+		Ok(())
+	}
+
 	async fn send_messages<I>(&mut self, msgs: I)
 	where
 		I: IntoIterator<Item = OutgoingMessage> + Send,
diff --git a/polkadot/node/subsystem-types/Cargo.toml b/polkadot/node/subsystem-types/Cargo.toml
index f6965cf647c..a1c00cb0652 100644
--- a/polkadot/node/subsystem-types/Cargo.toml
+++ b/polkadot/node/subsystem-types/Cargo.toml
@@ -14,7 +14,7 @@ polkadot-node-primitives = { path = "../primitives" }
 polkadot-node-network-protocol = { path = "../network/protocol" }
 polkadot-statement-table = { path = "../../statement-table" }
 polkadot-node-jaeger = { path = "../jaeger" }
-orchestra = "0.0.5"
+orchestra = { version = "0.3.3", default-features = false, features=["futures_channel"] }
 sc-network = { path = "../../../substrate/client/network" }
 sp-api = { path = "../../../substrate/primitives/api" }
 sp-consensus-babe = { path = "../../../substrate/primitives/consensus/babe" }
diff --git a/polkadot/node/subsystem-util/Cargo.toml b/polkadot/node/subsystem-util/Cargo.toml
index 0d5ae7a0e8e..d9364e2c2c0 100644
--- a/polkadot/node/subsystem-util/Cargo.toml
+++ b/polkadot/node/subsystem-util/Cargo.toml
@@ -29,7 +29,7 @@ polkadot-node-network-protocol = { path = "../network/protocol" }
 polkadot-primitives = { path = "../../primitives" }
 polkadot-node-primitives = { path = "../primitives" }
 polkadot-overseer = { path = "../overseer" }
-metered = { package = "prioritized-metered-channel", version = "0.2.0" }
+metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features=["futures_channel"] }
 
 sp-core = { path = "../../../substrate/primitives/core" }
 sp-application-crypto = { path = "../../../substrate/primitives/application-crypto" }
-- 
GitLab