Unverified Commit ddd26fa9 authored by Fedor Sakharov's avatar Fedor Sakharov
Browse files

Fuse receive stream in Context

parent 05b91fb4
Pipeline #119283 canceled with stages
in 14 minutes and 39 seconds
......@@ -70,7 +70,7 @@ use futures::channel::{mpsc, oneshot};
use futures::{
poll, select,
future::BoxFuture,
stream::{self, FuturesUnordered},
stream::{self, Fuse, FusedStream, FuturesUnordered},
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
......@@ -323,7 +323,7 @@ impl<T> From<T> for MaybeTimed<T> {
/// [`SubsystemJob`]: trait.SubsystemJob.html
#[derive(Debug)]
pub struct OverseerSubsystemContext<M>{
rx: mpsc::Receiver<FromOverseer<M>>,
rx: Fuse<mpsc::Receiver<FromOverseer<M>>>,
tx: mpsc::Sender<MaybeTimed<ToOverseer>>,
metrics: Metrics,
rng: Rand32,
......@@ -354,6 +354,7 @@ impl<M> OverseerSubsystemContext<M> {
}
let threshold = (capture_rate * u32::MAX as f64) as u32;
let rx = rx.fuse();
OverseerSubsystemContext { rx, tx, metrics, rng, threshold }
}
......@@ -415,11 +416,18 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
}
async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
self.rx.next().await
.ok_or(SubsystemError::Context(
"No more messages in rx queue to process"
if !self.rx.is_terminated() {
self.rx.next().await
.ok_or(SubsystemError::Context(
"No more messages in rx queue to process"
.to_owned()
))
} else {
Err(SubsystemError::Context(
"Node is shutting down"
.to_owned()
))
}
}
async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment