Unverified Commit e750a825 authored by Peter Goodspeed-Niklaus's avatar Peter Goodspeed-Niklaus Committed by GitHub
Browse files

implement collation generation subsystem (#1557)

* start sketching out a collation generation subsystem

* invent a basic strategy for double initialization

* clean up warnings

* impl util requests from runtime assuming a context instead of a FromJob sender

* implement collation generation algorithm from guide

* update AllMessages in tests

* fix trivial review comments

* remove another redundant declaration from merge

* filter availability cores by para_id

* handle new activations each in their own async task

* update guide according to the actual current implementation

* add initialization to guide

* add general-purpose subsystem_test_harness helper

* write first handle_new_activations test

* add test that handle_new_activations filters local_validation_data requests

* add (failing) test of collation distribution message sending

* rustfmt

* broken: work on fixing sender test

Unfortunately, for reasons that are not yet clear, despite the public key
and checked data being identical, the signer is not producing an identical
signature. This commit produces this output (among more):

signing with  Public(c4733ab0bbe3ba4c096685d1737a7f498cdbdd167a767d04a21dc7df12b8c858 (5GWHUNm5...))
checking with Public(c4733ab0bbe3ba4c096685d1737a7f498cdbdd167a767d04a21dc7df12b8c858 (5GWHUNm5...))
signed payload:  [4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 10, 0, 0, 0, c7, e5, c0, 64, 7a, db, fe, 44, 81, e5, 51, 11, 79, 9f, a5, 63, 93, 94, 3c, c4, 36, c6, 30, 36, c2, c5, 44, a2, 1b, db, b7, 82, 3, 17, a, 2e, 75, 97, b7, b7, e3, d8, 4c, 5, 39, 1d, 13, 9a, 62, b1, 57, e7, 87, 86, d8, c0, 82, f2, 9d, cf, 4c, 11, 13, 14]
checked payload: [4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 10, 0, 0, 0, c7, e5, c0, 64, 7a, db, fe, 44, 81, e5, 51, 11, 79, 9f, a5, 63, 93, 94, 3c, c4, 36, c6, 30, 36, c2, c5, 44, a2, 1b, db, b7, 82, 3, 17, a, 2e, 75, 97, b7, b7, e3, d8, 4c, 5, 39, 1d, 13, 9a, 62, b1, 57, e7, 87, 86, d8, c0, 82, f2, 9d, cf, 4c, 11, 13, 14]

* fix broken test

* collation function returns commitments hash

It doesn't look like we use the actual commitments data anywhere, and
it's not obvious if there are any fields of `CandidateCommitments`
not available to the collator, so this commit just assigns them the
entire responsibility of generating the hash.

* add missing overseer impls

* calculating erasure coding is polkadot's responsibility, not cumulus

* concurrentize per-relay_parent requests
parent 474b72a5
Pipeline #104035 canceled with stages
in 5 minutes and 25 seconds
......@@ -4653,6 +4653,22 @@ dependencies = [
"sp-runtime",
]
[[package]]
name = "polkadot-node-collation-generation"
version = "0.1.0"
dependencies = [
"derive_more 0.99.9",
"futures 0.3.5",
"log 0.4.11",
"polkadot-erasure-coding",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-core",
]
[[package]]
name = "polkadot-node-core-av-store"
version = "0.1.0"
......@@ -4816,6 +4832,7 @@ dependencies = [
name = "polkadot-node-primitives"
version = "0.1.0"
dependencies = [
"futures 0.3.5",
"parity-scale-codec",
"polkadot-primitives",
"polkadot-statement-table",
......
......@@ -44,6 +44,7 @@ members = [
"service",
"validation",
"node/collation-generation",
"node/core/av-store",
"node/core/backing",
"node/core/bitfield-signing",
......
[package]
name = "polkadot-node-collation-generation"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
derive_more = "0.99.9"
futures = "0.3.5"
log = "0.4.8"
polkadot-erasure-coding = { path = "../../erasure-coding" }
polkadot-node-primitives = { path = "../primitives" }
polkadot-node-subsystem = { path = "../subsystem" }
polkadot-node-subsystem-util = { path = "../subsystem-util" }
polkadot-primitives = { path = "../../primitives" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
[dev-dependencies]
polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
This diff is collapsed.
......@@ -145,7 +145,6 @@ fn main() {
candidate_validation: Subsystem2,
candidate_backing: Subsystem1,
candidate_selection: DummySubsystem,
collator_protocol: DummySubsystem,
statement_distribution: DummySubsystem,
availability_distribution: DummySubsystem,
bitfield_signing: DummySubsystem,
......@@ -156,6 +155,8 @@ fn main() {
availability_store: DummySubsystem,
network_bridge: DummySubsystem,
chain_api: DummySubsystem,
collation_generation: DummySubsystem,
collator_protocol: DummySubsystem,
};
let (overseer, _handler) = Overseer::new(
vec![],
......
......@@ -78,8 +78,8 @@ use polkadot_subsystem::messages::{
CandidateValidationMessage, CandidateBackingMessage,
CandidateSelectionMessage, ChainApiMessage, StatementDistributionMessage,
AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage,
ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, CollatorProtocolMessage,
AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages,
ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage,
AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, CollationGenerationMessage, CollatorProtocolMessage,
};
pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
......@@ -352,9 +352,6 @@ pub struct Overseer<S: SpawnNamed> {
/// A candidate selection subsystem.
candidate_selection_subsystem: OverseenSubsystem<CandidateSelectionMessage>,
/// A collator protocol subsystem
collator_protocol_subsystem: OverseenSubsystem<CollatorProtocolMessage>,
/// A statement distribution subsystem.
statement_distribution_subsystem: OverseenSubsystem<StatementDistributionMessage>,
......@@ -382,9 +379,15 @@ pub struct Overseer<S: SpawnNamed> {
/// A network bridge subsystem.
network_bridge_subsystem: OverseenSubsystem<NetworkBridgeMessage>,
/// A Chain API subsystem
/// A Chain API subsystem.
chain_api_subsystem: OverseenSubsystem<ChainApiMessage>,
/// A Collation Generation subsystem.
collation_generation_subsystem: OverseenSubsystem<CollationGenerationMessage>,
/// A Collator Protocol subsystem.
collator_protocol_subsystem: OverseenSubsystem<CollatorProtocolMessage>,
/// Spawner to spawn tasks to.
s: S,
......@@ -417,15 +420,13 @@ pub struct Overseer<S: SpawnNamed> {
///
/// [`Subsystem`]: trait.Subsystem.html
/// [`DummySubsystem`]: struct.DummySubsystem.html
pub struct AllSubsystems<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA> {
pub struct AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP> {
/// A candidate validation subsystem.
pub candidate_validation: CV,
/// A candidate backing subsystem.
pub candidate_backing: CB,
/// A candidate selection subsystem.
pub candidate_selection: CS,
/// A collator protocol subsystem.
pub collator_protocol: CP,
/// A statement distribution subsystem.
pub statement_distribution: SD,
/// An availability distribution subsystem.
......@@ -446,6 +447,10 @@ pub struct AllSubsystems<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA
pub network_bridge: NB,
/// A Chain API subsystem.
pub chain_api: CA,
/// A Collation Generation subsystem.
pub collation_generation: CG,
/// A Collator Protocol subsystem.
pub collator_protocol: CP,
}
impl<S> Overseer<S>
......@@ -518,7 +523,6 @@ where
/// candidate_validation: ValidationSubsystem,
/// candidate_backing: DummySubsystem,
/// candidate_selection: DummySubsystem,
/// collator_protocol: DummySubsystem,
/// statement_distribution: DummySubsystem,
/// availability_distribution: DummySubsystem,
/// bitfield_signing: DummySubsystem,
......@@ -529,6 +533,8 @@ where
/// availability_store: DummySubsystem,
/// network_bridge: DummySubsystem,
/// chain_api: DummySubsystem,
/// collation_generation: DummySubsystem,
/// collator_protocol: DummySubsystem,
/// };
/// let (overseer, _handler) = Overseer::new(
/// vec![],
......@@ -549,16 +555,15 @@ where
/// #
/// # }); }
/// ```
pub fn new<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>(
pub fn new<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>(
leaves: impl IntoIterator<Item = BlockInfo>,
all_subsystems: AllSubsystems<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>,
all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA, CG, CP>,
mut s: S,
) -> SubsystemResult<(Self, OverseerHandler)>
where
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send,
CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send,
CS: Subsystem<OverseerSubsystemContext<CandidateSelectionMessage>> + Send,
CP: Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>> + Send,
SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>> + Send,
AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>> + Send,
BS: Subsystem<OverseerSubsystemContext<BitfieldSigningMessage>> + Send,
......@@ -569,6 +574,8 @@ where
AS: Subsystem<OverseerSubsystemContext<AvailabilityStoreMessage>> + Send,
NB: Subsystem<OverseerSubsystemContext<NetworkBridgeMessage>> + Send,
CA: Subsystem<OverseerSubsystemContext<ChainApiMessage>> + Send,
CG: Subsystem<OverseerSubsystemContext<CollationGenerationMessage>> + Send,
CP: Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>> + Send,
{
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
......@@ -600,13 +607,6 @@ where
all_subsystems.candidate_selection,
)?;
let collator_protocol_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.collator_protocol,
)?;
let statement_distribution_subsystem = spawn(
&mut s,
&mut running_subsystems,
......@@ -677,6 +677,21 @@ where
all_subsystems.chain_api,
)?;
let collation_generation_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.collation_generation,
)?;
let collator_protocol_subsystem = spawn(
&mut s,
&mut running_subsystems,
&mut running_subsystems_rx,
all_subsystems.collator_protocol,
)?;
let active_leaves = HashSet::new();
let leaves = leaves
......@@ -688,7 +703,6 @@ where
candidate_validation_subsystem,
candidate_backing_subsystem,
candidate_selection_subsystem,
collator_protocol_subsystem,
statement_distribution_subsystem,
availability_distribution_subsystem,
bitfield_signing_subsystem,
......@@ -699,6 +713,8 @@ where
availability_store_subsystem,
network_bridge_subsystem,
chain_api_subsystem,
collation_generation_subsystem,
collator_protocol_subsystem,
s,
running_subsystems,
running_subsystems_rx,
......@@ -724,10 +740,6 @@ where
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
......@@ -768,6 +780,14 @@ where
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
if let Some(ref mut s) = self.collation_generation_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse();
loop {
......@@ -889,10 +909,6 @@ where
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
......@@ -930,7 +946,15 @@ where
}
if let Some(ref mut s) = self.chain_api_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal)).await?;
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
if let Some(ref mut s) = self.collation_generation_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
Ok(())
......@@ -953,11 +977,6 @@ where
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
AllMessages::CollatorProtocol(msg) => {
if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
AllMessages::StatementDistribution(msg) => {
if let Some(ref mut s) = self.statement_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
......@@ -1008,6 +1027,16 @@ where
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
AllMessages::CollationGeneration(msg) => {
if let Some(ref mut s) = self.collation_generation_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
AllMessages::CollatorProtocol(msg) => {
if let Some(ref mut s) = self.collator_protocol_subsystem.instance {
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
}
}
......@@ -1169,7 +1198,6 @@ mod tests {
candidate_validation: TestSubsystem1(s1_tx),
candidate_backing: TestSubsystem2(s2_tx),
candidate_selection: DummySubsystem,
collator_protocol: DummySubsystem,
statement_distribution: DummySubsystem,
availability_distribution: DummySubsystem,
bitfield_signing: DummySubsystem,
......@@ -1180,6 +1208,8 @@ mod tests {
availability_store: DummySubsystem,
network_bridge: DummySubsystem,
chain_api: DummySubsystem,
collation_generation: DummySubsystem,
collator_protocol: DummySubsystem,
};
let (overseer, mut handler) = Overseer::new(
vec![],
......@@ -1234,7 +1264,6 @@ mod tests {
candidate_validation: TestSubsystem1(s1_tx),
candidate_backing: TestSubsystem4,
candidate_selection: DummySubsystem,
collator_protocol: DummySubsystem,
statement_distribution: DummySubsystem,
availability_distribution: DummySubsystem,
bitfield_signing: DummySubsystem,
......@@ -1245,6 +1274,8 @@ mod tests {
availability_store: DummySubsystem,
network_bridge: DummySubsystem,
chain_api: DummySubsystem,
collation_generation: DummySubsystem,
collator_protocol: DummySubsystem,
};
let (overseer, _handle) = Overseer::new(
vec![],
......@@ -1352,7 +1383,6 @@ mod tests {
candidate_validation: TestSubsystem5(tx_5),
candidate_backing: TestSubsystem6(tx_6),
candidate_selection: DummySubsystem,
collator_protocol: DummySubsystem,
statement_distribution: DummySubsystem,
availability_distribution: DummySubsystem,
bitfield_signing: DummySubsystem,
......@@ -1363,6 +1393,8 @@ mod tests {
availability_store: DummySubsystem,
network_bridge: DummySubsystem,
chain_api: DummySubsystem,
collation_generation: DummySubsystem,
collator_protocol: DummySubsystem,
};
let (overseer, mut handler) = Overseer::new(
vec![first_block],
......@@ -1455,7 +1487,6 @@ mod tests {
candidate_validation: TestSubsystem5(tx_5),
candidate_backing: TestSubsystem6(tx_6),
candidate_selection: DummySubsystem,
collator_protocol: DummySubsystem,
statement_distribution: DummySubsystem,
availability_distribution: DummySubsystem,
bitfield_signing: DummySubsystem,
......@@ -1466,6 +1497,8 @@ mod tests {
availability_store: DummySubsystem,
network_bridge: DummySubsystem,
chain_api: DummySubsystem,
collation_generation: DummySubsystem,
collator_protocol: DummySubsystem,
};
// start with two forks of different height.
let (overseer, mut handler) = Overseer::new(
......
......@@ -6,6 +6,7 @@ edition = "2018"
description = "Primitives types for the Node-side"
[dependencies]
futures = "0.3.5"
polkadot-primitives = { path = "../../primitives" }
polkadot-statement-table = { path = "../../statement-table" }
parity-scale-codec = { version = "1.3.4", default-features = false, features = ["derive"] }
......
......@@ -20,12 +20,13 @@
//! not shared between the node and the runtime. This crate builds on top of the primitives defined
//! there.
use futures::Future;
use parity_scale_codec::{Decode, Encode};
use polkadot_primitives::v1::{
Hash, CommittedCandidateReceipt, CandidateReceipt, CompactStatement,
EncodeAs, Signed, SigningContext, ValidatorIndex, ValidatorId,
UpwardMessage, Balance, ValidationCode, GlobalValidationData, LocalValidationData,
HeadData,
HeadData, PoV, CollatorPair, Id as ParaId,
};
use polkadot_statement_table::{
generic::{
......@@ -258,3 +259,39 @@ impl std::convert::TryFrom<FromTableMisbehavior> for MisbehaviorReport {
}
}
}
/// The output of a collator.
///
/// This differs from `CandidateCommitments` in two ways:
///
/// - does not contain the erasure root; that's computed at the Polkadot level, not at Cumulus
/// - contains a proof of validity.
#[derive(Clone, Encode, Decode)]
pub struct Collation {
/// Fees paid from the chain to the relay chain validators.
pub fees: Balance,
/// Messages destined to be interpreted by the Relay chain itself.
pub upward_messages: Vec<UpwardMessage>,
/// New validation code.
pub new_validation_code: Option<ValidationCode>,
/// The head-data produced as a result of execution.
pub head_data: HeadData,
/// Proof that this block is valid.
pub proof_of_validity: PoV,
}
/// Configuration for the collation generator
pub struct CollationGenerationConfig {
/// Collator's authentication key, so it can sign things.
pub key: CollatorPair,
/// Collation function.
pub collator: Box<dyn Fn(&GlobalValidationData, &LocalValidationData) -> Box<dyn Future<Output = Collation> + Unpin + Send> + Send + Sync>,
/// The parachain that this collator collates for
pub para_id: ParaId,
}
impl std::fmt::Debug for CollationGenerationConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "CollationGenerationConfig {{ ... }}")
}
}
......@@ -281,7 +281,6 @@ fn real_overseer<S: SpawnNamed>(
candidate_validation: DummySubsystem,
candidate_backing: DummySubsystem,
candidate_selection: DummySubsystem,
collator_protocol: DummySubsystem,
statement_distribution: DummySubsystem,
availability_distribution: DummySubsystem,
bitfield_signing: DummySubsystem,
......@@ -292,6 +291,8 @@ fn real_overseer<S: SpawnNamed>(
availability_store: DummySubsystem,
network_bridge: DummySubsystem,
chain_api: DummySubsystem,
collation_generation: DummySubsystem,
collator_protocol: DummySubsystem,
};
Overseer::new(
leaves,
......
......@@ -16,19 +16,21 @@
//! Utilities for testing subsystems.
use polkadot_node_subsystem::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError};
use polkadot_node_subsystem::messages::AllMessages;
use polkadot_node_subsystem::{FromOverseer, SubsystemContext, SubsystemError, SubsystemResult};
use futures::prelude::*;
use futures::channel::mpsc;
use futures::poll;
use futures::prelude::*;
use futures_timer::Delay;
use parking_lot::Mutex;
use sp_core::traits::SpawnNamed;
use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
use std::convert::Infallible;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::time::Duration;
enum SinkState<T> {
Empty {
......@@ -50,24 +52,21 @@ pub struct SingleItemStream<T>(Arc<Mutex<SinkState<T>>>);
impl<T> Sink<T> for SingleItemSink<T> {
type Error = Infallible;
fn poll_ready(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), Infallible>> {
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
let mut state = self.0.lock();
match *state {
SinkState::Empty { .. } => Poll::Ready(Ok(())),
SinkState::Item { ref mut ready_waker, .. } => {
SinkState::Item {
ref mut ready_waker,
..
} => {
*ready_waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
fn start_send(
self: Pin<&mut Self>,
item: T,
) -> Result<(), Infallible> {
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
let mut state = self.0.lock();
match *state {
......@@ -88,24 +87,21 @@ impl<T> Sink<T> for SingleItemSink<T> {
Ok(())
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), Infallible>> {
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
let mut state = self.0.lock();
match *state {
SinkState::Empty { .. } => Poll::Ready(Ok(())),
SinkState::Item { ref mut flush_waker, .. } => {
SinkState::Item {
ref mut flush_waker,
..
} => {
*flush_waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
fn poll_close(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), Infallible>> {
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
self.poll_flush(cx)
}
}
......@@ -120,7 +116,11 @@ impl<T> Stream for SingleItemStream<T> {
match std::mem::replace(&mut *state, SinkState::Empty { read_waker }) {
SinkState::Empty { .. } => Poll::Pending,
SinkState::Item { item, ready_waker, flush_waker } => {
SinkState::Item {
item,
ready_waker,
flush_waker,
} => {
if let Some(waker) = ready_waker {
waker.wake();
}
......@@ -141,10 +141,7 @@ impl<T> Stream for SingleItemStream<T> {
/// not when the item is buffered.
pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
let inner = Arc::new(Mutex::new(SinkState::Empty { read_waker: None }));
(
SingleItemSink(inner.clone()),
SingleItemStream(inner),
)
(SingleItemSink(inner.clone()), SingleItemStream(inner))
}
/// A test subsystem context.
......@@ -155,7 +152,9 @@ pub struct TestSubsystemContext<M, S> {
}
#[async_trait::async_trait]
impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for TestSubsystemContext<M, S> {
impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
for TestSubsystemContext<M, S>
{
type Message = M;
async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
......@@ -170,9 +169,11 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for Tes
self.rx.next().await.ok_or(SubsystemError)
}
async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
-> SubsystemResult<()>
{
async fn spawn(
&mut self,
name: &'static str,
s: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> SubsystemResult<()> {
self.spawn.spawn(name, s);
Ok(())
}
......@@ -185,15 +186,23 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext for Tes
}
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.tx.send(msg).await.expect("test overseer no longer live");
self.tx
.send(msg)
.await
.expect("test overseer no longer live");
Ok(())
}
async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
where