From 889364ed3d3dd58b6d6dac89d05404f9f87a0825 Mon Sep 17 00:00:00 2001
From: "paritytech-cmd-bot-polkadot-sdk[bot]"
 <179002856+paritytech-cmd-bot-polkadot-sdk[bot]@users.noreply.github.com>
Date: Thu, 23 Jan 2025 09:52:24 +0100
Subject: [PATCH] stable2412/backport #7222: Enforce libp2p outbound
 request-response timeout limits (#7302)

Backport #7222 into `stable2412` from lexnv.

See the
[documentation](https://github.com/paritytech/polkadot-sdk/blob/master/docs/BACKPORT.md)
on how to use this bot.

<!--
  # To be used by other automation, do not modify:
  original-pr-number: #${pull_number}
-->

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
Co-authored-by: Alexandru Vasile <alexandru.vasile@parity.io>
---
 prdoc/pr_7222.prdoc                           |   19 +
 .../client/network/src/request_responses.rs   | 1075 ++++++++++-------
 2 files changed, 648 insertions(+), 446 deletions(-)
 create mode 100644 prdoc/pr_7222.prdoc

diff --git a/prdoc/pr_7222.prdoc b/prdoc/pr_7222.prdoc
new file mode 100644
index 00000000000..40b89b0a182
--- /dev/null
+++ b/prdoc/pr_7222.prdoc
@@ -0,0 +1,19 @@
+title: Enforce libp2p outbound request-response timeout limits
+
+doc:
+  - audience: Node Dev
+    description: |
+      This PR enforces that outbound requests are finished within the specified protocol timeout.
+      The stable2412 version running libp2p 0.52.4 contains a bug which does not track request timeouts properly
+      https://github.com/libp2p/rust-libp2p/pull/5429.
+  
+      The issue has been detected while submitting libp2p to litep2p requests in Kusama.
+      This aims to check that pending outbound requests have not timed out.
+      Although the issue has been fixed in libp2p, there might be other cases where this may happen.
+      For example, https://github.com/libp2p/rust-libp2p/pull/5417.
+
+      For more context see https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096.
+
+crates:
+- name: sc-network
+  bump: patch
diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs
index 6c2631924df..3a390f70a4d 100644
--- a/substrate/client/network/src/request_responses.rs
+++ b/substrate/client/network/src/request_responses.rs
@@ -66,6 +66,9 @@ use std::{
 
 pub use libp2p::request_response::{Config, RequestId};
 
+/// Periodically check if requests are taking too long.
+const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2);
+
 /// Possible failures occurring in the context of sending an outbound request and receiving the
 /// response.
 #[derive(Debug, thiserror::Error)]
@@ -245,8 +248,14 @@ pub struct OutgoingResponse {
 
 /// Information stored about a pending request.
 struct PendingRequest {
+	/// The time when the request was sent to the libp2p request-response protocol.
 	started_at: Instant,
-	response_tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
+	/// The channel to send the response back to the caller.
+	///
+	/// This is wrapped in an `Option` to allow for the channel to be taken out
+	/// on force-detected timeouts.
+	response_tx: Option<oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>>,
+	/// Fallback request to send if the primary request fails.
 	fallback_request: Option<(Vec<u8>, ProtocolName)>,
 }
 
@@ -330,16 +339,20 @@ impl From<(ProtocolName, RequestId)> for ProtocolRequestId {
 	}
 }
 
+/// Details of a request-response protocol.
+struct ProtocolDetails {
+	behaviour: Behaviour<GenericCodec>,
+	inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
+	request_timeout: Duration,
+}
+
 /// Implementation of `NetworkBehaviour` that provides support for request-response protocols.
 pub struct RequestResponsesBehaviour {
 	/// The multiple sub-protocols, by name.
 	///
 	/// Contains the underlying libp2p request-response [`Behaviour`], plus an optional
 	/// "response builder" used to build responses for incoming requests.
-	protocols: HashMap<
-		ProtocolName,
-		(Behaviour<GenericCodec>, Option<async_channel::Sender<IncomingRequest>>),
-	>,
+	protocols: HashMap<ProtocolName, ProtocolDetails>,
 
 	/// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply.
 	pending_requests: HashMap<ProtocolRequestId, PendingRequest>,
@@ -359,6 +372,14 @@ pub struct RequestResponsesBehaviour {
 
 	/// Primarily used to get a reputation of a node.
 	peer_store: Arc<dyn PeerStoreProvider>,
+
+	/// Interval to check that the requests are not taking too long.
+	///
+	/// We had issues in the past where libp2p did not produce a timeout event in due time.
+	///
+	/// For more details, see:
+	/// - <https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096>
+	periodic_request_check: tokio::time::Interval,
 }
 
 /// Generated by the response builder and waiting to be processed.
@@ -388,7 +409,7 @@ impl RequestResponsesBehaviour {
 				ProtocolSupport::Outbound
 			};
 
-			let rq_rp = Behaviour::with_codec(
+			let behaviour = Behaviour::with_codec(
 				GenericCodec {
 					max_request_size: protocol.max_request_size,
 					max_response_size: protocol.max_response_size,
@@ -400,7 +421,11 @@ impl RequestResponsesBehaviour {
 			);
 
 			match protocols.entry(protocol.name) {
-				Entry::Vacant(e) => e.insert((rq_rp, protocol.inbound_queue)),
+				Entry::Vacant(e) => e.insert(ProtocolDetails {
+					behaviour,
+					inbound_queue: protocol.inbound_queue,
+					request_timeout: protocol.request_timeout,
+				}),
 				Entry::Occupied(e) => return Err(RegisterError::DuplicateProtocol(e.key().clone())),
 			};
 		}
@@ -412,6 +437,7 @@ impl RequestResponsesBehaviour {
 			pending_responses_arrival_time: Default::default(),
 			send_feedback: Default::default(),
 			peer_store,
+			periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
 		})
 	}
 
@@ -432,9 +458,11 @@ impl RequestResponsesBehaviour {
 	) {
 		log::trace!(target: "sub-libp2p", "send request to {target} ({protocol_name:?}), {} bytes", request.len());
 
-		if let Some((protocol, _)) = self.protocols.get_mut(protocol_name.deref()) {
+		if let Some(ProtocolDetails { behaviour, .. }) =
+			self.protocols.get_mut(protocol_name.deref())
+		{
 			Self::send_request_inner(
-				protocol,
+				behaviour,
 				&mut self.pending_requests,
 				target,
 				protocol_name,
@@ -469,7 +497,7 @@ impl RequestResponsesBehaviour {
 				(protocol_name.to_string().into(), request_id).into(),
 				PendingRequest {
 					started_at: Instant::now(),
-					response_tx: pending_response,
+					response_tx: Some(pending_response),
 					fallback_request,
 				},
 			);
@@ -516,18 +544,19 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 		local_addr: &Multiaddr,
 		remote_addr: &Multiaddr,
 	) -> Result<THandler<Self>, ConnectionDenied> {
-		let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| {
-			if let Ok(handler) = r.handle_established_inbound_connection(
-				connection_id,
-				peer,
-				local_addr,
-				remote_addr,
-			) {
-				Some((p.to_string(), handler))
-			} else {
-				None
-			}
-		});
+		let iter =
+			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
+				if let Ok(handler) = behaviour.handle_established_inbound_connection(
+					connection_id,
+					peer,
+					local_addr,
+					remote_addr,
+				) {
+					Some((p.to_string(), handler))
+				} else {
+					None
+				}
+			});
 
 		Ok(MultiHandler::try_from_iter(iter).expect(
 			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
@@ -542,15 +571,19 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 		addr: &Multiaddr,
 		role_override: Endpoint,
 	) -> Result<THandler<Self>, ConnectionDenied> {
-		let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| {
-			if let Ok(handler) =
-				r.handle_established_outbound_connection(connection_id, peer, addr, role_override)
-			{
-				Some((p.to_string(), handler))
-			} else {
-				None
-			}
-		});
+		let iter =
+			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
+				if let Ok(handler) = behaviour.handle_established_outbound_connection(
+					connection_id,
+					peer,
+					addr,
+					role_override,
+				) {
+					Some((p.to_string(), handler))
+				} else {
+					None
+				}
+			});
 
 		Ok(MultiHandler::try_from_iter(iter).expect(
 			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
@@ -561,8 +594,11 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 	fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
 		match event {
 			FromSwarm::ConnectionEstablished(e) =>
-				for (p, _) in self.protocols.values_mut() {
-					NetworkBehaviour::on_swarm_event(p, FromSwarm::ConnectionEstablished(e));
+				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
+					NetworkBehaviour::on_swarm_event(
+						behaviour,
+						FromSwarm::ConnectionEstablished(e),
+					);
 				},
 			FromSwarm::ConnectionClosed(ConnectionClosed {
 				peer_id,
@@ -572,8 +608,10 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 				remaining_established,
 			}) =>
 				for (p_name, p_handler) in handler.into_iter() {
-					if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) {
-						proto.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
+					if let Some(ProtocolDetails { behaviour, .. }) =
+						self.protocols.get_mut(p_name.as_str())
+					{
+						behaviour.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
 							peer_id,
 							connection_id,
 							endpoint,
@@ -589,48 +627,54 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 					}
 				},
 			FromSwarm::DialFailure(e) =>
-				for (p, _) in self.protocols.values_mut() {
-					NetworkBehaviour::on_swarm_event(p, FromSwarm::DialFailure(e));
+				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
+					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::DialFailure(e));
 				},
 			FromSwarm::ListenerClosed(e) =>
-				for (p, _) in self.protocols.values_mut() {
-					NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerClosed(e));
+				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
+					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ListenerClosed(e));
 				},
 			FromSwarm::ListenFailure(e) =>
-				for (p, _) in self.protocols.values_mut() {
-					NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenFailure(e));
+				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
+					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ListenFailure(e));
 				},
 			FromSwarm::ListenerError(e) =>
-				for (p, _) in self.protocols.values_mut() {
-					NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerError(e));
+				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
+					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ListenerError(e));
 				},
 			FromSwarm::ExternalAddrExpired(e) =>
-				for (p, _) in self.protocols.values_mut() {
-					NetworkBehaviour::on_swarm_event(p, FromSwarm::ExternalAddrExpired(e));
+				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
+					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ExternalAddrExpired(e));
 				},
 			FromSwarm::NewListener(e) =>
-				for (p, _) in self.protocols.values_mut() {
-					NetworkBehaviour::on_swarm_event(p, FromSwarm::NewListener(e));
+				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
+					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::NewListener(e));
 				},
 			FromSwarm::ExpiredListenAddr(e) =>
-				for (p, _) in self.protocols.values_mut() {
-					NetworkBehaviour::on_swarm_event(p, FromSwarm::ExpiredListenAddr(e));
+				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
+					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ExpiredListenAddr(e));
 				},
 			FromSwarm::NewExternalAddrCandidate(e) =>
-				for (p, _) in self.protocols.values_mut() {
-					NetworkBehaviour::on_swarm_event(p, FromSwarm::NewExternalAddrCandidate(e));
+				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
+					NetworkBehaviour::on_swarm_event(
+						behaviour,
+						FromSwarm::NewExternalAddrCandidate(e),
+					);
 				},
 			FromSwarm::ExternalAddrConfirmed(e) =>
-				for (p, _) in self.protocols.values_mut() {
-					NetworkBehaviour::on_swarm_event(p, FromSwarm::ExternalAddrConfirmed(e));
+				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
+					NetworkBehaviour::on_swarm_event(
+						behaviour,
+						FromSwarm::ExternalAddrConfirmed(e),
+					);
 				},
 			FromSwarm::AddressChange(e) =>
-				for (p, _) in self.protocols.values_mut() {
-					NetworkBehaviour::on_swarm_event(p, FromSwarm::AddressChange(e));
+				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
+					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::AddressChange(e));
 				},
 			FromSwarm::NewListenAddr(e) =>
-				for (p, _) in self.protocols.values_mut() {
-					NetworkBehaviour::on_swarm_event(p, FromSwarm::NewListenAddr(e));
+				for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
+					NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::NewListenAddr(e));
 				},
 		}
 	}
@@ -642,8 +686,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 		event: THandlerOutEvent<Self>,
 	) {
 		let p_name = event.0;
-		if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) {
-			return proto.on_connection_handler_event(peer_id, connection_id, event.1)
+		if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
+			return behaviour.on_connection_handler_event(peer_id, connection_id, event.1)
 		} else {
 			log::warn!(
 				target: "sub-libp2p",
@@ -659,6 +703,51 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 		params: &mut impl PollParameters,
 	) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
 		'poll_all: loop {
+			// Poll the periodic request check.
+			if self.periodic_request_check.poll_tick(cx).is_ready() {
+				self.pending_requests.retain(|id, req| {
+					let Some(ProtocolDetails { request_timeout, .. }) =
+						self.protocols.get(&id.protocol)
+					else {
+						log::warn!(
+							target: "sub-libp2p",
+							"Request {id:?} has no protocol registered.",
+						);
+
+						if let Some(response_tx) = req.response_tx.take() {
+							if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() {
+								log::debug!(
+									target: "sub-libp2p",
+									"Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.",
+								);
+							}
+						}
+						return false
+					};
+
+					let elapsed = req.started_at.elapsed();
+					if elapsed > *request_timeout {
+						log::debug!(
+							target: "sub-libp2p",
+							"Request {id:?} force detected as timeout.",
+						);
+
+						if let Some(response_tx) = req.response_tx.take() {
+							if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() {
+								log::debug!(
+									target: "sub-libp2p",
+									"Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.",
+								);
+							}
+						}
+
+						false
+					} else {
+						true
+					}
+				});
+			}
+
 			// Poll to see if any response is ready to be sent back.
 			while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
 				let RequestProcessingOutcome {
@@ -675,10 +764,12 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 				};
 
 				if let Ok(payload) = result {
-					if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
+					if let Some(ProtocolDetails { behaviour, .. }) =
+						self.protocols.get_mut(&*protocol_name)
+					{
 						log::trace!(target: "sub-libp2p", "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
 
-						if protocol.send_response(inner_channel, Ok(payload)).is_err() {
+						if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
 							// Note: Failure is handled further below when receiving
 							// `InboundFailure` event from request-response [`Behaviour`].
 							log::debug!(
@@ -706,7 +797,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 			let mut fallback_requests = vec![];
 
 			// Poll request-responses protocols.
-			for (protocol, (ref mut behaviour, ref mut resp_builder)) in &mut self.protocols {
+			for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
+			{
 				'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx, params) {
 					let ev = match ev {
 						// Main events we are interested in.
@@ -767,7 +859,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 
 							// Submit the request to the "response builder" passed by the user at
 							// initialization.
-							if let Some(resp_builder) = resp_builder {
+							if let Some(resp_builder) = inbound_queue {
 								// If the response builder is too busy, silently drop `tx`. This
 								// will be reported by the corresponding request-response
 								// [`Behaviour`] through an `InboundFailure::Omission` event.
@@ -815,7 +907,11 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 								.pending_requests
 								.remove(&(protocol.clone(), request_id).into())
 							{
-								Some(PendingRequest { started_at, response_tx, .. }) => {
+								Some(PendingRequest {
+									started_at,
+									response_tx: Some(response_tx),
+									..
+								}) => {
 									log::trace!(
 										target: "sub-libp2p",
 										"received response from {peer} ({protocol:?}), {} bytes",
@@ -831,13 +927,13 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 										.map_err(|_| RequestFailure::Obsolete);
 									(started_at, delivered)
 								},
-								None => {
-									log::warn!(
+								_ => {
+									log::debug!(
 										target: "sub-libp2p",
-										"Received `RequestResponseEvent::Message` with unexpected request id {:?}",
+										"Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}",
 										request_id,
+										peer,
 									);
-									debug_assert!(false);
 									continue
 								},
 							};
@@ -865,7 +961,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 							{
 								Some(PendingRequest {
 									started_at,
-									response_tx,
+									response_tx: Some(response_tx),
 									fallback_request,
 								}) => {
 									// Try using the fallback request if the protocol was not
@@ -905,13 +1001,14 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 									}
 									started_at
 								},
-								None => {
-									log::warn!(
+								_ => {
+									log::debug!(
 										target: "sub-libp2p",
-										"Received `RequestResponseEvent::Message` with unexpected request id {:?}",
+										"Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}",
 										request_id,
+										error,
+										peer
 									);
-									debug_assert!(false);
 									continue
 								},
 							};
@@ -976,7 +1073,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
 
 			// Send out fallback requests.
 			for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
-				if let Some((behaviour, _)) = self.protocols.get_mut(&protocol) {
+				if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
 					Self::send_request_inner(
 						behaviour,
 						&mut self.pending_requests,
@@ -1145,7 +1242,7 @@ mod tests {
 
 	use crate::mock::MockPeerStore;
 	use assert_matches::assert_matches;
-	use futures::{channel::oneshot, executor::LocalPool, task::Spawn};
+	use futures::channel::oneshot;
 	use libp2p::{
 		core::{
 			transport::{MemoryTransport, Transport},
@@ -1158,10 +1255,10 @@ mod tests {
 	};
 	use std::{iter, time::Duration};
 
-	struct TokioExecutor(tokio::runtime::Runtime);
+	struct TokioExecutor;
 	impl Executor for TokioExecutor {
 		fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
-			let _ = self.0.spawn(f);
+			tokio::spawn(f);
 		}
 	}
 
@@ -1178,13 +1275,14 @@ mod tests {
 
 		let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
 
-		let runtime = tokio::runtime::Runtime::new().unwrap();
-
 		let mut swarm = Swarm::new(
 			transport,
 			behaviour,
 			keypair.public().to_peer_id(),
-			SwarmConfig::with_executor(TokioExecutor(runtime)),
+			SwarmConfig::with_executor(TokioExecutor {})
+				// This is taken care of by notification protocols in non-test environment
+				// It is very slow in test environment for some reason, hence larger timeout
+				.with_idle_connection_timeout(Duration::from_secs(10)),
 		);
 
 		let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
@@ -1194,34 +1292,27 @@ mod tests {
 		(swarm, listen_addr)
 	}
 
-	#[test]
-	fn basic_request_response_works() {
+	#[tokio::test]
+	async fn basic_request_response_works() {
 		let protocol_name = ProtocolName::from("/test/req-resp/1");
-		let mut pool = LocalPool::new();
 
 		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
 		let mut swarms = (0..2)
 			.map(|_| {
 				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
 
-				pool.spawner()
-					.spawn_obj(
-						async move {
-							while let Some(rq) = rx.next().await {
-								let (fb_tx, fb_rx) = oneshot::channel();
-								assert_eq!(rq.payload, b"this is a request");
-								let _ = rq.pending_response.send(super::OutgoingResponse {
-									result: Ok(b"this is a response".to_vec()),
-									reputation_changes: Vec::new(),
-									sent_feedback: Some(fb_tx),
-								});
-								fb_rx.await.unwrap();
-							}
-						}
-						.boxed()
-						.into(),
-					)
-					.unwrap();
+				tokio::spawn(async move {
+					while let Some(rq) = rx.next().await {
+						let (fb_tx, fb_rx) = oneshot::channel();
+						assert_eq!(rq.payload, b"this is a request");
+						let _ = rq.pending_response.send(super::OutgoingResponse {
+							result: Ok(b"this is a response".to_vec()),
+							reputation_changes: Vec::new(),
+							sent_feedback: Some(fb_tx),
+						});
+						fb_rx.await.unwrap();
+					}
+				});
 
 				let protocol_config = ProtocolConfig {
 					name: protocol_name.clone(),
@@ -1245,84 +1336,69 @@ mod tests {
 
 		let (mut swarm, _) = swarms.remove(0);
 		// Running `swarm[0]` in the background.
-		pool.spawner()
-			.spawn_obj({
-				async move {
-					loop {
-						match swarm.select_next_some().await {
-							SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
-								result.unwrap();
-							},
-							_ => {},
-						}
-					}
-				}
-				.boxed()
-				.into()
-			})
-			.unwrap();
-
-		// Remove and run the remaining swarm.
-		let (mut swarm, _) = swarms.remove(0);
-		pool.run_until(async move {
-			let mut response_receiver = None;
-
+		tokio::spawn(async move {
 			loop {
 				match swarm.select_next_some().await {
-					SwarmEvent::ConnectionEstablished { peer_id, .. } => {
-						let (sender, receiver) = oneshot::channel();
-						swarm.behaviour_mut().send_request(
-							&peer_id,
-							protocol_name.clone(),
-							b"this is a request".to_vec(),
-							None,
-							sender,
-							IfDisconnected::ImmediateError,
-						);
-						assert!(response_receiver.is_none());
-						response_receiver = Some(receiver);
-					},
-					SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
+					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
 						result.unwrap();
-						break
 					},
 					_ => {},
 				}
 			}
-
-			assert_eq!(
-				response_receiver.unwrap().await.unwrap().unwrap(),
-				(b"this is a response".to_vec(), protocol_name)
-			);
 		});
+
+		// Remove and run the remaining swarm.
+		let (mut swarm, _) = swarms.remove(0);
+		let mut response_receiver = None;
+
+		loop {
+			match swarm.select_next_some().await {
+				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
+					let (sender, receiver) = oneshot::channel();
+					swarm.behaviour_mut().send_request(
+						&peer_id,
+						protocol_name.clone(),
+						b"this is a request".to_vec(),
+						None,
+						sender,
+						IfDisconnected::ImmediateError,
+					);
+					assert!(response_receiver.is_none());
+					response_receiver = Some(receiver);
+				},
+				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
+					result.unwrap();
+					break
+				},
+				_ => {},
+			}
+		}
+
+		assert_eq!(
+			response_receiver.unwrap().await.unwrap().unwrap(),
+			(b"this is a response".to_vec(), protocol_name)
+		);
 	}
 
-	#[test]
-	fn max_response_size_exceeded() {
+	#[tokio::test]
+	async fn max_response_size_exceeded() {
 		let protocol_name = ProtocolName::from("/test/req-resp/1");
-		let mut pool = LocalPool::new();
 
 		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
 		let mut swarms = (0..2)
 			.map(|_| {
 				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
 
-				pool.spawner()
-					.spawn_obj(
-						async move {
-							while let Some(rq) = rx.next().await {
-								assert_eq!(rq.payload, b"this is a request");
-								let _ = rq.pending_response.send(super::OutgoingResponse {
-									result: Ok(b"this response exceeds the limit".to_vec()),
-									reputation_changes: Vec::new(),
-									sent_feedback: None,
-								});
-							}
-						}
-						.boxed()
-						.into(),
-					)
-					.unwrap();
+				tokio::spawn(async move {
+					while let Some(rq) = rx.next().await {
+						assert_eq!(rq.payload, b"this is a request");
+						let _ = rq.pending_response.send(super::OutgoingResponse {
+							result: Ok(b"this response exceeds the limit".to_vec()),
+							reputation_changes: Vec::new(),
+							sent_feedback: None,
+						});
+					}
+				});
 
 				let protocol_config = ProtocolConfig {
 					name: protocol_name.clone(),
@@ -1347,57 +1423,50 @@ mod tests {
 		// Running `swarm[0]` in the background until a `InboundRequest` event happens,
 		// which is a hint about the test having ended.
 		let (mut swarm, _) = swarms.remove(0);
-		pool.spawner()
-			.spawn_obj({
-				async move {
-					loop {
-						match swarm.select_next_some().await {
-							SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
-								assert!(result.is_ok());
-								break
-							},
-							_ => {},
-						}
-					}
-				}
-				.boxed()
-				.into()
-			})
-			.unwrap();
-
-		// Remove and run the remaining swarm.
-		let (mut swarm, _) = swarms.remove(0);
-		pool.run_until(async move {
-			let mut response_receiver = None;
-
+		tokio::spawn(async move {
 			loop {
 				match swarm.select_next_some().await {
-					SwarmEvent::ConnectionEstablished { peer_id, .. } => {
-						let (sender, receiver) = oneshot::channel();
-						swarm.behaviour_mut().send_request(
-							&peer_id,
-							protocol_name.clone(),
-							b"this is a request".to_vec(),
-							None,
-							sender,
-							IfDisconnected::ImmediateError,
-						);
-						assert!(response_receiver.is_none());
-						response_receiver = Some(receiver);
-					},
-					SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
-						assert!(result.is_err());
-						break
+					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
+						assert!(result.is_ok());
+						break;
 					},
 					_ => {},
 				}
 			}
+		});
+
+		// Remove and run the remaining swarm.
+		let (mut swarm, _) = swarms.remove(0);
+
+		let mut response_receiver = None;
 
-			match response_receiver.unwrap().await.unwrap().unwrap_err() {
-				RequestFailure::Network(OutboundFailure::ConnectionClosed) => {},
-				_ => panic!(),
+		loop {
+			match swarm.select_next_some().await {
+				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
+					let (sender, receiver) = oneshot::channel();
+					swarm.behaviour_mut().send_request(
+						&peer_id,
+						protocol_name.clone(),
+						b"this is a request".to_vec(),
+						None,
+						sender,
+						IfDisconnected::ImmediateError,
+					);
+					assert!(response_receiver.is_none());
+					response_receiver = Some(receiver);
+				},
+				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
+					assert!(result.is_err());
+					break
+				},
+				_ => {},
 			}
-		});
+		}
+
+		match response_receiver.unwrap().await.unwrap().unwrap_err() {
+			RequestFailure::Network(OutboundFailure::ConnectionClosed) => {},
+			request_failure => panic!("Unexpected failure: {request_failure:?}"),
+		}
 	}
 
 	/// A [`RequestId`] is a unique identifier among either all inbound or all outbound requests for
@@ -1410,11 +1479,10 @@ mod tests {
 	/// without a [`RequestId`] collision.
 	///
 	/// See [`ProtocolRequestId`] for additional information.
-	#[test]
-	fn request_id_collision() {
+	#[tokio::test]
+	async fn request_id_collision() {
 		let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
 		let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
-		let mut pool = LocalPool::new();
 
 		let mut swarm_1 = {
 			let protocol_configs = vec![
@@ -1472,114 +1540,100 @@ mod tests {
 		swarm_1.dial(listen_add_2).unwrap();
 
 		// Run swarm 2 in the background, receiving two requests.
-		pool.spawner()
-			.spawn_obj(
-				async move {
-					loop {
-						match swarm_2.select_next_some().await {
-							SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
-								result.unwrap();
-							},
-							_ => {},
-						}
-					}
+		tokio::spawn(async move {
+			loop {
+				match swarm_2.select_next_some().await {
+					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
+						result.unwrap();
+					},
+					_ => {},
 				}
-				.boxed()
-				.into(),
-			)
-			.unwrap();
+			}
+		});
 
 		// Handle both requests sent by swarm 1 to swarm 2 in the background.
 		//
 		// Make sure both requests overlap, by answering the first only after receiving the
 		// second.
-		pool.spawner()
-			.spawn_obj(
-				async move {
-					let protocol_1_request = swarm_2_handler_1.next().await;
-					let protocol_2_request = swarm_2_handler_2.next().await;
-
-					protocol_1_request
-						.unwrap()
-						.pending_response
-						.send(OutgoingResponse {
-							result: Ok(b"this is a response".to_vec()),
-							reputation_changes: Vec::new(),
-							sent_feedback: None,
-						})
-						.unwrap();
-					protocol_2_request
-						.unwrap()
-						.pending_response
-						.send(OutgoingResponse {
-							result: Ok(b"this is a response".to_vec()),
-							reputation_changes: Vec::new(),
-							sent_feedback: None,
-						})
-						.unwrap();
-				}
-				.boxed()
-				.into(),
-			)
-			.unwrap();
+		tokio::spawn(async move {
+			let protocol_1_request = swarm_2_handler_1.next().await;
+			let protocol_2_request = swarm_2_handler_2.next().await;
+
+			protocol_1_request
+				.unwrap()
+				.pending_response
+				.send(OutgoingResponse {
+					result: Ok(b"this is a response".to_vec()),
+					reputation_changes: Vec::new(),
+					sent_feedback: None,
+				})
+				.unwrap();
+			protocol_2_request
+				.unwrap()
+				.pending_response
+				.send(OutgoingResponse {
+					result: Ok(b"this is a response".to_vec()),
+					reputation_changes: Vec::new(),
+					sent_feedback: None,
+				})
+				.unwrap();
+		});
 
 		// Have swarm 1 send two requests to swarm 2 and await responses.
-		pool.run_until(async move {
-			let mut response_receivers = None;
-			let mut num_responses = 0;
 
-			loop {
-				match swarm_1.select_next_some().await {
-					SwarmEvent::ConnectionEstablished { peer_id, .. } => {
-						let (sender_1, receiver_1) = oneshot::channel();
-						let (sender_2, receiver_2) = oneshot::channel();
-						swarm_1.behaviour_mut().send_request(
-							&peer_id,
-							protocol_name_1.clone(),
-							b"this is a request".to_vec(),
-							None,
-							sender_1,
-							IfDisconnected::ImmediateError,
-						);
-						swarm_1.behaviour_mut().send_request(
-							&peer_id,
-							protocol_name_2.clone(),
-							b"this is a request".to_vec(),
-							None,
-							sender_2,
-							IfDisconnected::ImmediateError,
-						);
-						assert!(response_receivers.is_none());
-						response_receivers = Some((receiver_1, receiver_2));
-					},
-					SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
-						num_responses += 1;
-						result.unwrap();
-						if num_responses == 2 {
-							break
-						}
-					},
-					_ => {},
-				}
+		let mut response_receivers = None;
+		let mut num_responses = 0;
+
+		loop {
+			match swarm_1.select_next_some().await {
+				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
+					let (sender_1, receiver_1) = oneshot::channel();
+					let (sender_2, receiver_2) = oneshot::channel();
+					swarm_1.behaviour_mut().send_request(
+						&peer_id,
+						protocol_name_1.clone(),
+						b"this is a request".to_vec(),
+						None,
+						sender_1,
+						IfDisconnected::ImmediateError,
+					);
+					swarm_1.behaviour_mut().send_request(
+						&peer_id,
+						protocol_name_2.clone(),
+						b"this is a request".to_vec(),
+						None,
+						sender_2,
+						IfDisconnected::ImmediateError,
+					);
+					assert!(response_receivers.is_none());
+					response_receivers = Some((receiver_1, receiver_2));
+				},
+				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
+					num_responses += 1;
+					result.unwrap();
+					if num_responses == 2 {
+						break
+					}
+				},
+				_ => {},
 			}
-			let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
-			assert_eq!(
-				response_receiver_1.await.unwrap().unwrap(),
-				(b"this is a response".to_vec(), protocol_name_1)
-			);
-			assert_eq!(
-				response_receiver_2.await.unwrap().unwrap(),
-				(b"this is a response".to_vec(), protocol_name_2)
-			);
-		});
+		}
+		let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
+		assert_eq!(
+			response_receiver_1.await.unwrap().unwrap(),
+			(b"this is a response".to_vec(), protocol_name_1)
+		);
+		assert_eq!(
+			response_receiver_2.await.unwrap().unwrap(),
+			(b"this is a response".to_vec(), protocol_name_2)
+		);
 	}
 
-	#[test]
-	fn request_fallback() {
+	#[tokio::test]
+	async fn request_fallback() {
 		let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
 		let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
 		let protocol_name_2 = ProtocolName::from("/test/another");
-		let mut pool = LocalPool::new();
 
 		let protocol_config_1 = ProtocolConfig {
 			name: protocol_name_1.clone(),
@@ -1617,39 +1671,31 @@ mod tests {
 			let mut protocol_config_2 = protocol_config_2.clone();
 			protocol_config_2.inbound_queue = Some(tx_2);
 
-			pool.spawner()
-				.spawn_obj(
-					async move {
-						for _ in 0..2 {
-							if let Some(rq) = rx_1.next().await {
-								let (fb_tx, fb_rx) = oneshot::channel();
-								assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
-								let _ = rq.pending_response.send(super::OutgoingResponse {
-									result: Ok(
-										b"this is a response on protocol /test/req-resp/1".to_vec()
-									),
-									reputation_changes: Vec::new(),
-									sent_feedback: Some(fb_tx),
-								});
-								fb_rx.await.unwrap();
-							}
-						}
-
-						if let Some(rq) = rx_2.next().await {
-							let (fb_tx, fb_rx) = oneshot::channel();
-							assert_eq!(rq.payload, b"request on protocol /test/other");
-							let _ = rq.pending_response.send(super::OutgoingResponse {
-								result: Ok(b"this is a response on protocol /test/other".to_vec()),
-								reputation_changes: Vec::new(),
-								sent_feedback: Some(fb_tx),
-							});
-							fb_rx.await.unwrap();
-						}
+			tokio::spawn(async move {
+				for _ in 0..2 {
+					if let Some(rq) = rx_1.next().await {
+						let (fb_tx, fb_rx) = oneshot::channel();
+						assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
+						let _ = rq.pending_response.send(super::OutgoingResponse {
+							result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()),
+							reputation_changes: Vec::new(),
+							sent_feedback: Some(fb_tx),
+						});
+						fb_rx.await.unwrap();
 					}
-					.boxed()
-					.into(),
-				)
-				.unwrap();
+				}
+
+				if let Some(rq) = rx_2.next().await {
+					let (fb_tx, fb_rx) = oneshot::channel();
+					assert_eq!(rq.payload, b"request on protocol /test/other");
+					let _ = rq.pending_response.send(super::OutgoingResponse {
+						result: Ok(b"this is a response on protocol /test/other".to_vec()),
+						reputation_changes: Vec::new(),
+						sent_feedback: Some(fb_tx),
+					});
+					fb_rx.await.unwrap();
+				}
+			});
 
 			build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
 		};
@@ -1670,132 +1716,269 @@ mod tests {
 		}
 
 		// Running `older_swarm`` in the background.
-		pool.spawner()
-			.spawn_obj({
-				async move {
-					loop {
-						_ = older_swarm.0.select_next_some().await;
-					}
-				}
-				.boxed()
-				.into()
-			})
-			.unwrap();
+		tokio::spawn(async move {
+			loop {
+				_ = older_swarm.0.select_next_some().await;
+			}
+		});
 
 		// Run the newer swarm. Attempt to make requests on all protocols.
 		let (mut swarm, _) = new_swarm;
 		let mut older_peer_id = None;
 
-		pool.run_until(async move {
-			let mut response_receiver = None;
-			// Try the new protocol with a fallback.
-			loop {
-				match swarm.select_next_some().await {
-					SwarmEvent::ConnectionEstablished { peer_id, .. } => {
-						older_peer_id = Some(peer_id);
-						let (sender, receiver) = oneshot::channel();
-						swarm.behaviour_mut().send_request(
-							&peer_id,
-							protocol_name_1.clone(),
-							b"request on protocol /test/req-resp/2".to_vec(),
-							Some((
-								b"request on protocol /test/req-resp/1".to_vec(),
-								protocol_config_1_fallback.name.clone(),
-							)),
-							sender,
-							IfDisconnected::ImmediateError,
-						);
-						response_receiver = Some(receiver);
-					},
-					SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
-						result.unwrap();
-						break
-					},
-					_ => {},
-				}
+		let mut response_receiver = None;
+		// Try the new protocol with a fallback.
+		loop {
+			match swarm.select_next_some().await {
+				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
+					older_peer_id = Some(peer_id);
+					let (sender, receiver) = oneshot::channel();
+					swarm.behaviour_mut().send_request(
+						&peer_id,
+						protocol_name_1.clone(),
+						b"request on protocol /test/req-resp/2".to_vec(),
+						Some((
+							b"request on protocol /test/req-resp/1".to_vec(),
+							protocol_config_1_fallback.name.clone(),
+						)),
+						sender,
+						IfDisconnected::ImmediateError,
+					);
+					response_receiver = Some(receiver);
+				},
+				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
+					result.unwrap();
+					break
+				},
+				_ => {},
 			}
-			assert_eq!(
-				response_receiver.unwrap().await.unwrap().unwrap(),
-				(
-					b"this is a response on protocol /test/req-resp/1".to_vec(),
-					protocol_name_1_fallback.clone()
-				)
-			);
-			// Try the old protocol with a useless fallback.
-			let (sender, response_receiver) = oneshot::channel();
-			swarm.behaviour_mut().send_request(
-				older_peer_id.as_ref().unwrap(),
-				protocol_name_1_fallback.clone(),
-				b"request on protocol /test/req-resp/1".to_vec(),
-				Some((
-					b"dummy request, will fail if processed".to_vec(),
-					protocol_config_1_fallback.name.clone(),
-				)),
-				sender,
-				IfDisconnected::ImmediateError,
-			);
-			loop {
-				match swarm.select_next_some().await {
-					SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
-						result.unwrap();
-						break
-					},
-					_ => {},
-				}
+		}
+		assert_eq!(
+			response_receiver.unwrap().await.unwrap().unwrap(),
+			(
+				b"this is a response on protocol /test/req-resp/1".to_vec(),
+				protocol_name_1_fallback.clone()
+			)
+		);
+		// Try the old protocol with a useless fallback.
+		let (sender, response_receiver) = oneshot::channel();
+		swarm.behaviour_mut().send_request(
+			older_peer_id.as_ref().unwrap(),
+			protocol_name_1_fallback.clone(),
+			b"request on protocol /test/req-resp/1".to_vec(),
+			Some((
+				b"dummy request, will fail if processed".to_vec(),
+				protocol_config_1_fallback.name.clone(),
+			)),
+			sender,
+			IfDisconnected::ImmediateError,
+		);
+		loop {
+			match swarm.select_next_some().await {
+				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
+					result.unwrap();
+					break
+				},
+				_ => {},
 			}
-			assert_eq!(
-				response_receiver.await.unwrap().unwrap(),
-				(
-					b"this is a response on protocol /test/req-resp/1".to_vec(),
-					protocol_name_1_fallback.clone()
-				)
-			);
-			// Try the new protocol with no fallback. Should fail.
-			let (sender, response_receiver) = oneshot::channel();
-			swarm.behaviour_mut().send_request(
-				older_peer_id.as_ref().unwrap(),
-				protocol_name_1.clone(),
-				b"request on protocol /test/req-resp-2".to_vec(),
-				None,
-				sender,
-				IfDisconnected::ImmediateError,
-			);
-			loop {
-				match swarm.select_next_some().await {
-					SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
-						assert_matches!(
-							result.unwrap_err(),
-							RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
-						);
-						break
-					},
-					_ => {},
-				}
+		}
+		assert_eq!(
+			response_receiver.await.unwrap().unwrap(),
+			(
+				b"this is a response on protocol /test/req-resp/1".to_vec(),
+				protocol_name_1_fallback.clone()
+			)
+		);
+		// Try the new protocol with no fallback. Should fail.
+		let (sender, response_receiver) = oneshot::channel();
+		swarm.behaviour_mut().send_request(
+			older_peer_id.as_ref().unwrap(),
+			protocol_name_1.clone(),
+			b"request on protocol /test/req-resp-2".to_vec(),
+			None,
+			sender,
+			IfDisconnected::ImmediateError,
+		);
+		loop {
+			match swarm.select_next_some().await {
+				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
+					assert_matches!(
+						result.unwrap_err(),
+						RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
+					);
+					break
+				},
+				_ => {},
 			}
-			assert!(response_receiver.await.unwrap().is_err());
-			// Try the other protocol with no fallback.
-			let (sender, response_receiver) = oneshot::channel();
-			swarm.behaviour_mut().send_request(
-				older_peer_id.as_ref().unwrap(),
-				protocol_name_2.clone(),
-				b"request on protocol /test/other".to_vec(),
-				None,
-				sender,
-				IfDisconnected::ImmediateError,
-			);
+		}
+		assert!(response_receiver.await.unwrap().is_err());
+		// Try the other protocol with no fallback.
+		let (sender, response_receiver) = oneshot::channel();
+		swarm.behaviour_mut().send_request(
+			older_peer_id.as_ref().unwrap(),
+			protocol_name_2.clone(),
+			b"request on protocol /test/other".to_vec(),
+			None,
+			sender,
+			IfDisconnected::ImmediateError,
+		);
+		loop {
+			match swarm.select_next_some().await {
+				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
+					result.unwrap();
+					break
+				},
+				_ => {},
+			}
+		}
+		assert_eq!(
+			response_receiver.await.unwrap().unwrap(),
+			(b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
+		);
+	}
+
+	/// This test ensures the `RequestResponsesBehaviour` propagates back the Request::Timeout error
+	/// even if the libp2p component hangs.
+	///
+	/// For testing purposes, the communication happens on the `/test/req-resp/1` protocol.
+	///
+	/// This is achieved by:
+	/// - Two swarms are connected, the first one is slow to respond and has the timeout set to 10
+	///   seconds. The second swarm is configured with a timeout of 10 seconds in libp2p, however in
+	///   substrate this is set to 1 second.
+	///
+	/// - The first swarm introduces a delay of 2 seconds before responding to the request.
+	///
+	/// - The second swarm must enforce the 1 second timeout.
+	#[tokio::test]
+	async fn enforce_outbound_timeouts() {
+		const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
+		const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1);
+
+		// These swarms only speaks protocol_name.
+		let protocol_name = ProtocolName::from("/test/req-resp/1");
+
+		let protocol_config = ProtocolConfig {
+			name: protocol_name.clone(),
+			fallback_names: Vec::new(),
+			max_request_size: 1024,
+			max_response_size: 1024 * 1024,
+			request_timeout: REQUEST_TIMEOUT, // <-- important for the test
+			inbound_queue: None,
+		};
+
+		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
+		let (mut first_swarm, _) = {
+			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
+
+			tokio::spawn(async move {
+				if let Some(rq) = rx.next().await {
+					assert_eq!(rq.payload, b"this is a request");
+
+					// Sleep for more than `REQUEST_TIMEOUT_SHORT` and less than
+					// `REQUEST_TIMEOUT`.
+					tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await;
+
+					// By the time the response is sent back, the second swarm
+					// received Timeout.
+					let _ = rq.pending_response.send(super::OutgoingResponse {
+						result: Ok(b"Second swarm already timedout".to_vec()),
+						reputation_changes: Vec::new(),
+						sent_feedback: None,
+					});
+				}
+			});
+
+			let mut protocol_config = protocol_config.clone();
+			protocol_config.inbound_queue = Some(tx);
+
+			build_swarm(iter::once(protocol_config))
+		};
+
+		let (mut second_swarm, second_address) = {
+			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
+
+			tokio::spawn(async move {
+				while let Some(rq) = rx.next().await {
+					let _ = rq.pending_response.send(super::OutgoingResponse {
+						result: Ok(b"This is the response".to_vec()),
+						reputation_changes: Vec::new(),
+						sent_feedback: None,
+					});
+				}
+			});
+			let mut protocol_config = protocol_config.clone();
+			protocol_config.inbound_queue = Some(tx);
+
+			build_swarm(iter::once(protocol_config.clone()))
+		};
+		// Modify the second swarm to have a shorter timeout.
+		second_swarm
+			.behaviour_mut()
+			.protocols
+			.get_mut(&protocol_name)
+			.unwrap()
+			.request_timeout = REQUEST_TIMEOUT_SHORT;
+
+		// Ask first swarm to dial the second swarm.
+		{
+			Swarm::dial(&mut first_swarm, second_address).unwrap();
+		}
+
+		// Running the first swarm in the background until a `InboundRequest` event happens,
+		// which is a hint about the test having ended.
+		tokio::spawn(async move {
 			loop {
-				match swarm.select_next_some().await {
-					SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
-						result.unwrap();
-						break
+				let event = first_swarm.select_next_some().await;
+				match event {
+					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
+						assert!(result.is_ok());
+						break;
+					},
+					SwarmEvent::ConnectionClosed { .. } => {
+						break;
 					},
 					_ => {},
 				}
 			}
-			assert_eq!(
-				response_receiver.await.unwrap().unwrap(),
-				(b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
-			);
 		});
+
+		// Run the second swarm.
+		// - on connection established send the request to the first swarm
+		// - expect to receive a timeout
+		let mut response_receiver = None;
+		loop {
+			let event = second_swarm.select_next_some().await;
+
+			match event {
+				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
+					let (sender, receiver) = oneshot::channel();
+					second_swarm.behaviour_mut().send_request(
+						&peer_id,
+						protocol_name.clone(),
+						b"this is a request".to_vec(),
+						None,
+						sender,
+						IfDisconnected::ImmediateError,
+					);
+					assert!(response_receiver.is_none());
+					response_receiver = Some(receiver);
+				},
+				SwarmEvent::ConnectionClosed { .. } => {
+					break;
+				},
+				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
+					assert!(result.is_err());
+					break
+				},
+				_ => {},
+			}
+		}
+
+		// Expect the timeout.
+		match response_receiver.unwrap().await.unwrap().unwrap_err() {
+			RequestFailure::Network(OutboundFailure::Timeout) => {},
+			request_failure => panic!("Unexpected failure: {request_failure:?}"),
+		}
 	}
 }
-- 
GitLab