Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
parity
Mirrored projects
polkadot
Commits
e3676fdc
Unverified
Commit
e3676fdc
authored
Jan 13, 2021
by
Bernhard Schuster
Committed by
GitHub
Jan 13, 2021
Browse files
metered mpsc channels (#2235)
parent
4489b528
Pipeline
#120099
passed with stages
in 33 minutes and 21 seconds
Changes
11
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Cargo.lock
View file @
e3676fdc
...
...
@@ -3475,6 +3475,15 @@ dependencies = [
"zeroize",
]
[[package]]
name = "metered-channel"
version = "0.1.0"
dependencies = [
"assert_matches",
"futures 0.3.8",
"futures-timer 3.0.2",
]
[[package]]
name = "mick-jaeger"
version = "0.1.4"
...
...
@@ -5013,6 +5022,7 @@ dependencies = [
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sc-authority-discovery",
"sc-network",
...
...
@@ -5315,6 +5325,7 @@ dependencies = [
"futures 0.3.8",
"futures-timer 3.0.2",
"log",
"metered-channel",
"parity-scale-codec",
"parking_lot 0.11.1",
"pin-project 1.0.4",
...
...
Cargo.toml
View file @
e3676fdc
...
...
@@ -66,6 +66,7 @@ members = [
"node/subsystem-test-helpers"
,
"node/subsystem-util"
,
"node/jaeger"
,
"node/metered-channel"
,
"node/test/client"
,
"node/test/service"
,
"parachain/test-parachains"
,
...
...
node/metered-channel/Cargo.toml
0 → 100644
View file @
e3676fdc
[package]
name
=
"metered-channel"
version
=
"0.1.0"
authors
=
[
"Parity Technologies <admin@parity.io>"
]
edition
=
"2018"
description
=
"Channels with attached Meters"
[dependencies]
futures
=
"0.3.8"
futures-timer
=
"3.0.2"
[dev-dependencies]
assert_matches
=
"1.4.0"
futures
=
{
version
=
"0.3.8"
,
features
=
["thread-pool"]
}
node/metered-channel/src/bounded.rs
0 → 100644
View file @
e3676fdc
// Copyright 2017-2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Metered variant of bounded mpsc channels to be able to extract metrics.
use
super
::
*
;
/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
pub
fn
channel
<
T
>
(
capacity
:
usize
,
name
:
&
'static
str
)
->
(
MeteredSender
<
T
>
,
MeteredReceiver
<
T
>
)
{
let
(
tx
,
rx
)
=
mpsc
::
channel
(
capacity
);
let
mut
shared_meter
=
Meter
::
default
();
shared_meter
.name
=
name
;
let
tx
=
MeteredSender
{
meter
:
shared_meter
.clone
(),
inner
:
tx
};
let
rx
=
MeteredReceiver
{
meter
:
shared_meter
,
inner
:
rx
};
(
tx
,
rx
)
}
/// A receiver tracking the messages consumed by itself.
#[derive(Debug)]
pub
struct
MeteredReceiver
<
T
>
{
// count currently contained messages
meter
:
Meter
,
inner
:
mpsc
::
Receiver
<
T
>
,
}
impl
<
T
>
std
::
ops
::
Deref
for
MeteredReceiver
<
T
>
{
type
Target
=
mpsc
::
Receiver
<
T
>
;
fn
deref
(
&
self
)
->
&
Self
::
Target
{
&
self
.inner
}
}
impl
<
T
>
std
::
ops
::
DerefMut
for
MeteredReceiver
<
T
>
{
fn
deref_mut
(
&
mut
self
)
->
&
mut
Self
::
Target
{
&
mut
self
.inner
}
}
impl
<
T
>
Stream
for
MeteredReceiver
<
T
>
{
type
Item
=
T
;
fn
poll_next
(
mut
self
:
Pin
<&
mut
Self
>
,
cx
:
&
mut
Context
<
'_
>
)
->
Poll
<
Option
<
Self
::
Item
>>
{
match
mpsc
::
Receiver
::
poll_next
(
Pin
::
new
(
&
mut
self
.inner
),
cx
)
{
Poll
::
Ready
(
x
)
=>
{
// always use Ordering::SeqCst to avoid underflows
self
.meter.fill
.fetch_sub
(
1
,
Ordering
::
SeqCst
);
Poll
::
Ready
(
x
)
}
other
=>
other
,
}
}
/// Don't rely on the unreliable size hint.
fn
size_hint
(
&
self
)
->
(
usize
,
Option
<
usize
>
)
{
self
.inner
.size_hint
()
}
}
impl
<
T
>
MeteredReceiver
<
T
>
{
/// Get an updated accessor object for all metrics collected.
pub
fn
meter
(
&
self
)
->
&
Meter
{
&
self
.meter
}
/// Attempt to receive the next item.
pub
fn
try_next
(
&
mut
self
)
->
Result
<
Option
<
T
>
,
mpsc
::
TryRecvError
>
{
match
self
.inner
.try_next
()
?
{
Some
(
x
)
=>
{
self
.meter.fill
.fetch_sub
(
1
,
Ordering
::
SeqCst
);
Ok
(
Some
(
x
))
}
None
=>
Ok
(
None
),
}
}
}
impl
<
T
>
futures
::
stream
::
FusedStream
for
MeteredReceiver
<
T
>
{
fn
is_terminated
(
&
self
)
->
bool
{
self
.inner
.is_terminated
()
}
}
/// The sender component, tracking the number of items
/// sent across it.
#[derive(Debug)]
pub
struct
MeteredSender
<
T
>
{
meter
:
Meter
,
inner
:
mpsc
::
Sender
<
T
>
,
}
impl
<
T
>
Clone
for
MeteredSender
<
T
>
{
fn
clone
(
&
self
)
->
Self
{
Self
{
meter
:
self
.meter
.clone
(),
inner
:
self
.inner
.clone
()
}
}
}
impl
<
T
>
std
::
ops
::
Deref
for
MeteredSender
<
T
>
{
type
Target
=
mpsc
::
Sender
<
T
>
;
fn
deref
(
&
self
)
->
&
Self
::
Target
{
&
self
.inner
}
}
impl
<
T
>
std
::
ops
::
DerefMut
for
MeteredSender
<
T
>
{
fn
deref_mut
(
&
mut
self
)
->
&
mut
Self
::
Target
{
&
mut
self
.inner
}
}
impl
<
T
>
MeteredSender
<
T
>
{
/// Get an updated accessor object for all metrics collected.
pub
fn
meter
(
&
self
)
->
&
Meter
{
&
self
.meter
}
/// Send message, wait until capacity is available.
pub
async
fn
send
(
&
mut
self
,
item
:
T
)
->
result
::
Result
<
(),
mpsc
::
SendError
>
where
Self
:
Unpin
,
{
self
.meter.fill
.fetch_add
(
1
,
Ordering
::
SeqCst
);
let
fut
=
self
.inner
.send
(
item
);
futures
::
pin_mut!
(
fut
);
fut
.await
}
/// Attempt to send message or fail immediately.
pub
fn
try_send
(
&
mut
self
,
msg
:
T
)
->
result
::
Result
<
(),
mpsc
::
TrySendError
<
T
>>
{
self
.inner
.try_send
(
msg
)
?
;
self
.meter.fill
.fetch_add
(
1
,
Ordering
::
SeqCst
);
Ok
(())
}
}
impl
<
T
>
futures
::
sink
::
Sink
<
T
>
for
MeteredSender
<
T
>
{
type
Error
=
mpsc
::
SendError
;
fn
start_send
(
mut
self
:
Pin
<&
mut
Self
>
,
item
:
T
)
->
Result
<
(),
Self
::
Error
>
{
Pin
::
new
(
&
mut
self
.inner
)
.start_send
(
item
)
}
fn
poll_ready
(
mut
self
:
Pin
<&
mut
Self
>
,
cx
:
&
mut
Context
<
'_
>
)
->
Poll
<
Result
<
(),
Self
::
Error
>>
{
Pin
::
new
(
&
mut
self
.inner
)
.poll_ready
(
cx
)
}
fn
poll_close
(
mut
self
:
Pin
<&
mut
Self
>
,
cx
:
&
mut
Context
<
'_
>
)
->
Poll
<
Result
<
(),
Self
::
Error
>>
{
match
Pin
::
new
(
&
mut
self
.inner
)
.poll_close
(
cx
)
{
val
@
Poll
::
Ready
(
_
)
=>
{
self
.meter.fill
.store
(
0
,
Ordering
::
SeqCst
);
val
}
other
=>
other
,
}
}
fn
poll_flush
(
mut
self
:
Pin
<&
mut
Self
>
,
cx
:
&
mut
Context
<
'_
>
)
->
Poll
<
Result
<
(),
Self
::
Error
>>
{
match
Pin
::
new
(
&
mut
self
.inner
)
.poll_flush
(
cx
)
{
val
@
Poll
::
Ready
(
_
)
=>
{
self
.meter.fill
.fetch_add
(
1
,
Ordering
::
SeqCst
);
val
}
other
=>
other
,
}
}
}
node/metered-channel/src/lib.rs
0 → 100644
View file @
e3676fdc
// Copyright 2017-2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Metered variant of mpsc channels to be able to extract metrics.
use
std
::
sync
::
atomic
::{
AtomicUsize
,
Ordering
};
use
futures
::{
channel
::
mpsc
,
task
::
Poll
,
task
::
Context
,
sink
::
SinkExt
,
stream
::
Stream
};
use
std
::
result
;
use
std
::
sync
::
Arc
;
use
std
::
pin
::
Pin
;
mod
bounded
;
mod
unbounded
;
pub
use
self
::
bounded
::
*
;
pub
use
self
::
unbounded
::
*
;
/// A peek into the inner state of a meter.
#[derive(Debug,
Clone,
Default)]
pub
struct
Meter
{
/// Name of the receiver and sender pair.
name
:
&
'static
str
,
// fill state of the channel
fill
:
Arc
<
AtomicUsize
>
,
}
impl
Meter
{
/// Count the number of items queued up inside the channel.
pub
fn
queue_count
(
&
self
)
->
usize
{
// when obtaining we don't care much about off by one
// accuracy
self
.fill
.load
(
Ordering
::
Relaxed
)
}
/// Obtain the name of the channel `Sender` and `Receiver` pair.
pub
fn
name
(
&
self
)
->
&
'static
str
{
self
.name
}
}
#[cfg(test)]
mod
tests
{
use
super
::
*
;
use
futures
::
executor
::
block_on
;
use
futures
::
StreamExt
;
#[derive(Clone,
Copy,
Debug,
Default)]
struct
Msg
{
val
:
u8
,
}
#[test]
fn
try_send_try_next
()
{
block_on
(
async
move
{
let
(
mut
tx
,
mut
rx
)
=
channel
::
<
Msg
>
(
5
,
"goofy"
);
let
msg
=
Msg
::
default
();
assert_eq!
(
rx
.meter
()
.queue_count
(),
0
);
tx
.try_send
(
msg
)
.unwrap
();
assert_eq!
(
tx
.meter
()
.queue_count
(),
1
);
tx
.try_send
(
msg
)
.unwrap
();
tx
.try_send
(
msg
)
.unwrap
();
tx
.try_send
(
msg
)
.unwrap
();
assert_eq!
(
tx
.meter
()
.queue_count
(),
4
);
rx
.try_next
()
.unwrap
();
assert_eq!
(
rx
.meter
()
.queue_count
(),
3
);
rx
.try_next
()
.unwrap
();
rx
.try_next
()
.unwrap
();
assert_eq!
(
tx
.meter
()
.queue_count
(),
1
);
rx
.try_next
()
.unwrap
();
assert_eq!
(
rx
.meter
()
.queue_count
(),
0
);
assert!
(
rx
.try_next
()
.is_err
());
});
}
#[test]
fn
with_tasks
()
{
let
(
ready
,
go
)
=
futures
::
channel
::
oneshot
::
channel
();
let
(
mut
tx
,
mut
rx
)
=
channel
::
<
Msg
>
(
5
,
"goofy"
);
block_on
(
async
move
{
futures
::
join!
(
async
move
{
let
msg
=
Msg
::
default
();
assert_eq!
(
tx
.meter
()
.queue_count
(),
0
);
tx
.try_send
(
msg
)
.unwrap
();
assert_eq!
(
tx
.meter
()
.queue_count
(),
1
);
tx
.try_send
(
msg
)
.unwrap
();
tx
.try_send
(
msg
)
.unwrap
();
tx
.try_send
(
msg
)
.unwrap
();
ready
.send
(())
.expect
(
"Helper oneshot channel must work. qed"
);
},
async
move
{
go
.await
.expect
(
"Helper oneshot channel must work. qed"
);
assert_eq!
(
rx
.meter
()
.queue_count
(),
4
);
rx
.try_next
()
.unwrap
();
assert_eq!
(
rx
.meter
()
.queue_count
(),
3
);
rx
.try_next
()
.unwrap
();
rx
.try_next
()
.unwrap
();
assert_eq!
(
rx
.meter
()
.queue_count
(),
1
);
rx
.try_next
()
.unwrap
();
assert_eq!
(
dbg!
(
rx
.meter
()
.queue_count
()),
0
);
}
)
});
}
use
std
::
time
::
Duration
;
use
futures_timer
::
Delay
;
#[test]
fn
stream_and_sink
()
{
let
(
mut
tx
,
mut
rx
)
=
channel
::
<
Msg
>
(
5
,
"goofy"
);
block_on
(
async
move
{
futures
::
join!
(
async
move
{
for
i
in
0
..
15
{
println!
(
"Sent #{} with a backlog of {} items"
,
i
+
1
,
tx
.meter
()
.queue_count
());
let
msg
=
Msg
{
val
:
i
as
u8
+
1u8
};
tx
.send
(
msg
)
.await
.unwrap
();
assert!
(
tx
.meter
()
.queue_count
()
>
0usize
);
Delay
::
new
(
Duration
::
from_millis
(
20
))
.await
;
}
()
},
async
move
{
while
let
Some
(
msg
)
=
rx
.next
()
.await
{
println!
(
"rx'd one {} with {} backlogged"
,
msg
.val
,
rx
.meter
()
.queue_count
());
Delay
::
new
(
Duration
::
from_millis
(
29
))
.await
;
}
}
)
});
}
}
node/metered-channel/src/unbounded.rs
0 → 100644
View file @
e3676fdc
// Copyright 2017-2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Metered variant of unbounded mpsc channels to be able to extract metrics.
use
super
::
*
;
/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
pub
fn
unbounded
<
T
>
(
name
:
&
'static
str
)
->
(
UnboundedMeteredSender
<
T
>
,
UnboundedMeteredReceiver
<
T
>
)
{
let
(
tx
,
rx
)
=
mpsc
::
unbounded
();
let
mut
shared_meter
=
Meter
::
default
();
shared_meter
.name
=
name
;
let
tx
=
UnboundedMeteredSender
{
meter
:
shared_meter
.clone
(),
inner
:
tx
};
let
rx
=
UnboundedMeteredReceiver
{
meter
:
shared_meter
,
inner
:
rx
};
(
tx
,
rx
)
}
/// A receiver tracking the messages consumed by itself.
#[derive(Debug)]
pub
struct
UnboundedMeteredReceiver
<
T
>
{
// count currently contained messages
meter
:
Meter
,
inner
:
mpsc
::
UnboundedReceiver
<
T
>
,
}
impl
<
T
>
std
::
ops
::
Deref
for
UnboundedMeteredReceiver
<
T
>
{
type
Target
=
mpsc
::
UnboundedReceiver
<
T
>
;
fn
deref
(
&
self
)
->
&
Self
::
Target
{
&
self
.inner
}
}
impl
<
T
>
std
::
ops
::
DerefMut
for
UnboundedMeteredReceiver
<
T
>
{
fn
deref_mut
(
&
mut
self
)
->
&
mut
Self
::
Target
{
&
mut
self
.inner
}
}
impl
<
T
>
Stream
for
UnboundedMeteredReceiver
<
T
>
{
type
Item
=
T
;
fn
poll_next
(
mut
self
:
Pin
<&
mut
Self
>
,
cx
:
&
mut
Context
<
'_
>
)
->
Poll
<
Option
<
Self
::
Item
>>
{
match
mpsc
::
UnboundedReceiver
::
poll_next
(
Pin
::
new
(
&
mut
self
.inner
),
cx
)
{
Poll
::
Ready
(
x
)
=>
{
// always use Ordering::SeqCst to avoid underflows
self
.meter.fill
.fetch_sub
(
1
,
Ordering
::
SeqCst
);
Poll
::
Ready
(
x
)
}
other
=>
other
,
}
}
/// Don't rely on the unreliable size hint.
fn
size_hint
(
&
self
)
->
(
usize
,
Option
<
usize
>
)
{
self
.inner
.size_hint
()
}
}
impl
<
T
>
UnboundedMeteredReceiver
<
T
>
{
/// Get an updated accessor object for all metrics collected.
pub
fn
meter
(
&
self
)
->
&
Meter
{
&
self
.meter
}
/// Attempt to receive the next item.
pub
fn
try_next
(
&
mut
self
)
->
Result
<
Option
<
T
>
,
mpsc
::
TryRecvError
>
{
match
self
.inner
.try_next
()
?
{
Some
(
x
)
=>
{
self
.meter.fill
.fetch_sub
(
1
,
Ordering
::
SeqCst
);
Ok
(
Some
(
x
))
}
None
=>
Ok
(
None
),
}
}
}
impl
<
T
>
futures
::
stream
::
FusedStream
for
UnboundedMeteredReceiver
<
T
>
{
fn
is_terminated
(
&
self
)
->
bool
{
self
.inner
.is_terminated
()
}
}
/// The sender component, tracking the number of items
/// sent across it.
#[derive(Debug)]
pub
struct
UnboundedMeteredSender
<
T
>
{
meter
:
Meter
,
inner
:
mpsc
::
UnboundedSender
<
T
>
,
}
impl
<
T
>
Clone
for
UnboundedMeteredSender
<
T
>
{
fn
clone
(
&
self
)
->
Self
{
Self
{
meter
:
self
.meter
.clone
(),
inner
:
self
.inner
.clone
()
}
}
}
impl
<
T
>
std
::
ops
::
Deref
for
UnboundedMeteredSender
<
T
>
{
type
Target
=
mpsc
::
UnboundedSender
<
T
>
;
fn
deref
(
&
self
)
->
&
Self
::
Target
{
&
self
.inner
}
}
impl
<
T
>
std
::
ops
::
DerefMut
for
UnboundedMeteredSender
<
T
>
{
fn
deref_mut
(
&
mut
self
)
->
&
mut
Self
::
Target
{
&
mut
self
.inner
}
}
impl
<
T
>
UnboundedMeteredSender
<
T
>
{
/// Get an updated accessor object for all metrics collected.
pub
fn
meter
(
&
self
)
->
&
Meter
{
&
self
.meter
}
/// Send message, wait until capacity is available.
pub
async
fn
send
(
&
mut
self
,
item
:
T
)
->
result
::
Result
<
(),
mpsc
::
SendError
>
where
Self
:
Unpin
,
{
self
.meter.fill
.fetch_add
(
1
,
Ordering
::
SeqCst
);
let
fut
=
self
.inner
.send
(
item
);
futures
::
pin_mut!
(
fut
);
fut
.await
}
/// Attempt to send message or fail immediately.
pub
fn
unbounded_send
(
&
mut
self
,
msg
:
T
)
->
result
::
Result
<
(),
mpsc
::
TrySendError
<
T
>>
{
self
.inner
.unbounded_send
(
msg
)
.expect
(
"Unbounded send never fails. qed"
);
self
.meter.fill
.fetch_add
(
1
,
Ordering
::
SeqCst
);
Ok
(())
}
}
impl
<
T
>
futures
::
sink
::
Sink
<
T
>
for
UnboundedMeteredSender
<
T
>
{
type
Error
=
<
futures
::
channel
::
mpsc
::
UnboundedSender
<
T
>
as
futures
::
sink
::
Sink
<
T
>>
::
Error
;
fn
start_send
(
mut
self
:
Pin
<&
mut
Self
>
,
item
:
T
)
->
Result
<
(),
Self
::
Error
>
{
Pin
::
new
(
&
mut
self
.inner
)
.start_send
(
item
)
}
fn
poll_ready
(
mut
self
:
Pin
<&
mut
Self
>
,
cx
:
&
mut
Context
<
'_
>
)
->
Poll
<
Result
<
(),
Self
::
Error
>>
{
Pin
::
new
(
&
mut
self
.inner
)
.poll_ready
(
cx
)
}
fn
poll_close
(
mut
self
:
Pin
<&
mut
Self
>
,
cx
:
&
mut
Context
<
'_
>
)
->
Poll
<
Result
<
(),
Self
::
Error
>>
{
match
Pin
::
new
(
&
mut
self
.inner
)
.poll_ready
(
cx
)
{
val
@
Poll
::
Ready
(
_
)
=>
{
self
.meter.fill
.store
(
0
,
Ordering
::
SeqCst
);
val
}
other
=>
other
,
}
}
fn
poll_flush
(
mut
self
:
Pin
<&
mut
Self
>
,
cx
:
&
mut
Context
<
'_
>
)
->
Poll
<
Result
<
(),
Self
::
Error
>>
{
match
Pin
::
new
(
&
mut
self
.inner
)
.poll_ready
(
cx
)
{
val
@
Poll
::
Ready
(
_
)
=>
{
self
.meter.fill
.fetch_add
(
1
,
Ordering
::
SeqCst
);
val
}
other
=>
other
,
}
}
}
node/network/bridge/Cargo.toml
View file @
e3676fdc
...
...
@@ -20,5 +20,6 @@ polkadot-node-network-protocol = { path = "../protocol" }
assert_matches
=
"1.4.0"
parking_lot
=
"0.11.1"
polkadot-node-subsystem-test-helpers
=
{
path
=
"../../subsystem-test-helpers"
}
polkadot-node-subsystem-util
=
{
path
=
"../../subsystem-util"
}
sp-core
=
{
git
=
"https://github.com/paritytech/substrate"
,
branch
=
"master"
}
sp-keyring
=
{
git
=
"https://github.com/paritytech/substrate"
,
branch
=
"master"
}
node/network/bridge/src/lib.rs
View file @
e3676fdc
...
...
@@ -791,7 +791,6 @@ where
#[cfg(test)]
mod
tests
{
use
super
::
*
;
use
futures
::
channel
::
mpsc
;
use
futures
::
executor
;
use
std
::
borrow
::
Cow
;
...
...
@@ -805,6 +804,7 @@ mod tests {
use
polkadot_node_subsystem_test_helpers
::{
SingleItemSink
,
SingleItemStream
,
TestSubsystemContextHandle
,
};
use
polkadot_node_subsystem_util
::
metered
;
use
polkadot_node_network_protocol
::
view
;
use
sc_network
::
Multiaddr
;
use
sp_keyring
::
Sr25519Keyring
;
...
...
@@ -812,7 +812,7 @@ mod tests {
// The subsystem's view of the network - only supports a single call to `event_stream`.
struct
TestNetwork
{
net_events
:
Arc
<
Mutex
<
Option
<
SingleItemStream
<
NetworkEvent
>>>>
,
action_tx
:
m
psc
::
UnboundedSender
<
NetworkAction
>
,
action_tx
:
m
etered
::
Unbounded
Metered
Sender
<
NetworkAction
>
,
}
struct
TestAuthorityDiscovery
;
...
...
@@ -820,7 +820,7 @@ mod tests {
// The test's view of the network. This receives updates from the subsystem in the form
// of `NetworkAction`s.
struct
TestNetworkHandle
{
action_rx
:
m
psc
::
UnboundedReceiver
<
NetworkAction
>
,
action_rx
:
m
etered
::
Unbounded
Metered
Receiver
<
NetworkAction
>
,
<