Unverified Commit 79a0df2f authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Add some message types from subsystem definitions (#1265)



* introduce polkadot-node-primitives

* guide: change statement distribution message types

* guide: remove variant from `CandidateSelectionMessage`

* add a few more message types

* add TODOs

* Almost all messages

* NewBackedCandidate notification

* Formatting

* Use AttestedCandidate as BackedCandidate

* Update node/primitives/src/lib.rs
Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Fix the tests

* Bring in types from #1242

* Adds network bridge messages

* More message types from doc

* use fn pointer type

* Fixes from the review

* Add missing Runtime subsystem message

* rename to CandidateValidationMessage and fix tests
Co-authored-by: Fedor Sakharov's avatarFedor Sakharov <fedor.sakharov@gmail.com>
Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
parent 9203072f
Pipeline #97277 passed with stages
in 22 minutes and 20 seconds
......@@ -4277,7 +4277,21 @@ dependencies = [
name = "polkadot-node-messages"
version = "0.1.0"
dependencies = [
"futures 0.3.5",
"polkadot-node-primitives",
"polkadot-primitives",
"polkadot-statement-table",
"sc-network",
]
[[package]]
name = "polkadot-node-primitives"
version = "0.1.0"
dependencies = [
"parity-scale-codec",
"polkadot-primitives",
"polkadot-statement-table",
"sp-runtime",
]
[[package]]
......
......@@ -44,6 +44,7 @@ members = [
"node/messages",
"node/overseer",
"node/primitives",
"node/service",
"parachain/test-parachains",
......
......@@ -7,3 +7,7 @@ description = "Message types used by Subsystems"
[dependencies]
polkadot-primitives = { path = "../../primitives" }
polkadot-statement-table = { path = "../../statement-table" }
polkadot-node-primitives = { path = "../primitives" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = "0.3.5"
......@@ -22,7 +22,17 @@
//!
//! Subsystems' APIs are defined separately from their implementation, leading to easier mocking.
use polkadot_primitives::Hash;
use futures::channel::{mpsc, oneshot};
use sc_network::{ObservedRole, ReputationChange, PeerId, config::ProtocolId};
use polkadot_primitives::{BlockNumber, Hash, Signature};
use polkadot_primitives::parachain::{
AbridgedCandidateReceipt, PoVBlock, ErasureChunk, BackedCandidate, Id as ParaId,
SignedAvailabilityBitfield, SigningContext, ValidatorId, ValidationCode, ValidatorIndex,
};
use polkadot_node_primitives::{
MisbehaviorReport, SignedStatement,
};
/// Signals sent by an overseer to a subsystem.
#[derive(PartialEq, Clone, Debug)]
......@@ -35,24 +45,174 @@ pub enum OverseerSignal {
Conclude,
}
/// A message type used by the Validation Subsystem.
/// A notification of a new backed candidate.
#[derive(Debug)]
pub struct NewBackedCandidate(pub BackedCandidate);
/// Messages received by the Candidate Selection subsystem.
#[derive(Debug)]
pub enum CandidateSelectionMessage {
/// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator.
/// The hash is the relay parent.
Invalid(Hash, AbridgedCandidateReceipt),
}
/// Messages received by the Candidate Backing subsystem.
#[derive(Debug)]
pub enum ValidationSubsystemMessage {
ValidityAttestation,
pub enum CandidateBackingMessage {
/// Registers a stream listener for updates to the set of backable candidates that could be backed
/// in a child of the given relay-parent, referenced by its hash.
RegisterBackingWatcher(Hash, mpsc::Sender<NewBackedCandidate>),
/// Note that the Candidate Backing subsystem should second the given candidate in the context of the
/// given relay-parent (ref. by hash). This candidate must be validated.
Second(Hash, AbridgedCandidateReceipt),
/// Note a validator's statement about a particular candidate. Disagreements about validity must be escalated
/// to a broader check by Misbehavior Arbitration. Agreements are simply tallied until a quorum is reached.
Statement(Hash, SignedStatement),
}
/// A message type used by the CandidateBacking Subsystem.
/// Blanket error for validation failing.
#[derive(Debug)]
pub enum CandidateBackingSubsystemMessage {
RegisterBackingWatcher,
Second,
pub struct ValidationFailed;
/// Messages received by the Validation subsystem
#[derive(Debug)]
pub enum CandidateValidationMessage {
/// Validate a candidate, sending a side-channel response of valid or invalid.
///
/// Provide the relay-parent in whose context this should be validated, the full candidate receipt,
/// and the PoV.
Validate(
Hash,
AbridgedCandidateReceipt,
PoVBlock,
oneshot::Sender<Result<(), ValidationFailed>>,
),
}
/// Chain heads.
///
/// Up to `N` (5?) chain heads.
pub struct View(pub Vec<Hash>);
/// Events from network.
pub enum NetworkBridgeEvent {
/// A peer has connected.
PeerConnected(PeerId, ObservedRole),
/// A peer has disconnected.
PeerDisconnected(PeerId),
/// Peer has sent a message.
PeerMessage(PeerId, Vec<u8>),
/// Peer's `View` has changed.
PeerViewChange(PeerId, View),
/// Our `View` has changed.
OurViewChange(View),
}
/// Messages received by the network bridge subsystem.
pub enum NetworkBridgeSubsystemMessage {
/// Register an event producer on startup.
RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages),
/// Report a peer for their actions.
ReportPeer(PeerId, ReputationChange),
/// Send a message to multiple peers.
SendMessage(Vec<PeerId>, ProtocolId, Vec<u8>),
}
/// Availability Distribution Message.
pub enum AvailabilityDistributionMessage {
/// Distribute an availability chunk to other validators.
DistributeChunk(Hash, ErasureChunk),
/// Fetch an erasure chunk from networking by candidate hash and chunk index.
FetchChunk(Hash, u32),
/// Event from the network bridge.
NetworkBridgeUpdate(NetworkBridgeEvent),
}
/// Bitfield distribution message.
pub enum BitfieldDistributionMessage {
/// Distribute a bitfield via gossip to other validators.
DistributeBitfield(Hash, SignedAvailabilityBitfield),
/// Event from the network bridge.
NetworkBridgeUpdate(NetworkBridgeEvent),
}
/// Availability store subsystem message.
pub enum AvailabilityStoreMessage {
/// Query a `PoVBlock` from the AV store.
QueryPoV(Hash, oneshot::Sender<Option<PoVBlock>>),
/// Query an `ErasureChunk` from the AV store.
QueryChunk(Hash, ValidatorIndex, oneshot::Sender<ErasureChunk>),
/// Store an `ErasureChunk` in the AV store.
StoreChunk(Hash, ValidatorIndex, ErasureChunk),
}
/// A request to the Runtime API subsystem.
pub enum RuntimeApiRequest {
/// Get the current validator set.
Validators(oneshot::Sender<Vec<ValidatorId>>),
/// Get a signing context for bitfields and statements.
SigningContext(oneshot::Sender<SigningContext>),
/// Get the validation code for a specific para, assuming execution under given block number, and
/// an optional block number representing an intermediate parablock executed in the context of
/// that block.
ValidationCode(ParaId, BlockNumber, Option<BlockNumber>, oneshot::Sender<ValidationCode>),
}
/// A message to the Runtime API subsystem.
pub enum RuntimeApiMessage {
/// Make a request of the runtime API against the post-state of the given relay-parent.
Request(Hash, RuntimeApiRequest),
}
/// Statement distribution message.
pub enum StatementDistributionMessage {
/// We have originated a signed statement in the context of
/// given relay-parent hash and it should be distributed to other validators.
Share(Hash, SignedStatement),
}
/// This data becomes intrinsics or extrinsics which should be included in a future relay chain block.
pub enum ProvisionableData {
/// This bitfield indicates the availability of various candidate blocks.
Bitfield(Hash, SignedAvailabilityBitfield),
/// The Candidate Backing subsystem believes that this candidate is valid, pending availability.
BackedCandidate(BackedCandidate),
/// Misbehavior reports are self-contained proofs of validator misbehavior.
MisbehaviorReport(Hash, MisbehaviorReport),
/// Disputes trigger a broad dispute resolution process.
Dispute(Hash, Signature),
}
/// Message to the Provisioner.
///
/// In all cases, the Hash is that of the relay parent.
pub enum ProvisionerMessage {
/// This message allows potential block authors to be kept updated with all new authorship data
/// as it becomes available.
RequestBlockAuthorshipData(Hash, mpsc::Sender<ProvisionableData>),
/// This data should become part of a relay chain block
ProvisionableData(ProvisionableData),
}
/// A message type tying together all message types that are used across Subsystems.
#[derive(Debug)]
pub enum AllMessages {
Validation(ValidationSubsystemMessage),
CandidateBacking(CandidateBackingSubsystemMessage),
/// Message for the validation subsystem.
CandidateValidation(CandidateValidationMessage),
/// Message for the candidate backing subsystem.
CandidateBacking(CandidateBackingMessage),
}
/// A message type that a subsystem receives from an overseer.
......
......@@ -20,22 +20,24 @@
use std::time::Duration;
use futures::{
channel::oneshot,
pending, pin_mut, executor, select, stream,
FutureExt, StreamExt,
};
use futures_timer::Delay;
use kv_log_macro as log;
use polkadot_primitives::parachain::{BlockData, PoVBlock};
use polkadot_overseer::{Overseer, Subsystem, SubsystemContext, SpawnedSubsystem};
use messages::{
AllMessages, CandidateBackingSubsystemMessage, FromOverseer, ValidationSubsystemMessage
AllMessages, CandidateBackingMessage, FromOverseer, CandidateValidationMessage
};
struct Subsystem1;
impl Subsystem1 {
async fn run(mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) {
async fn run(mut ctx: SubsystemContext<CandidateBackingMessage>) {
loop {
match ctx.try_recv().await {
Ok(Some(msg)) => {
......@@ -52,15 +54,24 @@ impl Subsystem1 {
}
Delay::new(Duration::from_secs(1)).await;
ctx.send_msg(AllMessages::Validation(
ValidationSubsystemMessage::ValidityAttestation
let (tx, _) = oneshot::channel();
ctx.send_msg(AllMessages::CandidateValidation(
CandidateValidationMessage::Validate(
Default::default(),
Default::default(),
PoVBlock {
block_data: BlockData(Vec::new()),
},
tx,
)
)).await.unwrap();
}
}
}
impl Subsystem<CandidateBackingSubsystemMessage> for Subsystem1 {
fn start(&mut self, ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateBackingMessage> for Subsystem1 {
fn start(&mut self, ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
......@@ -70,7 +81,7 @@ impl Subsystem<CandidateBackingSubsystemMessage> for Subsystem1 {
struct Subsystem2;
impl Subsystem2 {
async fn run(mut ctx: SubsystemContext<ValidationSubsystemMessage>) {
async fn run(mut ctx: SubsystemContext<CandidateValidationMessage>) {
ctx.spawn(Box::pin(async {
loop {
log::info!("Job tick");
......@@ -94,8 +105,8 @@ impl Subsystem2 {
}
}
impl Subsystem<ValidationSubsystemMessage> for Subsystem2 {
fn start(&mut self, ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateValidationMessage> for Subsystem2 {
fn start(&mut self, ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
......
......@@ -76,7 +76,7 @@ use polkadot_primitives::{Block, BlockNumber, Hash};
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
pub use messages::{
OverseerSignal, ValidationSubsystemMessage, CandidateBackingSubsystemMessage, AllMessages,
OverseerSignal, CandidateValidationMessage, CandidateBackingMessage, AllMessages,
FromOverseer,
};
......@@ -367,10 +367,10 @@ struct OverseenSubsystem<M: Debug> {
/// The `Overseer` itself.
pub struct Overseer<S: Spawn> {
/// A validation subsystem
validation_subsystem: OverseenSubsystem<ValidationSubsystemMessage>,
validation_subsystem: OverseenSubsystem<CandidateValidationMessage>,
/// A candidate backing subsystem
candidate_backing_subsystem: OverseenSubsystem<CandidateBackingSubsystemMessage>,
candidate_backing_subsystem: OverseenSubsystem<CandidateBackingMessage>,
/// Spawner to spawn tasks to.
s: S,
......@@ -443,14 +443,14 @@ where
/// # use futures_timer::Delay;
/// # use polkadot_overseer::{
/// # Overseer, Subsystem, SpawnedSubsystem, SubsystemContext,
/// # ValidationSubsystemMessage, CandidateBackingSubsystemMessage,
/// # CandidateValidationMessage, CandidateBackingMessage,
/// # };
///
/// struct ValidationSubsystem;
/// impl Subsystem<ValidationSubsystemMessage> for ValidationSubsystem {
/// impl Subsystem<CandidateValidationMessage> for ValidationSubsystem {
/// fn start(
/// &mut self,
/// mut ctx: SubsystemContext<ValidationSubsystemMessage>,
/// mut ctx: SubsystemContext<CandidateValidationMessage>,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move {
/// loop {
......@@ -461,10 +461,10 @@ where
/// }
///
/// struct CandidateBackingSubsystem;
/// impl Subsystem<CandidateBackingSubsystemMessage> for CandidateBackingSubsystem {
/// impl Subsystem<CandidateBackingMessage> for CandidateBackingSubsystem {
/// fn start(
/// &mut self,
/// mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>,
/// mut ctx: SubsystemContext<CandidateBackingMessage>,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move {
/// loop {
......@@ -498,8 +498,8 @@ where
/// ```
pub fn new(
leaves: impl IntoIterator<Item = BlockInfo>,
validation: Box<dyn Subsystem<ValidationSubsystemMessage> + Send>,
candidate_backing: Box<dyn Subsystem<CandidateBackingSubsystemMessage> + Send>,
validation: Box<dyn Subsystem<CandidateValidationMessage> + Send>,
candidate_backing: Box<dyn Subsystem<CandidateBackingMessage> + Send>,
mut s: S,
) -> SubsystemResult<(Self, OverseerHandler)> {
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
......@@ -670,7 +670,7 @@ where
async fn route_message(&mut self, msg: AllMessages) {
match msg {
AllMessages::Validation(msg) => {
AllMessages::CandidateValidation(msg) => {
if let Some(ref mut s) = self.validation_subsystem.instance {
let _= s.tx.send(FromOverseer::Communication { msg }).await;
}
......@@ -717,12 +717,14 @@ fn spawn<S: Spawn, M: Debug>(
#[cfg(test)]
mod tests {
use futures::{executor, pin_mut, select, channel::mpsc, FutureExt};
use polkadot_primitives::parachain::{BlockData, PoVBlock};
use super::*;
struct TestSubsystem1(mpsc::Sender<usize>);
impl Subsystem<ValidationSubsystemMessage> for TestSubsystem1 {
fn start(&mut self, mut ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateValidationMessage> for TestSubsystem1 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
let mut sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move {
let mut i = 0;
......@@ -744,15 +746,23 @@ mod tests {
struct TestSubsystem2(mpsc::Sender<usize>);
impl Subsystem<CandidateBackingSubsystemMessage> for TestSubsystem2 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateBackingMessage> for TestSubsystem2 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
let mut c: usize = 0;
loop {
if c < 10 {
let (tx, _) = oneshot::channel();
ctx.send_msg(
AllMessages::Validation(
ValidationSubsystemMessage::ValidityAttestation
AllMessages::CandidateValidation(
CandidateValidationMessage::Validate(
Default::default(),
Default::default(),
PoVBlock {
block_data: BlockData(Vec::new()),
},
tx,
)
)
).await.unwrap();
c += 1;
......@@ -776,8 +786,8 @@ mod tests {
struct TestSubsystem4;
impl Subsystem<CandidateBackingSubsystemMessage> for TestSubsystem4 {
fn start(&mut self, mut _ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateBackingMessage> for TestSubsystem4 {
fn start(&mut self, mut _ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
// Do nothing and exit.
}))
......@@ -861,8 +871,8 @@ mod tests {
struct TestSubsystem5(mpsc::Sender<OverseerSignal>);
impl Subsystem<ValidationSubsystemMessage> for TestSubsystem5 {
fn start(&mut self, mut ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateValidationMessage> for TestSubsystem5 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
let mut sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move {
......@@ -885,8 +895,8 @@ mod tests {
struct TestSubsystem6(mpsc::Sender<OverseerSignal>);
impl Subsystem<CandidateBackingSubsystemMessage> for TestSubsystem6 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateBackingMessage> for TestSubsystem6 {
fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
let mut sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move {
......
[package]
name = "polkadot-node-primitives"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
description = "Primitives types for the Node-side"
[dependencies]
polkadot-primitives = { path = "../../primitives" }
polkadot-statement-table = { path = "../../statement-table" }
parity-scale-codec = { version = "1.3.0", default-features = false, features = ["derive"] }
runtime_primitives = { package = "sp-runtime", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Primitive types used on the node-side.
//!
//! Unlike the `polkadot-primitives` crate, these primitives are only used on the node-side,
//! not shared between the node and the runtime. This crate builds on top of the primitives defined
//! there.
use runtime_primitives::traits::AppVerify;
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{
AbridgedCandidateReceipt, CandidateReceipt, SigningContext, ValidatorSignature,
ValidatorIndex, ValidatorId,
};
use parity_scale_codec::{Encode, Decode};
/// A statement, where the candidate receipt is included in the `Seconded` variant.
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
pub enum Statement {
/// A statement that a validator seconds a candidate.
#[codec(index = "1")]
Seconded(AbridgedCandidateReceipt),
/// A statement that a validator has deemed a candidate valid.
#[codec(index = "2")]
Valid(Hash),
/// A statement that a validator has deemed a candidate invalid.
#[codec(index = "3")]
Invalid(Hash),
}
impl Statement {
/// Get the signing payload of the statement.
pub fn signing_payload(&self, context: &SigningContext) -> Vec<u8> {
// convert to fully hash-based payload.
let statement = match *self {
Statement::Seconded(ref c) => polkadot_primitives::parachain::Statement::Candidate(c.hash()),
Statement::Valid(hash) => polkadot_primitives::parachain::Statement::Valid(hash),
Statement::Invalid(hash) => polkadot_primitives::parachain::Statement::Invalid(hash),
};
statement.signing_payload(context)
}
}
/// A statement, the corresponding signature, and the index of the sender.
///
/// Signing context and validator set should be apparent from context.
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
pub struct SignedStatement {
/// The statement signed.
pub statement: Statement,
/// The signature of the validator.
pub signature: ValidatorSignature,
/// The index in the validator set of the signing validator. Which validator set should
/// be apparent from context.
pub sender: ValidatorIndex,
}
impl SignedStatement {
/// Check the signature on a statement. Provide a list of validators to index into
/// and the context in which the statement is presumably signed.
///
/// Returns an error if out of bounds or the signature is invalid. Otherwise, returns Ok.
pub fn check_signature(
&self,
validators: &[ValidatorId],
signing_context: &SigningContext,
) -> Result<(), ()> {
let validator = validators.get(self.sender as usize).ok_or(())?;
let payload = self.statement.signing_payload(signing_context);
if self.signature.verify(&payload[..], validator) {
Ok(())
} else {
Err(())
}
}
}
/// A misbehaviour report.
pub enum MisbehaviorReport {
/// These validator nodes disagree on this candidate's validity, please figure it out
///
/// Most likely, the list of statments all agree except for the final one. That's not
/// guaranteed, though; if somehow we become aware of lots of
/// statements disagreeing about the validity of a candidate before taking action,
/// this message should be dispatched with all of them, in arbitrary order.
///
/// This variant is also used when our own validity checks disagree with others'.
CandidateValidityDisagreement(CandidateReceipt, Vec<SignedStatement>),
/// I've noticed a peer contradicting itself about a particular candidate
SelfContradiction(CandidateReceipt, SignedStatement, SignedStatement),
/// This peer has seconded more than one parachain candidate for this relay parent head
DoubleVote(CandidateReceipt, SignedStatement, SignedStatement),
}
......@@ -32,7 +32,7 @@ use sp_blockchain::HeaderBackend;
use polkadot_overseer::{
self as overseer,
BlockInfo, Overseer, OverseerHandler, Subsystem, SubsystemContext, SpawnedSubsystem,
ValidationSubsystemMessage, CandidateBackingSubsystemMessage,
CandidateValidationMessage, CandidateBackingMessage,
};
pub use service::{
AbstractService, Role, PruningMode, TransactionPoolOptions, Error, RuntimeGenesis,
......@@ -268,10 +268,10 @@ macro_rules! new_full_start {
}}
}
struct ValidationSubsystem;
struct CandidateValidationSubsystem;
impl Subsystem<ValidationSubsystemMessage> for ValidationSubsystem {
fn start(&mut self, mut ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
impl Subsystem<CandidateValidationMessage> for CandidateValidationSubsystem {
fn start(&mut self, mut ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
while let Ok(_) = ctx.recv().await {}
}))
......@@ -280,8 +280,8 @@ impl Subsystem<ValidationSubsystemMessage> for ValidationSubsystem {
struct CandidateBackingSubsystem;
impl Subsystem<CandidateBackingSubsystemMessage> for CandidateBackingSubsystem {