Unverified Commit 95df77e0 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Remove the streamunordered crate (#3339)

The functionality is now provided by the `futures` crate.
parent 70a4469d
Pipeline #143567 canceled with stages
in 5 minutes and 31 seconds
...@@ -6385,7 +6385,6 @@ dependencies = [ ...@@ -6385,7 +6385,6 @@ dependencies = [
"sp-application-crypto", "sp-application-crypto",
"sp-core", "sp-core",
"sp-keystore", "sp-keystore",
"streamunordered",
"substrate-prometheus-endpoint", "substrate-prometheus-endpoint",
"thiserror", "thiserror",
"tracing", "tracing",
...@@ -10052,18 +10051,6 @@ dependencies = [ ...@@ -10052,18 +10051,6 @@ dependencies = [
"generic-array 0.14.4", "generic-array 0.14.4",
] ]
[[package]]
name = "streamunordered"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9394ee1338fee8370bee649f8a7170b3a56917903a0956467ad192dcf8699ca"
dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"slab",
]
[[package]] [[package]]
name = "string" name = "string"
version = "0.2.1" version = "0.2.1"
......
...@@ -14,7 +14,6 @@ parity-scale-codec = { version = "2.0.0", default-features = false, features = [ ...@@ -14,7 +14,6 @@ parity-scale-codec = { version = "2.0.0", default-features = false, features = [
parking_lot = { version = "0.11.1", optional = true } parking_lot = { version = "0.11.1", optional = true }
pin-project = "1.0.7" pin-project = "1.0.7"
rand = "0.8.3" rand = "0.8.3"
streamunordered = "0.5.1"
thiserror = "1.0.23" thiserror = "1.0.23"
tracing = "0.1.26" tracing = "0.1.26"
lru = "0.6.5" lru = "0.6.5"
......
...@@ -31,7 +31,7 @@ use polkadot_node_subsystem::{ ...@@ -31,7 +31,7 @@ use polkadot_node_subsystem::{
ActiveLeavesUpdate, OverseerSignal, ActiveLeavesUpdate, OverseerSignal,
}; };
use polkadot_node_jaeger as jaeger; use polkadot_node_jaeger as jaeger;
use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::Stream}; use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::{Stream, SelectAll}};
use futures_timer::Delay; use futures_timer::Delay;
use parity_scale_codec::Encode; use parity_scale_codec::Encode;
use pin_project::pin_project; use pin_project::pin_project;
...@@ -48,7 +48,6 @@ use std::{ ...@@ -48,7 +48,6 @@ use std::{
collections::{HashMap, hash_map::Entry}, convert::TryFrom, marker::Unpin, pin::Pin, task::{Poll, Context}, collections::{HashMap, hash_map::Entry}, convert::TryFrom, marker::Unpin, pin::Pin, task::{Poll, Context},
time::Duration, fmt, sync::Arc, time::Duration, fmt, sync::Arc,
}; };
use streamunordered::{StreamUnordered, StreamYield};
use thiserror::Error; use thiserror::Error;
pub use metered_channel as metered; pub use metered_channel as metered;
...@@ -523,7 +522,7 @@ pub enum JobsError<JobError: std::fmt::Debug + std::error::Error + 'static> { ...@@ -523,7 +522,7 @@ pub enum JobsError<JobError: std::fmt::Debug + std::error::Error + 'static> {
struct Jobs<Spawner, ToJob> { struct Jobs<Spawner, ToJob> {
spawner: Spawner, spawner: Spawner,
running: HashMap<Hash, JobHandle<ToJob>>, running: HashMap<Hash, JobHandle<ToJob>>,
outgoing_msgs: StreamUnordered<mpsc::Receiver<FromJobCommand>>, outgoing_msgs: SelectAll<mpsc::Receiver<FromJobCommand>>,
} }
impl<Spawner: SpawnNamed, ToJob: Send + 'static> Jobs<Spawner, ToJob> { impl<Spawner: SpawnNamed, ToJob: Send + 'static> Jobs<Spawner, ToJob> {
...@@ -532,7 +531,7 @@ impl<Spawner: SpawnNamed, ToJob: Send + 'static> Jobs<Spawner, ToJob> { ...@@ -532,7 +531,7 @@ impl<Spawner: SpawnNamed, ToJob: Send + 'static> Jobs<Spawner, ToJob> {
Self { Self {
spawner, spawner,
running: HashMap::new(), running: HashMap::new(),
outgoing_msgs: StreamUnordered::new(), outgoing_msgs: SelectAll::new(),
} }
} }
...@@ -608,17 +607,10 @@ where ...@@ -608,17 +607,10 @@ where
type Item = FromJobCommand; type Item = FromJobCommand;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop { match futures::ready!(Pin::new(&mut self.outgoing_msgs).poll_next(cx)) {
match Pin::new(&mut self.outgoing_msgs).poll_next(cx) { Some(msg) => Poll::Ready(Some(msg)),
Poll::Pending => return Poll::Pending, // Don't end if there are no jobs running
Poll::Ready(r) => match r.map(|v| v.0) { None => Poll::Pending,
Some(StreamYield::Item(msg)) => return Poll::Ready(Some(msg)),
// If a job is finished, rerun the loop
Some(StreamYield::Finished(_)) => continue,
// Don't end if there are no jobs running
None => return Poll::Pending,
}
}
} }
} }
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment