Unverified Commit d23a6d5a authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Split NetworkBridge and break cycles with Unbounded (#2736)



* overseer: pass messages directly between subsystems

* test that message is held on to

* Update node/overseer/src/lib.rs

Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* give every subsystem an unbounded sender too

* remove metered_channel::name

1. we don't provide good names
2. these names are never used anywhere

* unused mut

* remove unnecessary &mut

* subsystem unbounded_send

* remove unused MaybeTimer

We have channel size metrics that serve the same purpose better now and the implementation of message timing was pretty ugly.

* remove comment

* split up senders and receivers

* update metrics

* fix tests

* fix test subsystem context

* use SubsystemSender in jobs system now

* refactor of awful jobs code

* expose public `run` on JobSubsystem

* update candidate backing to new jobs & use unbounded

* bitfield signing

* candidate-selection

* provisioner

* approval voting: send unbounded for assignment/approvals

* async not needed

* begin bridge split

* split up network tasks into background worker

* port over network bridge

* Update node/network/bridge/src/lib.rs

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* rename ValidationWorkerNotifications

Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
parent 65a5d185
Pipeline #131149 failed with stages
in 17 minutes and 11 seconds
......@@ -5632,6 +5632,7 @@ version = "0.1.0"
dependencies = [
"futures 0.3.13",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-keystore",
......@@ -5647,6 +5648,7 @@ dependencies = [
"futures 0.3.13",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-core",
......@@ -5720,6 +5722,7 @@ dependencies = [
"futures 0.3.13",
"futures-timer 3.0.2",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-application-crypto",
......
......@@ -564,10 +564,10 @@ async fn handle_actions(
let block_hash = indirect_cert.block_hash;
let validator_index = indirect_cert.validator;
ctx.send_message(ApprovalDistributionMessage::DistributeAssignment(
ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment(
indirect_cert,
candidate_index,
).into()).await;
).into());
launch_approval(
ctx,
......@@ -712,7 +712,7 @@ async fn handle_background_request(
) -> SubsystemResult<Vec<Action>> {
match request {
BackgroundRequest::ApprovalVote(vote_request) => {
issue_approval(ctx, state, metrics, vote_request).await
issue_approval(ctx, state, metrics, vote_request)
}
BackgroundRequest::CandidateValidation(
validation_data,
......@@ -1724,7 +1724,7 @@ async fn launch_approval(
// Issue and import a local approval vote. Should only be invoked after approval checks
// have been done.
async fn issue_approval(
fn issue_approval(
ctx: &mut impl SubsystemContext,
state: &State<impl DBReader>,
metrics: &Metrics,
......@@ -1830,12 +1830,14 @@ async fn issue_approval(
metrics.on_approval_produced();
// dispatch to approval distribution.
ctx.send_message(ApprovalDistributionMessage::DistributeApproval(IndirectSignedApprovalVote {
block_hash,
candidate_index: candidate_index as _,
validator: validator_index,
signature: sig,
}).into()).await;
ctx.send_unbounded_message(
ApprovalDistributionMessage::DistributeApproval(IndirectSignedApprovalVote {
block_hash,
candidate_index: candidate_index as _,
validator: validator_index,
signature: sig,
}
).into());
Ok(actions)
}
......
This diff is collapsed.
......@@ -13,3 +13,6 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
wasm-timer = "0.2.5"
thiserror = "1.0.23"
[dev-dependencies]
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
......@@ -23,15 +23,16 @@
use futures::{channel::{mpsc, oneshot}, lock::Mutex, prelude::*, future, Future};
use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
use polkadot_node_subsystem::{
jaeger, PerLeafSpan,
jaeger, PerLeafSpan, SubsystemSender,
messages::{
AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
AvailabilityStoreMessage, BitfieldDistributionMessage,
BitfieldSigningMessage, RuntimeApiMessage, RuntimeApiRequest,
},
errors::RuntimeApiError,
};
use polkadot_node_subsystem_util::{
self as util, JobManager, JobTrait, Validator, FromJobCommand, metrics::{self, prometheus},
self as util, JobSubsystem, JobTrait, Validator, metrics::{self, prometheus},
JobSender,
};
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
use std::{pin::Pin, time::Duration, iter::FromIterator, sync::Arc};
......@@ -73,7 +74,7 @@ pub enum Error {
async fn get_core_availability(
core: &CoreState,
validator_idx: ValidatorIndex,
sender: &Mutex<&mut mpsc::Sender<FromJobCommand>>,
sender: &Mutex<&mut impl SubsystemSender>,
span: &jaeger::Span,
) -> Result<bool, Error> {
if let &CoreState::Occupied(ref core) = core {
......@@ -83,14 +84,14 @@ async fn get_core_availability(
sender
.lock()
.await
.send(
AllMessages::from(AvailabilityStoreMessage::QueryChunkAvailability(
.send_message(
AvailabilityStoreMessage::QueryChunkAvailability(
core.candidate_hash,
validator_idx,
tx,
)).into(),
).into(),
)
.await?;
.await;
let res = rx.await.map_err(Into::into);
......@@ -111,12 +112,15 @@ async fn get_core_availability(
/// delegates to the v1 runtime API
async fn get_availability_cores(
relay_parent: Hash,
sender: &mut mpsc::Sender<FromJobCommand>,
sender: &mut impl SubsystemSender,
) -> Result<Vec<CoreState>, Error> {
let (tx, rx) = oneshot::channel();
sender
.send(AllMessages::from(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::AvailabilityCores(tx))).into())
.await?;
.send_message(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::AvailabilityCores(tx),
).into())
.await;
match rx.await {
Ok(Ok(out)) => Ok(out),
Ok(Err(runtime_err)) => Err(runtime_err.into()),
......@@ -133,7 +137,7 @@ async fn construct_availability_bitfield(
relay_parent: Hash,
span: &jaeger::Span,
validator_idx: ValidatorIndex,
sender: &mut mpsc::Sender<FromJobCommand>,
sender: &mut impl SubsystemSender,
) -> Result<AvailabilityBitfield, Error> {
// get the set of availability cores from the runtime
let availability_cores = {
......@@ -223,13 +227,13 @@ impl JobTrait for BitfieldSigningJob {
/// Run a job for the parent block indicated
#[tracing::instrument(skip(span, keystore, metrics, _receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
fn run<S: SubsystemSender>(
relay_parent: Hash,
span: Arc<jaeger::Span>,
keystore: Self::RunArgs,
metrics: Self::Metrics,
_receiver: mpsc::Receiver<BitfieldSigningMessage>,
mut sender: mpsc::Sender<FromJobCommand>,
mut sender: JobSender<S>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let metrics = metrics.clone();
async move {
......@@ -239,7 +243,7 @@ impl JobTrait for BitfieldSigningJob {
// now do all the work we can before we need to wait for the availability store
// if we're not a validator, we can just succeed effortlessly
let validator = match Validator::new(relay_parent, keystore.clone(), sender.clone()).await {
let validator = match Validator::new(relay_parent, keystore.clone(), &mut sender).await {
Ok(validator) => validator,
Err(util::Error::NotAValidator) => return Ok(()),
Err(err) => return Err(Error::Util(err)),
......@@ -260,7 +264,7 @@ impl JobTrait for BitfieldSigningJob {
relay_parent,
&span_availability,
validator.index(),
&mut sender,
sender.subsystem_sender(),
).await
{
Err(Error::Runtime(runtime_err)) => {
......@@ -295,26 +299,27 @@ impl JobTrait for BitfieldSigningJob {
let _span = span.child("gossip");
sender
.send(
AllMessages::from(
BitfieldDistributionMessage::DistributeBitfield(relay_parent, signed_bitfield),
).into(),
)
.await
.map_err(Into::into)
.send_message(BitfieldDistributionMessage::DistributeBitfield(
relay_parent,
signed_bitfield,
).into())
.await;
Ok(())
}
.boxed()
}
}
/// BitfieldSigningSubsystem manages a number of bitfield signing jobs.
pub type BitfieldSigningSubsystem<Spawner, Context> = JobManager<Spawner, Context, BitfieldSigningJob>;
pub type BitfieldSigningSubsystem<Spawner> = JobSubsystem<BitfieldSigningJob, Spawner>;
#[cfg(test)]
mod tests {
use super::*;
use futures::{pin_mut, executor::block_on};
use polkadot_primitives::v1::{CandidateHash, OccupiedCore};
use polkadot_node_subsystem::messages::AllMessages;
fn occupied_core(para_id: u32, candidate_hash: CandidateHash) -> CoreState {
CoreState::Occupied(OccupiedCore {
......@@ -332,10 +337,10 @@ mod tests {
#[test]
fn construct_availability_bitfield_works() {
block_on(async move {
let (mut sender, mut receiver) = mpsc::channel(10);
let relay_parent = Hash::default();
let validator_index = ValidatorIndex(1u32);
let (mut sender, mut receiver) = polkadot_node_subsystem_test_helpers::sender_receiver();
let future = construct_availability_bitfield(
relay_parent,
&jaeger::Span::Disabled,
......@@ -350,18 +355,14 @@ mod tests {
loop {
futures::select! {
m = receiver.next() => match m.unwrap() {
FromJobCommand::SendMessage(
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(rp, RuntimeApiRequest::AvailabilityCores(tx)),
),
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(rp, RuntimeApiRequest::AvailabilityCores(tx)),
) => {
assert_eq!(relay_parent, rp);
tx.send(Ok(vec![CoreState::Free, occupied_core(1, hash_a), occupied_core(2, hash_b)])).unwrap();
},
FromJobCommand::SendMessage(
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryChunkAvailability(c_hash, vidx, tx),
),
}
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryChunkAvailability(c_hash, vidx, tx),
) => {
assert_eq!(validator_index, vidx);
......
......@@ -18,3 +18,4 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
[dev-dependencies]
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
......@@ -25,16 +25,16 @@ use futures::{
};
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_subsystem::{
jaeger, PerLeafSpan,
jaeger, PerLeafSpan, SubsystemSender,
errors::ChainApiError,
messages::{
AllMessages, CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage,
CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage,
RuntimeApiRequest,
},
};
use polkadot_node_subsystem_util::{
self as util, request_from_runtime, request_validator_groups, delegated_subsystem,
JobTrait, FromJobCommand, Validator, metrics::{self, prometheus},
self as util, request_from_runtime, request_validator_groups, JobSubsystem,
JobTrait, JobSender, Validator, metrics::{self, prometheus},
};
use polkadot_primitives::v1::{
CandidateReceipt, CollatorId, CoreState, CoreIndex, Hash, Id as ParaId, PoV, BlockNumber,
......@@ -45,22 +45,24 @@ use thiserror::Error;
const LOG_TARGET: &'static str = "parachain::candidate-selection";
struct CandidateSelectionJob {
/// A per-block job in the candidate selection subsystem.
pub struct CandidateSelectionJob {
assignment: ParaId,
sender: mpsc::Sender<FromJobCommand>,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
metrics: Metrics,
seconded_candidate: Option<CollatorId>,
}
/// Errors in the candidate selection subsystem.
#[derive(Debug, Error)]
enum Error {
#[error(transparent)]
Sending(#[from] mpsc::SendError),
pub enum Error {
/// An error in utilities.
#[error(transparent)]
Util(#[from] util::Error),
/// An error receiving on a oneshot channel.
#[error(transparent)]
OneshotRecv(#[from] oneshot::Canceled),
/// An error interacting with the chain API.
#[error(transparent)]
ChainApi(#[from] ChainApiError),
}
......@@ -94,13 +96,13 @@ impl JobTrait for CandidateSelectionJob {
const NAME: &'static str = "CandidateSelectionJob";
#[tracing::instrument(skip(keystore, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
fn run<S: SubsystemSender>(
relay_parent: Hash,
span: Arc<jaeger::Span>,
keystore: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
mut sender: mpsc::Sender<FromJobCommand>,
mut sender: JobSender<S>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let span = PerLeafSpan::new(span, "candidate-selection");
async move {
......@@ -108,12 +110,12 @@ impl JobTrait for CandidateSelectionJob {
.with_relay_parent(relay_parent)
.with_stage(jaeger::Stage::CandidateSelection);
let (groups, cores) = futures::try_join!(
try_runtime_api!(request_validator_groups(relay_parent, &mut sender).await),
try_runtime_api!(request_from_runtime(
request_validator_groups(relay_parent, &mut sender).await,
request_from_runtime(
relay_parent,
&mut sender,
|tx| RuntimeApiRequest::AvailabilityCores(tx),
).await),
).await,
)?;
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
......@@ -126,7 +128,7 @@ impl JobTrait for CandidateSelectionJob {
let n_cores = cores.len();
let validator = match Validator::new(relay_parent, keystore.clone(), sender.clone()).await {
let validator = match Validator::new(relay_parent, keystore.clone(), &mut sender).await {
Ok(validator) => validator,
Err(util::Error::NotAValidator) => return Ok(()),
Err(err) => return Err(Error::Util(err)),
......@@ -198,20 +200,20 @@ impl JobTrait for CandidateSelectionJob {
drop(assignment_span);
CandidateSelectionJob::new(assignment, metrics, sender, receiver).run_loop(&span).await
CandidateSelectionJob::new(assignment, metrics, receiver)
.run_loop(&span, sender.subsystem_sender())
.await
}.boxed()
}
}
impl CandidateSelectionJob {
pub fn new(
fn new(
assignment: ParaId,
metrics: Metrics,
sender: mpsc::Sender<FromJobCommand>,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
) -> Self {
Self {
sender,
receiver,
metrics,
assignment,
......@@ -219,7 +221,11 @@ impl CandidateSelectionJob {
}
}
async fn run_loop(&mut self, span: &jaeger::Span) -> Result<(), Error> {
async fn run_loop(
&mut self,
span: &jaeger::Span,
sender: &mut impl SubsystemSender,
) -> Result<(), Error> {
let span = span.child("run-loop")
.with_stage(jaeger::Stage::CandidateSelection);
......@@ -231,7 +237,7 @@ impl CandidateSelectionJob {
collator_id,
)) => {
let _span = span.child("handle-collation");
self.handle_collation(relay_parent, para_id, collator_id).await;
self.handle_collation(sender, relay_parent, para_id, collator_id).await;
}
Some(CandidateSelectionMessage::Invalid(
_relay_parent,
......@@ -241,28 +247,26 @@ impl CandidateSelectionJob {
.with_stage(jaeger::Stage::CandidateSelection)
.with_candidate(candidate_receipt.hash())
.with_relay_parent(_relay_parent);
self.handle_invalid(candidate_receipt).await;
self.handle_invalid(sender, candidate_receipt).await;
}
Some(CandidateSelectionMessage::Seconded(_relay_parent, statement)) => {
let _span = span.child("handle-seconded")
.with_stage(jaeger::Stage::CandidateSelection)
.with_candidate(statement.payload().candidate_hash())
.with_relay_parent(_relay_parent);
self.handle_seconded(statement).await;
self.handle_seconded(sender, statement).await;
}
None => break,
}
}
// closing the sender here means that we don't deadlock in tests
self.sender.close_channel();
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self, sender), fields(subsystem = LOG_TARGET))]
async fn handle_collation(
&mut self,
sender: &mut impl SubsystemSender,
relay_parent: Hash,
para_id: ParaId,
collator_id: CollatorId,
......@@ -276,13 +280,7 @@ impl CandidateSelectionJob {
collator_id,
para_id,
);
if let Err(err) = forward_invalidity_note(&collator_id, &mut self.sender).await {
tracing::warn!(
target: LOG_TARGET,
err = ?err,
"failed to forward invalidity note",
);
}
forward_invalidity_note(&collator_id, sender).await;
return;
}
......@@ -292,7 +290,7 @@ impl CandidateSelectionJob {
relay_parent,
para_id,
collator_id.clone(),
self.sender.clone(),
sender,
).await {
Ok(response) => response,
Err(err) => {
......@@ -305,21 +303,23 @@ impl CandidateSelectionJob {
}
};
match second_candidate(
second_candidate(
relay_parent,
candidate_receipt,
pov,
&mut self.sender,
sender,
&self.metrics,
).await {
Err(err) => tracing::warn!(target: LOG_TARGET, err = ?err, "failed to second a candidate"),
Ok(()) => self.seconded_candidate = Some(collator_id),
}
).await;
self.seconded_candidate = Some(collator_id);
}
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn handle_invalid(&mut self, candidate_receipt: CandidateReceipt) {
#[tracing::instrument(level = "trace", skip(self, sender), fields(subsystem = LOG_TARGET))]
async fn handle_invalid(
&mut self,
sender: &mut impl SubsystemSender,
candidate_receipt: CandidateReceipt,
) {
let _timer = self.metrics.time_handle_invalid();
let received_from = match &self.seconded_candidate {
......@@ -338,21 +338,15 @@ impl CandidateSelectionJob {
"received invalidity note for candidate",
);
let result =
if let Err(err) = forward_invalidity_note(received_from, &mut self.sender).await {
tracing::warn!(
target: LOG_TARGET,
err = ?err,
"failed to forward invalidity note",
);
Err(())
} else {
Ok(())
};
self.metrics.on_invalid_selection(result);
forward_invalidity_note(received_from, sender).await;
self.metrics.on_invalid_selection();
}
async fn handle_seconded(&mut self, statement: SignedFullStatement) {
async fn handle_seconded(
&mut self,
sender: &mut impl SubsystemSender,
statement: SignedFullStatement,
) {
let received_from = match &self.seconded_candidate {
Some(peer) => peer,
None => {
......@@ -369,27 +363,13 @@ impl CandidateSelectionJob {
"received seconded note for candidate",
);
if let Err(e) = self.sender
.send(AllMessages::from(CollatorProtocolMessage::NoteGoodCollation(received_from.clone())).into()).await
{
tracing::debug!(
target: LOG_TARGET,
error = ?e,
"failed to note good collator"
);
}
sender
.send_message(CollatorProtocolMessage::NoteGoodCollation(received_from.clone()).into())
.await;
if let Err(e) = self.sender
.send(AllMessages::from(
CollatorProtocolMessage::NotifyCollationSeconded(received_from.clone(), statement)
).into()).await
{
tracing::debug!(
target: LOG_TARGET,
error = ?e,
"failed to notify collator about seconded collation"
);
}
sender.send_message(
CollatorProtocolMessage::NotifyCollationSeconded(received_from.clone(), statement).into()
).await;
}
}
......@@ -401,17 +381,18 @@ async fn get_collation(
relay_parent: Hash,
para_id: ParaId,
collator_id: CollatorId,
mut sender: mpsc::Sender<FromJobCommand>,
sender: &mut impl SubsystemSender,
) -> Result<(CandidateReceipt, PoV), Error> {
let (tx, rx) = oneshot::channel();
sender
.send(AllMessages::from(CollatorProtocolMessage::FetchCollation(
.send_message(CollatorProtocolMessage::FetchCollation(
relay_parent,
collator_id,
para_id,
tx,
)).into())
.await?;
).into())
.await;
rx.await.map_err(Into::into)
}
......@@ -419,45 +400,33 @@ async fn second_candidate(
relay_parent: Hash,
candidate_receipt: CandidateReceipt,
pov: PoV,
sender: &mut mpsc::Sender<FromJobCommand>,
sender: &mut impl SubsystemSender,
metrics: &Metrics,
) -> Result<(), Error> {
match sender
.send(AllMessages::from(CandidateBackingMessage::Second(
) {
sender
.send_message(CandidateBackingMessage::Second(
relay_parent,
candidate_receipt,
pov,
)).into())
.await
{
Err(err) => {
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to send a seconding message");
metrics.on_second(Err(()));
Err(err.into())
}
Ok(_) => {
metrics.on_second(Ok(()));
Ok(())