From 6fa4a0e3c7920832c5095ce52660fa71cdbd0523 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov <vsevolod.stakhov@parity.io> Date: Fri, 27 May 2022 17:17:33 +0100 Subject: [PATCH] Check unbounded channel first when polling for subsystem mesages (#5566) * Prefer unbounded channel when selecting rx events * Fix tests * Forgotten fmt recursion * Extract strategy functor to allow easier modifications --- .../node/orchestra/proc-macro/src/impl_builder.rs | 7 ++++--- polkadot/node/orchestra/src/lib.rs | 11 +++++++++-- polkadot/node/overseer/src/tests.rs | 6 +++++- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/polkadot/node/orchestra/proc-macro/src/impl_builder.rs b/polkadot/node/orchestra/proc-macro/src/impl_builder.rs index d5a91782e8f..cf9f7551ca7 100644 --- a/polkadot/node/orchestra/proc-macro/src/impl_builder.rs +++ b/polkadot/node/orchestra/proc-macro/src/impl_builder.rs @@ -605,9 +605,10 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream { }; let unbounded_meter = #channel_name_unbounded_rx.meter().clone(); - - let message_rx: SubsystemIncomingMessages< #consumes > = #support_crate ::select( - #channel_name_rx, #channel_name_unbounded_rx + // Prefer unbounded channel when selecting + let message_rx: SubsystemIncomingMessages< #consumes > = #support_crate ::select_with_strategy( + #channel_name_rx, #channel_name_unbounded_rx, + #support_crate ::select_message_channel_strategy ); let (signal_tx, signal_rx) = #support_crate ::metered::channel( self.signal_capacity.unwrap_or(SIGNAL_CHANNEL_CAPACITY) diff --git a/polkadot/node/orchestra/src/lib.rs b/polkadot/node/orchestra/src/lib.rs index 3f84207de6b..15bd661e720 100644 --- a/polkadot/node/orchestra/src/lib.rs +++ b/polkadot/node/orchestra/src/lib.rs @@ -76,7 +76,7 @@ pub use futures::{ channel::{mpsc, oneshot}, future::{BoxFuture, Fuse, Future}, poll, select, - stream::{self, select, FuturesUnordered}, + stream::{self, select, select_with_strategy, FuturesUnordered, PollNext}, task::{Context, Poll}, FutureExt, StreamExt, }; @@ -203,10 +203,17 @@ pub fn make_packet<T>(signals_received: usize, message: T) -> MessagePacket<T> { MessagePacket { signals_received, message } } +/// A functor to specify strategy of the channels selection in the `SubsystemIncomingMessages` +pub fn select_message_channel_strategy(_: &mut ()) -> PollNext { + PollNext::Right +} + /// Incoming messages from both the bounded and unbounded channel. -pub type SubsystemIncomingMessages<M> = self::stream::Select< +pub type SubsystemIncomingMessages<M> = self::stream::SelectWithStrategy< self::metered::MeteredReceiver<MessagePacket<M>>, self::metered::UnboundedMeteredReceiver<MessagePacket<M>>, + fn(&mut ()) -> self::stream::PollNext, + (), >; /// Watermark to track the received signals. diff --git a/polkadot/node/overseer/src/tests.rs b/polkadot/node/overseer/src/tests.rs index beff15e347c..a3f30446662 100644 --- a/polkadot/node/overseer/src/tests.rs +++ b/polkadot/node/overseer/src/tests.rs @@ -1129,7 +1129,11 @@ fn context_holds_onto_message_until_enough_signals_received() { let mut ctx = OverseerSubsystemContext::new( signal_rx, - stream::select(bounded_rx, unbounded_rx), + stream::select_with_strategy( + bounded_rx, + unbounded_rx, + orchestra::select_message_channel_strategy, + ), channels_out, to_overseer_tx, "test", -- GitLab