Unverified Commit eaae8653 authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Implement Network Bridge (#1280)



* network bridge skeleton

* move some primitives around and add debug impls

* protocol registration glue & abstract network interface

* add send_msgs to subsystemctx

* select logic

* transform different events into actions and handle

* implement remaining network bridge state machine

* start test skeleton

* make network methods asynchronous

* extract subsystem out to subsystem crate

* port over overseer to subsystem context trait

* fix minimal example

* fix overseer doc test

* update network-bridge crate

* write a subsystem test-helpers crate

* write a network test helper for network-bridge

* set up (broken) view test

* Revamp network to be more async-friendly and not require Sync

* fix spacing

* fix test compilation

* insert side-channel for actions

* Add some more message types to AllMessages

* introduce a test harness

* add some tests

* ensure service compiles and passes tests

* fix typo

* fix service-new compilation

* Subsystem test helpers send messages synchronously

* remove smelly action inspector

* remove superfluous let binding

* fix warnings

* Update node/network/bridge/src/lib.rs
Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* fix compilation
Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
parent 7aa95b19
......@@ -256,6 +256,17 @@ dependencies = [
"webpki-roots 0.19.0",
]
[[package]]
name = "async-trait"
version = "0.1.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a265e3abeffdce30b2e26b7a11b222fe37c6067404001b434101457d0385eb92"
dependencies = [
"proc-macro2 1.0.18",
"quote 1.0.7",
"syn 1.0.31",
]
[[package]]
name = "atty"
version = "0.2.14"
......@@ -4348,6 +4359,25 @@ dependencies = [
"wasm-timer",
]
[[package]]
name = "polkadot-network-bridge"
version = "0.1.0"
dependencies = [
"assert_matches",
"futures 0.3.5",
"futures-timer 3.0.2",
"log 0.4.8",
"parity-scale-codec",
"parking_lot 0.10.2",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-primitives",
"polkadot-subsystem-test-helpers",
"sc-network",
"sp-runtime",
"streamunordered",
]
[[package]]
name = "polkadot-network-test"
version = "0.8.13"
......@@ -4370,36 +4400,39 @@ dependencies = [
]
[[package]]
name = "polkadot-node-messages"
name = "polkadot-node-primitives"
version = "0.1.0"
dependencies = [
"futures 0.3.5",
"polkadot-node-primitives",
"async-trait",
"parity-scale-codec",
"polkadot-primitives",
"polkadot-statement-table",
"sc-network",
"sp-runtime",
]
[[package]]
name = "polkadot-node-primitives"
name = "polkadot-node-subsystem"
version = "0.1.0"
dependencies = [
"parity-scale-codec",
"async-trait",
"futures 0.3.5",
"polkadot-node-primitives",
"polkadot-primitives",
"polkadot-statement-table",
"sp-runtime",
"sc-network",
]
[[package]]
name = "polkadot-overseer"
version = "0.1.0"
dependencies = [
"async-trait",
"femme",
"futures 0.3.5",
"futures-timer 3.0.2",
"kv-log-macro",
"log 0.4.8",
"polkadot-node-messages",
"polkadot-node-subsystem",
"polkadot-primitives",
"sc-client-api",
"streamunordered",
......@@ -4711,6 +4744,7 @@ dependencies = [
"parity-scale-codec",
"parking_lot 0.9.0",
"polkadot-network",
"polkadot-node-subsystem",
"polkadot-overseer",
"polkadot-primitives",
"polkadot-rpc",
......@@ -4760,6 +4794,16 @@ dependencies = [
"sp-core",
]
[[package]]
name = "polkadot-subsystem-test-helpers"
version = "0.1.0"
dependencies = [
"async-trait",
"futures 0.3.5",
"parking_lot 0.10.2",
"polkadot-node-subsystem",
]
[[package]]
name = "polkadot-test-runtime"
version = "0.8.13"
......
......@@ -42,10 +42,12 @@ members = [
"service",
"validation",
"node/messages",
"node/network/bridge",
"node/overseer",
"node/primitives",
"node/service",
"node/subsystem",
"node/test-helpers/subsystem",
"parachain/test-parachains",
"parachain/test-parachains/adder",
......
[package]
name = "polkadot-network-bridge"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
futures = "0.3.5"
log = "0.4.8"
futures-timer = "3.0.2"
streamunordered = "0.5.1"
polkadot-primitives = { path = "../../../primitives" }
node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" }
parity-scale-codec = "1.3.0"
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
[dev-dependencies]
parking_lot = "0.10.0"
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
assert_matches = "1.3.0"
This diff is collapsed.
......@@ -11,7 +11,8 @@ futures-timer = "3.0.2"
streamunordered = "0.5.1"
polkadot-primitives = { path = "../../primitives" }
client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" }
messages = { package = "polkadot-node-messages", path = "../messages" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" }
async-trait = "0.1"
[dev-dependencies]
futures = { version = "0.3.5", features = ["thread-pool"] }
......
......@@ -28,16 +28,17 @@ use futures_timer::Delay;
use kv_log_macro as log;
use polkadot_primitives::parachain::{BlockData, PoVBlock};
use polkadot_overseer::{Overseer, Subsystem, SubsystemContext, SpawnedSubsystem};
use polkadot_overseer::Overseer;
use messages::{
AllMessages, CandidateBackingMessage, FromOverseer, CandidateValidationMessage
use polkadot_subsystem::{Subsystem, SubsystemContext, SpawnedSubsystem, FromOverseer};
use polkadot_subsystem::messages::{
AllMessages, CandidateBackingMessage, CandidateValidationMessage
};
struct Subsystem1;
impl Subsystem1 {
async fn run(mut ctx: SubsystemContext<CandidateBackingMessage>) {
async fn run(mut ctx: impl SubsystemContext<Message=CandidateBackingMessage>) {
loop {
match ctx.try_recv().await {
Ok(Some(msg)) => {
......@@ -56,7 +57,7 @@ impl Subsystem1 {
Delay::new(Duration::from_secs(1)).await;
let (tx, _) = oneshot::channel();
ctx.send_msg(AllMessages::CandidateValidation(
ctx.send_message(AllMessages::CandidateValidation(
CandidateValidationMessage::Validate(
Default::default(),
Default::default(),
......@@ -70,8 +71,10 @@ impl Subsystem1 {
}
}
impl Subsystem<CandidateBackingMessage> for Subsystem1 {
fn start(&mut self, ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for Subsystem1
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(&mut self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
......@@ -81,7 +84,7 @@ impl Subsystem<CandidateBackingMessage> for Subsystem1 {
struct Subsystem2;
impl Subsystem2 {
async fn run(mut ctx: SubsystemContext<CandidateValidationMessage>) {
async fn run(mut ctx: impl SubsystemContext<Message=CandidateValidationMessage>) {
ctx.spawn(Box::pin(async {
loop {
log::info!("Job tick");
......@@ -105,8 +108,10 @@ impl Subsystem2 {
}
}
impl Subsystem<CandidateValidationMessage> for Subsystem2 {
fn start(&mut self, ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for Subsystem2
where C: SubsystemContext<Message=CandidateValidationMessage>
{
fn start(&mut self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
......
......@@ -65,8 +65,8 @@ use futures::channel::{mpsc, oneshot};
use futures::{
pending, poll, select,
future::{BoxFuture, RemoteHandle},
stream::FuturesUnordered,
task::{Spawn, SpawnError, SpawnExt},
stream::{self, FuturesUnordered},
task::{Spawn, SpawnExt},
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
......@@ -75,50 +75,14 @@ use streamunordered::{StreamYield, StreamUnordered};
use polkadot_primitives::{Block, BlockNumber, Hash};
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
pub use messages::{
OverseerSignal, CandidateValidationMessage, CandidateBackingMessage, AllMessages,
FromOverseer,
use polkadot_subsystem::messages::{
CandidateValidationMessage, CandidateBackingMessage, AllMessages
};
pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
SpawnedSubsystem,
};
/// An error type that describes faults that may happen
///
/// These are:
/// * Channels being closed
/// * Subsystems dying when they are not expected to
/// * Subsystems not dying when they are told to die
/// * etc.
#[derive(Debug)]
pub struct SubsystemError;
impl From<mpsc::SendError> for SubsystemError {
fn from(_: mpsc::SendError) -> Self {
Self
}
}
impl From<oneshot::Canceled> for SubsystemError {
fn from(_: oneshot::Canceled) -> Self {
Self
}
}
impl From<SpawnError> for SubsystemError {
fn from(_: SpawnError) -> Self {
Self
}
}
/// A `Result` type that wraps [`SubsystemError`].
///
/// [`SubsystemError`]: struct.SubsystemError.html
pub type SubsystemResult<T> = Result<T, SubsystemError>;
/// An asynchronous subsystem task that runs inside and being overseen by the [`Overseer`].
///
/// In essence it's just a newtype wrapping a `BoxFuture`.
///
/// [`Overseer`]: struct.Overseer.html
pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>);
// A capacity of bounded channels inside the overseer.
const CHANNEL_CAPACITY: usize = 1024;
......@@ -278,7 +242,7 @@ impl Debug for ToOverseer {
/// A running instance of some [`Subsystem`].
///
/// [`Subsystem`]: trait.Subsystem.html
struct SubsystemInstance<M: Debug> {
struct SubsystemInstance<M> {
tx: mpsc::Sender<FromOverseer<M>>,
}
......@@ -289,17 +253,17 @@ struct SubsystemInstance<M: Debug> {
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`SubsystemJob`]: trait.SubsystemJob.html
pub struct SubsystemContext<M: Debug>{
#[derive(Debug)]
pub struct OverseerSubsystemContext<M>{
rx: mpsc::Receiver<FromOverseer<M>>,
tx: mpsc::Sender<ToOverseer>,
}
impl<M: Debug> SubsystemContext<M> {
/// Try to asyncronously receive a message.
///
/// This has to be used with caution, if you loop over this without
/// using `pending!()` macro you will end up with a busy loop!
pub async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
#[async_trait::async_trait]
impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
type Message = M;
async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
match poll!(self.rx.next()) {
Poll::Ready(Some(msg)) => Ok(Some(msg)),
Poll::Ready(None) => Err(()),
......@@ -307,13 +271,11 @@ impl<M: Debug> SubsystemContext<M> {
}
}
/// Receive a message.
pub async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
self.rx.next().await.ok_or(SubsystemError)
}
/// Spawn a child task on the executor.
pub async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
let (tx, rx) = oneshot::channel();
self.tx.send(ToOverseer::SpawnJob {
s,
......@@ -323,33 +285,25 @@ impl<M: Debug> SubsystemContext<M> {
rx.await?
}
/// Send a direct message to some other `Subsystem`, routed based on message type.
pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> {
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.tx.send(ToOverseer::SubsystemMessage(msg)).await?;
Ok(())
}
fn new(rx: mpsc::Receiver<FromOverseer<M>>, tx: mpsc::Sender<ToOverseer>) -> Self {
Self {
rx,
tx,
}
async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
{
let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok));
self.tx.send_all(&mut msgs).await?;
Ok(())
}
}
/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`].
///
/// It is generic over the message type circulating in the system.
/// The idea that we want some type contaning persistent state that
/// can spawn actually running subsystems when asked to.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
pub trait Subsystem<M: Debug> {
/// Start this `Subsystem` and return `SpawnedSubsystem`.
fn start(&mut self, ctx: SubsystemContext<M>) -> SpawnedSubsystem;
}
/// A subsystem compatible with the overseer - one which can be run in the context of the
/// overseer.
pub type CompatibleSubsystem<M> = Box<dyn Subsystem<OverseerSubsystemContext<M>> + Send>;
/// A subsystem that we oversee.
///
......@@ -359,8 +313,8 @@ pub trait Subsystem<M: Debug> {
///
/// [`Subsystem`]: trait.Subsystem.html
#[allow(dead_code)]
struct OverseenSubsystem<M: Debug> {
subsystem: Box<dyn Subsystem<M> + Send>,
struct OverseenSubsystem<M> {
subsystem: CompatibleSubsystem<M>,
instance: Option<SubsystemInstance<M>>,
}
......@@ -441,16 +395,20 @@ where
/// # use std::time::Duration;
/// # use futures::{executor, pin_mut, select, FutureExt};
/// # use futures_timer::Delay;
/// # use polkadot_overseer::{
/// # Overseer, Subsystem, SpawnedSubsystem, SubsystemContext,
/// # CandidateValidationMessage, CandidateBackingMessage,
/// # use polkadot_overseer::Overseer;
/// # use polkadot_subsystem::{
/// # Subsystem, SpawnedSubsystem, SubsystemContext,
/// # messages::{CandidateValidationMessage, CandidateBackingMessage},
/// # };
///
/// struct ValidationSubsystem;
/// impl Subsystem<CandidateValidationMessage> for ValidationSubsystem {
///
/// impl<C> Subsystem<C> for ValidationSubsystem
/// where C: SubsystemContext<Message=CandidateValidationMessage>
/// {
/// fn start(
/// &mut self,
/// mut ctx: SubsystemContext<CandidateValidationMessage>,
/// mut ctx: C,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move {
/// loop {
......@@ -461,10 +419,12 @@ where
/// }
///
/// struct CandidateBackingSubsystem;
/// impl Subsystem<CandidateBackingMessage> for CandidateBackingSubsystem {
/// impl<C> Subsystem<C> for CandidateBackingSubsystem
/// where C: SubsystemContext<Message=CandidateBackingMessage>
/// {
/// fn start(
/// &mut self,
/// mut ctx: SubsystemContext<CandidateBackingMessage>,
/// mut ctx: C,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move {
/// loop {
......@@ -498,8 +458,8 @@ where
/// ```
pub fn new(
leaves: impl IntoIterator<Item = BlockInfo>,
validation: Box<dyn Subsystem<CandidateValidationMessage> + Send>,
candidate_backing: Box<dyn Subsystem<CandidateBackingMessage> + Send>,
validation: CompatibleSubsystem<CandidateValidationMessage>,
candidate_backing: CompatibleSubsystem<CandidateBackingMessage>,
mut s: S,
) -> SubsystemResult<(Self, OverseerHandler)> {
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
......@@ -680,6 +640,12 @@ where
let _ = s.tx.send(FromOverseer::Communication { msg }).await;
}
}
_ => {
// TODO: temporary catch-all until all subsystems are integrated with overseer.
// The overseer is not complete until this is an exhaustive match with all
// messages targeting an included subsystem.
// https://github.com/paritytech/polkadot/issues/1317
}
}
}
......@@ -688,15 +654,15 @@ where
}
}
fn spawn<S: Spawn, M: Debug>(
fn spawn<S: Spawn, M: Send + 'static>(
spawner: &mut S,
futures: &mut FuturesUnordered<RemoteHandle<()>>,
streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>,
mut s: Box<dyn Subsystem<M> + Send>,
mut s: CompatibleSubsystem<M>,
) -> SubsystemResult<OverseenSubsystem<M>> {
let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
let ctx = SubsystemContext::new(to_rx, from_tx);
let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx };
let f = s.start(ctx);
let handle = spawner.spawn_with_handle(f.0)?;
......@@ -723,8 +689,10 @@ mod tests {
struct TestSubsystem1(mpsc::Sender<usize>);
impl Subsystem<CandidateValidationMessage> for TestSubsystem1 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for TestSubsystem1
where C: SubsystemContext<Message=CandidateValidationMessage>
{
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
let mut sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move {
let mut i = 0;
......@@ -746,14 +714,16 @@ mod tests {
struct TestSubsystem2(mpsc::Sender<usize>);
impl Subsystem<CandidateBackingMessage> for TestSubsystem2 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for TestSubsystem2
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
let mut c: usize = 0;
loop {
if c < 10 {
let (tx, _) = oneshot::channel();
ctx.send_msg(
ctx.send_message(
AllMessages::CandidateValidation(
CandidateValidationMessage::Validate(
Default::default(),
......@@ -786,8 +756,10 @@ mod tests {
struct TestSubsystem4;
impl Subsystem<CandidateBackingMessage> for TestSubsystem4 {
fn start(&mut self, mut _ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for TestSubsystem4
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(&mut self, mut _ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
// Do nothing and exit.
}))
......@@ -871,8 +843,10 @@ mod tests {
struct TestSubsystem5(mpsc::Sender<OverseerSignal>);
impl Subsystem<CandidateValidationMessage> for TestSubsystem5 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for TestSubsystem5
where C: SubsystemContext<Message=CandidateValidationMessage>
{
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
let mut sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move {
......@@ -895,8 +869,10 @@ mod tests {
struct TestSubsystem6(mpsc::Sender<OverseerSignal>);
impl Subsystem<CandidateBackingMessage> for TestSubsystem6 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for TestSubsystem6
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
let mut sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move {
......
......@@ -10,3 +10,4 @@ polkadot-primitives = { path = "../../primitives" }
polkadot-statement-table = { path = "../../statement-table" }
parity-scale-codec = { version = "1.3.0", default-features = false, features = ["derive"] }
runtime_primitives = { package = "sp-runtime", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
async-trait = "0.1"
......@@ -64,6 +64,7 @@ impl EncodeAs<CompactStatement> for Statement {
pub type SignedFullStatement = Signed<Statement, CompactStatement>;
/// A misbehaviour report.
#[derive(Debug)]
pub enum MisbehaviorReport {
/// These validator nodes disagree on this candidate's validity, please figure it out
///
......@@ -79,3 +80,12 @@ pub enum MisbehaviorReport {
/// This peer has seconded more than one parachain candidate for this relay parent head
DoubleVote(CandidateReceipt, SignedFullStatement, SignedFullStatement),
}
/// A unique identifier for a network protocol.
pub type ProtocolId = [u8; 4];
/// A succinct representation of a peer's view. This consists of a bounded amount of chain heads.
///
/// Up to `N` (5?) chain heads.
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct View(pub Vec<Hash>);
......@@ -15,6 +15,7 @@ hex-literal = "0.2.1"
polkadot-primitives = { path = "../../primitives" }
polkadot-runtime = { path = "../../runtime/polkadot" }
polkadot-overseer = { path = "../overseer" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" }
kusama-runtime = { path = "../../runtime/kusama" }
westend-runtime = { path = "../../runtime/westend" }
polkadot-network = { path = "../../network", optional = true }
......
......@@ -29,10 +29,10 @@ use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
use sc_executor::native_executor_instance;
use log::info;
use sp_blockchain::HeaderBackend;
use polkadot_overseer::{
self as overseer,
BlockInfo, Overseer, OverseerHandler, Subsystem, SubsystemContext, SpawnedSubsystem,
CandidateValidationMessage, CandidateBackingMessage,
use polkadot_overseer::{self as overseer, BlockInfo, Overseer, OverseerHandler};
use polkadot_subsystem::{
Subsystem, SubsystemContext, SpawnedSubsystem,
messages::{CandidateValidationMessage, CandidateBackingMessage},
};
pub use service::{
Role, PruningMode, TransactionPoolOptions, Error, RuntimeGenesis,
......@@ -269,8 +269,10 @@ macro_rules! new_full_start {
struct CandidateValidationSubsystem;
impl Subsystem<CandidateValidationMessage> for CandidateValidationSubsystem {
fn start(&mut self, mut ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for CandidateValidationSubsystem
where C: SubsystemContext<Message = CandidateValidationMessage>
{
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
while let Ok(_) = ctx.recv().await {}
}))
......@@ -279,8 +281,10 @@ impl Subsystem<CandidateValidationMessage> for CandidateValidationSubsystem {