Unverified commit 322ccd0d, authored by asynchronous rob, committed by GitHub
Browse files

Overseer: subsystems communicate directly (#2227)



* overseer: pass messages directly between subsystems

* test that message is held on to

* Update node/overseer/src/lib.rs

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* give every subsystem an unbounded sender too

* remove metered_channel::name

1. we don't provide good names
2. these names are never used anywhere

* unused mut

* remove unnecessary &mut

* subsystem unbounded_send

* remove unused MaybeTimer

We have channel size metrics that serve the same purpose better now and the implementation of message timing was pretty ugly.

* remove comment

* split up senders and receivers

* update metrics

* fix tests

* fix test subsystem context

* fix flaky test

* fix docs

* doc

* use select_biased to favor signals

* Update node/subsystem/src/lib.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
Co-authored-by: Andronik Ordian <write@reusable.software>
parent 39124b0a
Pipeline #131095 canceled with stages
in 6 minutes and 38 seconds
...@@ -4236,12 +4236,6 @@ dependencies = [ ...@@ -4236,12 +4236,6 @@ dependencies = [
"parking_lot 0.11.1", "parking_lot 0.11.1",
] ]
[[package]]
name = "oorandom"
version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]] [[package]]
name = "opaque-debug" name = "opaque-debug"
version = "0.2.3" version = "0.2.3"
...@@ -5886,12 +5880,12 @@ dependencies = [ ...@@ -5886,12 +5880,12 @@ dependencies = [
name = "polkadot-overseer" name = "polkadot-overseer"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"assert_matches",
"async-trait", "async-trait",
"femme", "femme",
"futures 0.3.13", "futures 0.3.13",
"futures-timer 3.0.2", "futures-timer 3.0.2",
"kv-log-macro", "kv-log-macro",
"oorandom",
"polkadot-node-network-protocol", "polkadot-node-network-protocol",
"polkadot-node-primitives", "polkadot-node-primitives",
"polkadot-node-subsystem", "polkadot-node-subsystem",
......
...@@ -25,10 +25,9 @@ use super::Meter; ...@@ -25,10 +25,9 @@ use super::Meter;
/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`. /// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
pub fn channel<T>(capacity: usize, name: &'static str) -> (MeteredSender<T>, MeteredReceiver<T>) { pub fn channel<T>(capacity: usize) -> (MeteredSender<T>, MeteredReceiver<T>) {
let (tx, rx) = mpsc::channel(capacity); let (tx, rx) = mpsc::channel(capacity);
let mut shared_meter = Meter::default(); let shared_meter = Meter::default();
shared_meter.name = name;
let tx = MeteredSender { meter: shared_meter.clone(), inner: tx }; let tx = MeteredSender { meter: shared_meter.clone(), inner: tx };
let rx = MeteredReceiver { meter: shared_meter, inner: rx }; let rx = MeteredReceiver { meter: shared_meter, inner: rx };
(tx, rx) (tx, rx)
......
...@@ -30,8 +30,6 @@ pub use self::unbounded::*; ...@@ -30,8 +30,6 @@ pub use self::unbounded::*;
/// A peek into the inner state of a meter. /// A peek into the inner state of a meter.
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
pub struct Meter { pub struct Meter {
/// Name of the receiver and sender pair.
name: &'static str,
// Number of sends on this channel. // Number of sends on this channel.
sent: Arc<AtomicUsize>, sent: Arc<AtomicUsize>,
// Number of receives on this channel. // Number of receives on this channel.
...@@ -60,11 +58,6 @@ impl Meter { ...@@ -60,11 +58,6 @@ impl Meter {
} }
} }
/// Obtain the name of the channel `Sender` and `Receiver` pair.
pub fn name(&self) -> &'static str {
self.name
}
fn note_sent(&self) { fn note_sent(&self) {
self.sent.fetch_add(1, Ordering::Relaxed); self.sent.fetch_add(1, Ordering::Relaxed);
} }
...@@ -92,7 +85,7 @@ mod tests { ...@@ -92,7 +85,7 @@ mod tests {
#[test] #[test]
fn try_send_try_next() { fn try_send_try_next() {
block_on(async move { block_on(async move {
let (mut tx, mut rx) = channel::<Msg>(5, "goofy"); let (mut tx, mut rx) = channel::<Msg>(5);
let msg = Msg::default(); let msg = Msg::default();
assert_eq!(rx.meter().read(), Readout { sent: 0, received: 0 }); assert_eq!(rx.meter().read(), Readout { sent: 0, received: 0 });
tx.try_send(msg).unwrap(); tx.try_send(msg).unwrap();
...@@ -116,7 +109,7 @@ mod tests { ...@@ -116,7 +109,7 @@ mod tests {
fn with_tasks() { fn with_tasks() {
let (ready, go) = futures::channel::oneshot::channel(); let (ready, go) = futures::channel::oneshot::channel();
let (mut tx, mut rx) = channel::<Msg>(5, "goofy"); let (mut tx, mut rx) = channel::<Msg>(5);
block_on(async move { block_on(async move {
futures::join!( futures::join!(
async move { async move {
...@@ -149,7 +142,7 @@ mod tests { ...@@ -149,7 +142,7 @@ mod tests {
#[test] #[test]
fn stream_and_sink() { fn stream_and_sink() {
let (mut tx, mut rx) = channel::<Msg>(5, "goofy"); let (mut tx, mut rx) = channel::<Msg>(5);
block_on(async move { block_on(async move {
futures::join!( futures::join!(
...@@ -175,8 +168,8 @@ mod tests { ...@@ -175,8 +168,8 @@ mod tests {
#[test] #[test]
fn failed_send_does_not_inc_sent() { fn failed_send_does_not_inc_sent() {
let (mut bounded, _) = channel::<Msg>(5, "pluto"); let (mut bounded, _) = channel::<Msg>(5);
let (mut unbounded, _) = unbounded::<Msg>("pluto"); let (mut unbounded, _) = unbounded::<Msg>();
block_on(async move { block_on(async move {
assert!(bounded.send(Msg::default()).await.is_err()); assert!(bounded.send(Msg::default()).await.is_err());
......
...@@ -25,10 +25,9 @@ use super::Meter; ...@@ -25,10 +25,9 @@ use super::Meter;
/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`. /// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
pub fn unbounded<T>(name: &'static str) -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) { pub fn unbounded<T>() -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
let mut shared_meter = Meter::default(); let shared_meter = Meter::default();
shared_meter.name = name;
let tx = UnboundedMeteredSender { meter: shared_meter.clone(), inner: tx }; let tx = UnboundedMeteredSender { meter: shared_meter.clone(), inner: tx };
let rx = UnboundedMeteredReceiver { meter: shared_meter, inner: rx }; let rx = UnboundedMeteredReceiver { meter: shared_meter, inner: rx };
(tx, rx) (tx, rx)
...@@ -147,7 +146,7 @@ impl<T> UnboundedMeteredSender<T> { ...@@ -147,7 +146,7 @@ impl<T> UnboundedMeteredSender<T> {
/// Attempt to send message or fail immediately. /// Attempt to send message or fail immediately.
pub fn unbounded_send(&mut self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> { pub fn unbounded_send(&self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> {
self.meter.note_sent(); self.meter.note_sent();
self.inner.unbounded_send(msg).map_err(|e| { self.inner.unbounded_send(msg).map_err(|e| {
self.meter.retract_sent(); self.meter.retract_sent();
......
...@@ -734,7 +734,7 @@ mod tests { ...@@ -734,7 +734,7 @@ mod tests {
TestAuthorityDiscovery, TestAuthorityDiscovery,
) { ) {
let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink(); let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink();
let (action_tx, action_rx) = metered::unbounded("test_action"); let (action_tx, action_rx) = metered::unbounded();
( (
TestNetwork { TestNetwork {
......
...@@ -9,7 +9,6 @@ async-trait = "0.1.42" ...@@ -9,7 +9,6 @@ async-trait = "0.1.42"
client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" }
futures = "0.3.12" futures = "0.3.12"
futures-timer = "3.0.2" futures-timer = "3.0.2"
oorandom = "11.1.3"
polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" } polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" }
polkadot-node-subsystem-util = { path = "../subsystem-util" } polkadot-node-subsystem-util = { path = "../subsystem-util" }
polkadot-primitives = { path = "../../primitives" } polkadot-primitives = { path = "../../primitives" }
...@@ -20,6 +19,6 @@ tracing = "0.1.25" ...@@ -20,6 +19,6 @@ tracing = "0.1.25"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-network-protocol = { path = "../network/protocol" } polkadot-node-network-protocol = { path = "../network/protocol" }
futures = { version = "0.3.12", features = ["thread-pool"] } futures = { version = "0.3.12", features = ["thread-pool"] }
futures-timer = "3.0.2"
femme = "2.1.1" femme = "2.1.1"
kv-log-macro = "1.0.7" kv-log-macro = "1.0.7"
assert_matches = "1.4.0"
This diff is collapsed.
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
use polkadot_node_subsystem::messages::AllMessages; use polkadot_node_subsystem::messages::AllMessages;
use polkadot_node_subsystem::{ use polkadot_node_subsystem::{
FromOverseer, SubsystemContext, SubsystemError, SubsystemResult, Subsystem, FromOverseer, SubsystemContext, SubsystemError, SubsystemResult, Subsystem,
SpawnedSubsystem, OverseerSignal, SpawnedSubsystem, OverseerSignal, SubsystemSender,
}; };
use polkadot_node_subsystem_util::TimeoutExt; use polkadot_node_subsystem_util::TimeoutExt;
...@@ -156,9 +156,41 @@ pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) { ...@@ -156,9 +156,41 @@ pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
(SingleItemSink(inner.clone()), SingleItemStream(inner)) (SingleItemSink(inner.clone()), SingleItemStream(inner))
} }
/// A test subsystem sender.
///
/// Thin wrapper around an unbounded `mpsc` sender of [`AllMessages`]; it is the
/// type stored in `TestSubsystemContext::tx` and exposed as that context's
/// `Sender` associated type.
#[derive(Clone)]
pub struct TestSubsystemSender {
/// Sending half of the unbounded channel that carries outgoing `AllMessages`.
tx: mpsc::UnboundedSender<AllMessages>,
}
#[async_trait::async_trait]
impl SubsystemSender for TestSubsystemSender {
    /// Forward a single message over the wrapped channel.
    ///
    /// Panics with `test overseer no longer live` if the send fails.
    async fn send_message(&mut self, msg: AllMessages) {
        let outcome = self.tx.send(msg).await;
        outcome.expect("test overseer no longer live");
    }

    /// Forward every message yielded by `msgs` over the wrapped channel.
    ///
    /// The items are wrapped in `Ok` and turned into a stream so they can be
    /// handed to `send_all` in one call. Panics with
    /// `test overseer no longer live` if the send fails.
    async fn send_messages<T>(&mut self, msgs: T)
    where
        T: IntoIterator<Item = AllMessages> + Send,
        T::IntoIter: Send,
    {
        let wrapped = msgs.into_iter().map(Ok);
        let mut pending = stream::iter(wrapped);
        let outcome = self.tx.send_all(&mut pending).await;
        outcome.expect("test overseer no longer live");
    }

    /// Push a message onto the wrapped channel without awaiting.
    ///
    /// Panics with `test overseer no longer live` if the send fails.
    fn send_unbounded_message(&mut self, msg: AllMessages) {
        let outcome = self.tx.unbounded_send(msg);
        outcome.expect("test overseer no longer live");
    }
}
/// A test subsystem context. /// A test subsystem context.
pub struct TestSubsystemContext<M, S> { pub struct TestSubsystemContext<M, S> {
tx: mpsc::UnboundedSender<AllMessages>, tx: TestSubsystemSender,
rx: SingleItemStream<FromOverseer<M>>, rx: SingleItemStream<FromOverseer<M>>,
spawn: S, spawn: S,
} }
...@@ -168,6 +200,7 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext ...@@ -168,6 +200,7 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
for TestSubsystemContext<M, S> for TestSubsystemContext<M, S>
{ {
type Message = M; type Message = M;
type Sender = TestSubsystemSender;
async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> { async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
match poll!(self.rx.next()) { match poll!(self.rx.next()) {
...@@ -198,23 +231,8 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext ...@@ -198,23 +231,8 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
Ok(()) Ok(())
} }
async fn send_message(&mut self, msg: AllMessages) { fn sender(&mut self) -> &mut TestSubsystemSender {
self.tx &mut self.tx
.send(msg)
.await
.expect("test overseer no longer live");
}
async fn send_messages<T>(&mut self, msgs: T)
where
T: IntoIterator<Item = AllMessages> + Send,
T::IntoIter: Send,
{
let mut iter = stream::iter(msgs.into_iter().map(Ok));
self.tx
.send_all(&mut iter)
.await
.expect("test overseer no longer live");
} }
} }
...@@ -260,7 +278,7 @@ pub fn make_subsystem_context<M, S>( ...@@ -260,7 +278,7 @@ pub fn make_subsystem_context<M, S>(
( (
TestSubsystemContext { TestSubsystemContext {
tx: all_messages_tx, tx: TestSubsystemSender { tx: all_messages_tx },
rx: overseer_rx, rx: overseer_rx,
spawn, spawn,
}, },
......
...@@ -210,6 +210,27 @@ pub struct SpawnedSubsystem { ...@@ -210,6 +210,27 @@ pub struct SpawnedSubsystem {
/// [`SubsystemError`]: struct.SubsystemError.html /// [`SubsystemError`]: struct.SubsystemError.html
pub type SubsystemResult<T> = Result<T, SubsystemError>; pub type SubsystemResult<T> = Result<T, SubsystemError>;
/// A sender used by subsystems to communicate with other subsystems.
///
/// Each clone of this type may add more capacity to the bounded buffer, so clones should
/// be used sparingly.
///
/// Implementors must be `Send + Clone + 'static`. None of the methods returns a value,
/// so a failed send cannot be reported to the caller through this interface; how such
/// a failure is handled is left to the implementation.
#[async_trait]
pub trait SubsystemSender: Send + Clone + 'static {
/// Send a direct message to some other `Subsystem`, routed based on message type.
///
/// `msg` is passed by value.
async fn send_message(&mut self, msg: AllMessages);
/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
///
/// `msgs` may be any `Send` iterable of `AllMessages` whose iterator is also `Send`.
async fn send_messages<T>(&mut self, msgs: T)
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send;
/// Send a message onto the unbounded queue of some other `Subsystem`, routed based on message
/// type.
///
/// This function should be used only when there is some other bounding factor on the messages
/// sent with it. Otherwise, it risks a memory leak.
///
/// Unlike the other two methods, this one is synchronous (not `async`).
fn send_unbounded_message(&mut self, msg: AllMessages);
}
/// A context type that is given to the [`Subsystem`] upon spawning. /// A context type that is given to the [`Subsystem`] upon spawning.
/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s /// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
/// or spawn jobs. /// or spawn jobs.
...@@ -217,11 +238,14 @@ pub type SubsystemResult<T> = Result<T, SubsystemError>; ...@@ -217,11 +238,14 @@ pub type SubsystemResult<T> = Result<T, SubsystemError>;
/// [`Overseer`]: struct.Overseer.html /// [`Overseer`]: struct.Overseer.html
/// [`SubsystemJob`]: trait.SubsystemJob.html /// [`SubsystemJob`]: trait.SubsystemJob.html
#[async_trait] #[async_trait]
pub trait SubsystemContext: Send + 'static { pub trait SubsystemContext: Send + Sized + 'static {
/// The message type of this context. Subsystems launched with this context will expect /// The message type of this context. Subsystems launched with this context will expect
/// to receive messages of this type. /// to receive messages of this type.
type Message: Send; type Message: Send;
/// The message sender type of this context. Clones of the sender should be used sparingly.
type Sender: SubsystemSender;
/// Try to asynchronously receive a message. /// Try to asynchronously receive a message.
/// ///
/// This has to be used with caution, if you loop over this without /// This has to be used with caution, if you loop over this without
...@@ -241,12 +265,34 @@ pub trait SubsystemContext: Send + 'static { ...@@ -241,12 +265,34 @@ pub trait SubsystemContext: Send + 'static {
s: Pin<Box<dyn Future<Output = ()> + Send>>, s: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> SubsystemResult<()>; ) -> SubsystemResult<()>;
/// Get a mutable reference to the sender.
fn sender(&mut self) -> &mut Self::Sender;
/// Send a direct message to some other `Subsystem`, routed based on message type. /// Send a direct message to some other `Subsystem`, routed based on message type.
async fn send_message(&mut self, msg: AllMessages); async fn send_message(&mut self, msg: AllMessages) {
self.sender().send_message(msg).await
}
/// Send multiple direct messages to other `Subsystem`s, routed based on message type. /// Send multiple direct messages to other `Subsystem`s, routed based on message type.
async fn send_messages<T>(&mut self, msgs: T) async fn send_messages<T>(&mut self, msgs: T)
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send; where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
{
self.sender().send_messages(msgs).await
}
/// Send a message onto the unbounded queue of some other `Subsystem`, routed based on message
/// type.
///
/// This function should be used only when there is some other bounding factor on the messages
/// sent with it. Otherwise, it risks a memory leak.
///
/// Generally, for this method to be used, these conditions should be met:
/// * There is a communication cycle between subsystems
/// * One of the parts of the cycle has a clear bound on the number of messages produced.
fn send_unbounded_message(&mut self, msg: AllMessages) {
self.sender().send_unbounded_message(msg)
}
} }
/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`]. /// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`].
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment