Skip to content
Snippets Groups Projects
Commit 6fa4a0e3 authored by Vsevolod Stakhov's avatar Vsevolod Stakhov Committed by GitHub
Browse files

Check unbounded channel first when polling for subsystem mesages (#5566)

* Prefer unbounded channel when selecting rx events

* Fix tests

* Forgotten fmt recursion

* Extract strategy functor to allow easier modifications
parent 13900dfe
Branches
No related merge requests found
......@@ -605,9 +605,10 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream {
};
let unbounded_meter = #channel_name_unbounded_rx.meter().clone();
let message_rx: SubsystemIncomingMessages< #consumes > = #support_crate ::select(
#channel_name_rx, #channel_name_unbounded_rx
// Prefer unbounded channel when selecting
let message_rx: SubsystemIncomingMessages< #consumes > = #support_crate ::select_with_strategy(
#channel_name_rx, #channel_name_unbounded_rx,
#support_crate ::select_message_channel_strategy
);
let (signal_tx, signal_rx) = #support_crate ::metered::channel(
self.signal_capacity.unwrap_or(SIGNAL_CHANNEL_CAPACITY)
......
......@@ -76,7 +76,7 @@ pub use futures::{
channel::{mpsc, oneshot},
future::{BoxFuture, Fuse, Future},
poll, select,
stream::{self, select, FuturesUnordered},
stream::{self, select, select_with_strategy, FuturesUnordered, PollNext},
task::{Context, Poll},
FutureExt, StreamExt,
};
......@@ -203,10 +203,17 @@ pub fn make_packet<T>(signals_received: usize, message: T) -> MessagePacket<T> {
MessagePacket { signals_received, message }
}
/// A functor to specify strategy of the channels selection in the `SubsystemIncomingMessages`
pub fn select_message_channel_strategy(_: &mut ()) -> PollNext {
PollNext::Right
}
/// Incoming messages from both the bounded and unbounded channel.
pub type SubsystemIncomingMessages<M> = self::stream::Select<
pub type SubsystemIncomingMessages<M> = self::stream::SelectWithStrategy<
self::metered::MeteredReceiver<MessagePacket<M>>,
self::metered::UnboundedMeteredReceiver<MessagePacket<M>>,
fn(&mut ()) -> self::stream::PollNext,
(),
>;
/// Watermark to track the received signals.
......
......@@ -1129,7 +1129,11 @@ fn context_holds_onto_message_until_enough_signals_received() {
let mut ctx = OverseerSubsystemContext::new(
signal_rx,
stream::select(bounded_rx, unbounded_rx),
stream::select_with_strategy(
bounded_rx,
unbounded_rx,
orchestra::select_message_channel_strategy,
),
channels_out,
to_overseer_tx,
"test",
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment