diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 7a730aa8cfdf8ed29c49916f8f5419a3ad051b1c..85f10c68a74c068ffc62a00b7d5d7eaa59e6f887 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -191,7 +191,7 @@ impl ProvisioningJob { }; loop { futures::select! { - msg = self.receiver.next().fuse() => match msg { + msg = self.receiver.next() => match msg { Some(RequestInherentData(_, return_sender)) => { let _span = span.child("req-inherent-data"); let _timer = self.metrics.time_request_inherent_data(); diff --git a/polkadot/node/metered-channel/src/bounded.rs b/polkadot/node/metered-channel/src/bounded.rs index 38aa6f15c65fc19d47a8f8afaebf1c55cde927ca..43a77f707fc0a2713ae3e51a2a2782ffeeb636ac 100644 --- a/polkadot/node/metered-channel/src/bounded.rs +++ b/polkadot/node/metered-channel/src/bounded.rs @@ -153,34 +153,3 @@ impl<T> MeteredSender<T> { }) } } - -impl<T> futures::sink::Sink<T> for MeteredSender<T> { - type Error = mpsc::SendError; - - fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - Pin::new(&mut self.inner).start_send(item) - } - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Pin::new(&mut self.inner).poll_ready(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - match Pin::new(&mut self.inner).poll_close(cx) { - val @ Poll::Ready(_)=> { - val - } - other => other, - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - match Pin::new(&mut self.inner).poll_flush(cx) { - val @ Poll::Ready(_)=> { - self.meter.note_sent(); - val - } - other => other, - } - } -} diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 399b6e6c26d0ed925911ce428a77757a4bf86fa4..87c6fd9494b04bce8e259d11a24a78a455c26925 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -830,7 +830,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>( } } }, - req_res_event = request_multiplexer.next().fuse() => match req_res_event { + req_res_event = request_multiplexer.next() => match req_res_event { None => return Err(UnexpectedAbort::RequestStreamConcluded), Some(Err(err)) => { network_service.report_peer(err.peer, MALFORMED_MESSAGE_COST); diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index b0b2d8d8f859059de03ebf12446b469f8e06e811..59c9cf249ba05fb683908e99e80c4ef1f6703747 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -595,14 +595,14 @@ impl Message { // We are only fusing here to make `select` happy, in reality we will quit if one of those // streams end: let from_overseer = ctx.recv().fuse(); - let from_requester = from_requester.next().fuse(); - let from_responder = from_responder.next().fuse(); + let from_requester = from_requester.next(); + let from_responder = from_responder.next(); futures::pin_mut!(from_overseer, from_requester, from_responder); - futures::select!( + futures::select! { msg = from_overseer => Message::Subsystem(msg.map_err(Fatal::SubsystemReceive)), msg = from_requester => Message::Requester(msg), msg = from_responder => Message::Responder(msg), - ) + } } } diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 7e6f831dd70ae8b3ae17882845228cb6b409284b..06a720e85fdcf34d52922dd38cfca8a555cd4caf 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -909,8 +909,8 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> { } } - let mut await_message = self.messages.next().fuse(); - let mut await_signal = self.signals.next().fuse(); + let mut await_message = self.messages.next(); + let mut await_signal = self.signals.next(); let signals_received = self.signals_received.load(); let pending_incoming = &mut self.pending_incoming; @@ -1901,13 +1901,7 @@ where loop { select! { - msg = self.events_rx.next().fuse() => { - let msg = if let Some(msg) = msg { - msg - } else { - continue - }; - + msg = self.events_rx.select_next_some() => { match msg { Event::MsgToSubsystem(msg) => { self.route_message(msg.into()).await?; @@ -1927,16 +1921,7 @@ where } } }, - msg = self.to_overseer_rx.next() => { - let msg = match msg { - Some(m) => m, - None => { - // This is a fused stream so we will shut down after receiving all - // shutdown notifications. - continue - } - }; - + msg = self.to_overseer_rx.select_next_some() => { match msg { ToOverseer::SpawnJob { name, s } => { self.spawn_job(name, s); @@ -1946,16 +1931,14 @@ where } } }, - res = self.running_subsystems.next().fuse() => { - let finished = if let Some(finished) = res { - finished - } else { - continue - }; - - tracing::error!(target: LOG_TARGET, subsystem = ?finished, "subsystem finished unexpectedly"); + res = self.running_subsystems.select_next_some() => { + tracing::error!( + target: LOG_TARGET, + subsystem = ?res, + "subsystem finished unexpectedly", + ); self.stop().await; - return finished; + return res; }, } }