From 7b054b3c77038a21e6e6f06d7637fa08e3e0f170 Mon Sep 17 00:00:00 2001 From: Andronik Ordian <write@reusable.software> Date: Fri, 2 Jul 2021 10:23:26 +0200 Subject: [PATCH] cleanup stream polls (#3397) * metered-channel: remove dead code * we don't need no fuse * even more --- polkadot/node/core/provisioner/src/lib.rs | 2 +- polkadot/node/metered-channel/src/bounded.rs | 31 --------------- polkadot/node/network/bridge/src/lib.rs | 2 +- .../network/statement-distribution/src/lib.rs | 8 ++-- polkadot/node/overseer/src/lib.rs | 39 ++++++------------- 5 files changed, 17 insertions(+), 65 deletions(-) diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 7a730aa8cfd..85f10c68a74 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -191,7 +191,7 @@ impl ProvisioningJob { }; loop { futures::select! { - msg = self.receiver.next().fuse() => match msg { + msg = self.receiver.next() => match msg { Some(RequestInherentData(_, return_sender)) => { let _span = span.child("req-inherent-data"); let _timer = self.metrics.time_request_inherent_data(); diff --git a/polkadot/node/metered-channel/src/bounded.rs b/polkadot/node/metered-channel/src/bounded.rs index 38aa6f15c65..43a77f707fc 100644 --- a/polkadot/node/metered-channel/src/bounded.rs +++ b/polkadot/node/metered-channel/src/bounded.rs @@ -153,34 +153,3 @@ impl<T> MeteredSender<T> { }) } } - -impl<T> futures::sink::Sink<T> for MeteredSender<T> { - type Error = mpsc::SendError; - - fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - Pin::new(&mut self.inner).start_send(item) - } - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Pin::new(&mut self.inner).poll_ready(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - match Pin::new(&mut self.inner).poll_close(cx) { - val @ Poll::Ready(_)=> { - val - } - other => other, - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - match Pin::new(&mut self.inner).poll_flush(cx) { - val @ Poll::Ready(_)=> { - self.meter.note_sent(); - val - } - other => other, - } - } -} diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 399b6e6c26d..87c6fd9494b 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -830,7 +830,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>( } } }, - req_res_event = request_multiplexer.next().fuse() => match req_res_event { + req_res_event = request_multiplexer.next() => match req_res_event { None => return Err(UnexpectedAbort::RequestStreamConcluded), Some(Err(err)) => { network_service.report_peer(err.peer, MALFORMED_MESSAGE_COST); diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index b0b2d8d8f85..59c9cf249ba 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -595,14 +595,14 @@ impl Message { // We are only fusing here to make `select` happy, in reality we will quit if one of those // streams end: let from_overseer = ctx.recv().fuse(); - let from_requester = from_requester.next().fuse(); - let from_responder = from_responder.next().fuse(); + let from_requester = from_requester.next(); + let from_responder = from_responder.next(); futures::pin_mut!(from_overseer, from_requester, from_responder); - futures::select!( + futures::select! { msg = from_overseer => Message::Subsystem(msg.map_err(Fatal::SubsystemReceive)), msg = from_requester => Message::Requester(msg), msg = from_responder => Message::Responder(msg), - ) + } } } diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 7e6f831dd70..06a720e85fd 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -909,8 +909,8 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> { } } - let mut await_message = self.messages.next().fuse(); - let mut await_signal = self.signals.next().fuse(); + let mut await_message = self.messages.next(); + let mut await_signal = self.signals.next(); let signals_received = self.signals_received.load(); let pending_incoming = &mut self.pending_incoming; @@ -1901,13 +1901,7 @@ where loop { select! { - msg = self.events_rx.next().fuse() => { - let msg = if let Some(msg) = msg { - msg - } else { - continue - }; - + msg = self.events_rx.select_next_some() => { match msg { Event::MsgToSubsystem(msg) => { self.route_message(msg.into()).await?; @@ -1927,16 +1921,7 @@ where } } }, - msg = self.to_overseer_rx.next() => { - let msg = match msg { - Some(m) => m, - None => { - // This is a fused stream so we will shut down after receiving all - // shutdown notifications. - continue - } - }; - + msg = self.to_overseer_rx.select_next_some() => { match msg { ToOverseer::SpawnJob { name, s } => { self.spawn_job(name, s); @@ -1946,16 +1931,14 @@ where } } }, - res = self.running_subsystems.next().fuse() => { - let finished = if let Some(finished) = res { - finished - } else { - continue - }; - - tracing::error!(target: LOG_TARGET, subsystem = ?finished, "subsystem finished unexpectedly"); + res = self.running_subsystems.select_next_some() => { + tracing::error!( + target: LOG_TARGET, + subsystem = ?res, + "subsystem finished unexpectedly", + ); self.stop().await; - return finished; + return res; }, } } -- GitLab