Unverified Commit c0387bab authored by Bernhard Schuster's avatar Bernhard Schuster Committed by GitHub
Browse files

refactor overseer into proc-macro based pattern (#2962)

parent d80f8489
Pipeline #146399 passed with stages
in 42 minutes and 27 seconds
......@@ -5993,6 +5993,7 @@ dependencies = [
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-overseer",
"polkadot-primitives",
"sc-authority-discovery",
"sc-network",
......@@ -6236,7 +6237,6 @@ dependencies = [
"futures 0.3.15",
"futures-timer 3.0.2",
"polkadot-node-subsystem",
"polkadot-overseer",
"polkadot-primitives",
"sp-blockchain",
"sp-inherents",
......@@ -6331,6 +6331,21 @@ dependencies = [
"thiserror",
]
[[package]]
name = "polkadot-node-metrics"
version = "0.1.0"
dependencies = [
"async-trait",
"futures 0.3.15",
"futures-timer 3.0.2",
"metered-channel",
"sc-network",
"sp-application-crypto",
"sp-core",
"sp-keystore",
"substrate-prometheus-endpoint",
]
[[package]]
name = "polkadot-node-network-protocol"
version = "0.1.0"
......@@ -6371,52 +6386,61 @@ dependencies = [
name = "polkadot-node-subsystem"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-std",
"polkadot-node-jaeger",
"polkadot-node-subsystem-types",
"polkadot-overseer",
]
[[package]]
name = "polkadot-node-subsystem-test-helpers"
version = "0.1.0"
dependencies = [
"async-trait",
"derive_more",
"futures 0.3.15",
"futures-timer 3.0.2",
"lazy_static",
"log",
"mick-jaeger",
"parity-scale-codec",
"parking_lot 0.11.1",
"pin-project 1.0.7",
"polkadot-node-jaeger",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem",
"polkadot-node-subsystem-util",
"polkadot-overseer",
"polkadot-primitives",
"polkadot-procmacro-subsystem-dispatch-gen",
"polkadot-statement-table",
"sc-network",
"smallvec 1.6.1",
"sp-core",
"substrate-prometheus-endpoint",
"thiserror",
"tracing",
]
[[package]]
name = "polkadot-node-subsystem-test-helpers"
name = "polkadot-node-subsystem-types"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-std",
"async-trait",
"derive_more",
"futures 0.3.15",
"futures-timer 3.0.2",
"lazy_static",
"log",
"mick-jaeger",
"parity-scale-codec",
"parking_lot 0.11.1",
"pin-project 1.0.7",
"polkadot-node-jaeger",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-util",
"polkadot-overseer",
"polkadot-node-subsystem-test-helpers",
"polkadot-overseer-gen",
"polkadot-primitives",
"polkadot-statement-table",
"sc-network",
"smallvec 1.6.1",
"sp-core",
"substrate-prometheus-endpoint",
"thiserror",
"tracing",
]
......@@ -6437,10 +6461,12 @@ dependencies = [
"parking_lot 0.11.1",
"pin-project 1.0.7",
"polkadot-node-jaeger",
"polkadot-node-metrics",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-overseer",
"polkadot-primitives",
"rand 0.8.4",
"sc-network",
......@@ -6463,18 +6489,59 @@ dependencies = [
"futures-timer 3.0.2",
"kv-log-macro",
"lru",
"metered-channel",
"polkadot-node-metrics",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-util",
"polkadot-node-subsystem-types",
"polkadot-overseer-all-subsystems-gen",
"polkadot-overseer-gen",
"polkadot-primitives",
"polkadot-procmacro-overseer-subsystems-gen",
"sc-client-api",
"sp-api",
"sp-core",
"tracing",
]
[[package]]
name = "polkadot-overseer-all-subsystems-gen"
version = "0.1.0"
dependencies = [
"assert_matches",
"proc-macro2",
"quote",
"syn",
"trybuild",
]
[[package]]
name = "polkadot-overseer-gen"
version = "0.1.0"
dependencies = [
"async-trait",
"futures 0.3.15",
"futures-timer 3.0.2",
"metered-channel",
"pin-project 1.0.7",
"polkadot-node-network-protocol",
"polkadot-overseer-gen-proc-macro",
"sp-core",
"thiserror",
"tracing",
"trybuild",
]
[[package]]
name = "polkadot-overseer-gen-proc-macro"
version = "0.1.0"
dependencies = [
"assert_matches",
"proc-macro-crate 1.0.0",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "polkadot-parachain"
version = "0.9.8"
......@@ -6520,28 +6587,6 @@ dependencies = [
"thiserror",
]
[[package]]
name = "polkadot-procmacro-overseer-subsystems-gen"
version = "0.1.0"
dependencies = [
"assert_matches",
"proc-macro2",
"quote",
"syn",
"trybuild",
]
[[package]]
name = "polkadot-procmacro-subsystem-dispatch-gen"
version = "0.1.0"
dependencies = [
"assert_matches",
"proc-macro2",
"quote",
"syn",
"trybuild",
]
[[package]]
name = "polkadot-rpc"
version = "0.9.8"
......@@ -7202,9 +7247,9 @@ checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a"
[[package]]
name = "proc-macro2"
version = "1.0.24"
version = "1.0.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71"
checksum = "f0d8caf72986c1a598726adc988bb5984792ef84f5ee5aa50209145ee8077038"
dependencies = [
"unicode-xid",
]
......
......@@ -65,14 +65,18 @@ members = [
"node/network/collator-protocol",
"node/network/gossip-support",
"node/overseer",
"node/overseer/overseer-gen",
"node/overseer/overseer-gen/proc-macro",
"node/overseer/all-subsystems-gen",
"node/malus",
"node/primitives",
"node/service",
"node/subsystem",
"node/subsystem/dispatch-gen",
"node/subsystem-types",
"node/subsystem-test-helpers",
"node/subsystem-util",
"node/jaeger",
"node/metrics",
"node/metered-channel",
"node/test/client",
"node/test/service",
......
......@@ -30,8 +30,11 @@ use polkadot_node_primitives::{
CollationGenerationConfig, AvailableData, PoV,
};
use polkadot_node_subsystem::{
ActiveLeavesUpdate,
messages::{AllMessages, CollationGenerationMessage, CollatorProtocolMessage},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
SpawnedSubsystem, SubsystemContext, SubsystemResult,
SubsystemError, FromOverseer, OverseerSignal,
overseer,
};
use polkadot_node_subsystem_util::{
request_availability_cores, request_persisted_validation_data,
......@@ -83,6 +86,7 @@ impl CollationGenerationSubsystem {
async fn run<Context>(mut self, mut ctx: Context)
where
Context: SubsystemContext<Message = CollationGenerationMessage>,
Context: overseer::SubsystemContext<Message = CollationGenerationMessage>,
{
// when we activate new leaves, we spawn a bunch of sub-tasks, each of which is
// expected to generate precisely one message. We don't want to block the main loop
......@@ -114,19 +118,16 @@ impl CollationGenerationSubsystem {
// it should hopefully therefore be ok that it's an async function mutably borrowing self.
async fn handle_incoming<Context>(
&mut self,
incoming: SubsystemResult<FromOverseer<Context::Message>>,
incoming: SubsystemResult<FromOverseer<<Context as SubsystemContext>::Message>>,
ctx: &mut Context,
sender: &mpsc::Sender<AllMessages>,
) -> bool
where
Context: SubsystemContext<Message = CollationGenerationMessage>,
Context: overseer::SubsystemContext<Message = CollationGenerationMessage>,
{
use polkadot_node_subsystem::ActiveLeavesUpdate;
use polkadot_node_subsystem::FromOverseer::{Communication, Signal};
use polkadot_node_subsystem::OverseerSignal::{ActiveLeaves, BlockFinalized, Conclude};
match incoming {
Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, .. }))) => {
Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. }))) => {
// follow the procedure from the guide
if let Some(config) = &self.config {
let metrics = self.metrics.clone();
......@@ -143,8 +144,8 @@ impl CollationGenerationSubsystem {
false
}
Ok(Signal(Conclude)) => true,
Ok(Communication {
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => true,
Ok(FromOverseer::Communication {
msg: CollationGenerationMessage::Initialize(config),
}) => {
if self.config.is_some() {
......@@ -154,7 +155,7 @@ impl CollationGenerationSubsystem {
}
false
}
Ok(Signal(BlockFinalized(..))) => false,
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(..))) => false,
Err(err) => {
tracing::error!(
target: LOG_TARGET,
......@@ -168,9 +169,10 @@ impl CollationGenerationSubsystem {
}
}
impl<Context> Subsystem<Context> for CollationGenerationSubsystem
impl<Context> overseer::Subsystem<Context, SubsystemError> for CollationGenerationSubsystem
where
Context: SubsystemContext<Message = CollationGenerationMessage>,
Context: overseer::SubsystemContext<Message = CollationGenerationMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = async move {
......
......@@ -29,6 +29,7 @@
//! We maintain a rolling window of session indices. This starts as empty
use polkadot_node_subsystem::{
overseer,
messages::{
RuntimeApiMessage, RuntimeApiRequest, ChainApiMessage, ApprovalDistributionMessage,
ChainSelectionMessage,
......@@ -84,7 +85,7 @@ struct ImportedBlockInfoEnv<'a> {
// Computes information about the imported block. Returns `None` if the info couldn't be extracted -
// failure to communicate with overseer,
async fn imported_block_info(
ctx: &mut impl SubsystemContext,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
env: ImportedBlockInfoEnv<'_>,
block_hash: Hash,
block_header: &Header,
......@@ -98,7 +99,7 @@ async fn imported_block_info(
ctx.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::CandidateEvents(c_tx),
).into()).await;
)).await;
let events: Vec<CandidateEvent> = match c_rx.await {
Ok(Ok(events)) => events,
......@@ -120,7 +121,7 @@ async fn imported_block_info(
ctx.send_message(RuntimeApiMessage::Request(
block_header.parent_hash,
RuntimeApiRequest::SessionIndexForChild(s_tx),
).into()).await;
)).await;
let session_index = match s_rx.await {
Ok(Ok(s)) => s,
......@@ -161,7 +162,7 @@ async fn imported_block_info(
ctx.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::CurrentBabeEpoch(s_tx),
).into()).await;
)).await;
match s_rx.await {
Ok(Ok(s)) => s,
......@@ -284,20 +285,21 @@ pub struct BlockImportedCandidates {
/// * and return information about all candidates imported under each block.
///
/// It is the responsibility of the caller to schedule wakeups for each block.
pub(crate) async fn handle_new_head<'a>(
ctx: &mut impl SubsystemContext,
pub(crate) async fn handle_new_head(
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
state: &mut State,
db: &mut OverlayedBackend<'a, impl Backend>,
db: &mut OverlayedBackend<'_, impl Backend>,
head: Hash,
finalized_number: &Option<BlockNumber>,
) -> SubsystemResult<Vec<BlockImportedCandidates>> {
) -> SubsystemResult<Vec<BlockImportedCandidates>>
{
// Update session info based on most recent head.
let mut span = jaeger::Span::new(head, "approval-checking-import");
let header = {
let (h_tx, h_rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::BlockHeader(head, h_tx).into()).await;
ctx.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await;
match h_rx.await? {
Err(e) => {
......@@ -375,7 +377,7 @@ pub(crate) async fn handle_new_head<'a>(
// It's possible that we've lost a race with finality.
let (tx, rx) = oneshot::channel();
ctx.send_message(
ChainApiMessage::FinalizedBlockHash(block_header.number.clone(), tx).into()
ChainApiMessage::FinalizedBlockHash(block_header.number.clone(), tx)
).await;
let lost_to_finality = match rx.await {
......@@ -469,7 +471,7 @@ pub(crate) async fn handle_new_head<'a>(
// If all bits are already set, then send an approve message.
if approved_bitfield.count_ones() == approved_bitfield.len() {
ctx.send_message(ChainSelectionMessage::Approved(block_hash).into()).await;
ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await;
}
let block_entry = v1::BlockEntry {
......@@ -498,7 +500,7 @@ pub(crate) async fn handle_new_head<'a>(
// Notify chain-selection of all approved hashes.
for hash in approved_hashes {
ctx.send_message(ChainSelectionMessage::Approved(hash).into()).await;
ctx.send_message(ChainSelectionMessage::Approved(hash)).await;
}
}
......@@ -551,7 +553,7 @@ pub(crate) async fn handle_new_head<'a>(
"Informing distribution of newly imported chain",
);
ctx.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta).into());
ctx.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta));
Ok(imported_candidates)
}
......
......@@ -29,7 +29,7 @@ use polkadot_node_subsystem::{
AvailabilityRecoveryMessage, ChainSelectionMessage,
},
errors::RecoveryError,
Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SpawnedSubsystem,
overseer::{self, SubsystemSender as _}, SubsystemContext, SubsystemError, SubsystemResult, SpawnedSubsystem,
FromOverseer, OverseerSignal, SubsystemSender,
};
use polkadot_node_subsystem_util::{
......@@ -333,12 +333,15 @@ impl ApprovalVotingSubsystem {
}
}
impl<C> Subsystem<C> for ApprovalVotingSubsystem
where C: SubsystemContext<Message = ApprovalVotingMessage>
impl<Context> overseer::Subsystem<Context, SubsystemError> for ApprovalVotingSubsystem
where
Context: SubsystemContext<Message = ApprovalVotingMessage>,
Context: overseer::SubsystemContext<Message = ApprovalVotingMessage>,
{
fn start(self, ctx: C) -> SpawnedSubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let backend = DbBackend::new(self.db.clone(), self.db_config);
let future = run::<DbBackend, C>(
let future = run::<DbBackend, Context>(
ctx,
self,
Box::new(SystemClock),
......@@ -663,15 +666,16 @@ enum Action {
Conclude,
}
async fn run<B, C>(
mut ctx: C,
async fn run<B, Context>(
mut ctx: Context,
mut subsystem: ApprovalVotingSubsystem,
clock: Box<dyn Clock + Send + Sync>,
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
mut backend: B,
) -> SubsystemResult<()>
where
C: SubsystemContext<Message = ApprovalVotingMessage>,
Context: SubsystemContext<Message = ApprovalVotingMessage>,
Context: overseer::SubsystemContext<Message = ApprovalVotingMessage>,
B: Backend,
{
let mut state = State {
......@@ -797,7 +801,7 @@ async fn run<B, C>(
//
// returns `true` if any of the actions was a `Conclude` command.
async fn handle_actions(
ctx: &mut impl SubsystemContext,
ctx: &mut (impl SubsystemContext<Message = ApprovalVotingMessage> + overseer::SubsystemContext<Message = ApprovalVotingMessage>),
state: &mut State,
overlayed_db: &mut OverlayedBackend<'_, impl Backend>,
metrics: &Metrics,
......@@ -861,7 +865,7 @@ async fn handle_actions(
ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment(
indirect_cert,
candidate_index,
).into());
));
match approvals_cache.get(&candidate_hash) {
Some(ApprovalOutcome::Approved) => {
......@@ -902,14 +906,14 @@ async fn handle_actions(
}
}
Action::NoteApprovedInChainSelection(block_hash) => {
ctx.send_message(ChainSelectionMessage::Approved(block_hash).into()).await;
ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await;
}
Action::BecomeActive => {
*mode = Mode::Active;
let messages = distribution_messages_for_activation(overlayed_db)?;
ctx.send_messages(messages.into_iter().map(Into::into)).await;
ctx.send_messages(messages.into_iter()).await;
}
Action::Conclude => { conclude = true; }
}
......@@ -1017,7 +1021,7 @@ fn distribution_messages_for_activation(
// Handle an incoming signal from the overseer. Returns true if execution should conclude.
async fn handle_from_overseer(
ctx: &mut impl SubsystemContext,
ctx: &mut (impl SubsystemContext<Message = ApprovalVotingMessage> + overseer::SubsystemContext<Message = ApprovalVotingMessage>),
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
metrics: &Metrics,
......@@ -1130,7 +1134,7 @@ async fn handle_from_overseer(
}
async fn handle_approved_ancestor(
ctx: &mut impl SubsystemContext,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
db: &OverlayedBackend<'_, impl Backend>,
target: Hash,
lower_bound: BlockNumber,
......@@ -1149,7 +1153,7 @@ async fn handle_approved_ancestor(
let target_number = {
let (tx, rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::BlockNumber(target, tx).into()).await;
ctx.send_message(ChainApiMessage::BlockNumber(target, tx)).await;
match rx.await {
Ok(Ok(Some(n))) => n,
......@@ -1173,7 +1177,7 @@ async fn handle_approved_ancestor(
hash: target,
k: (target_number - (lower_bound + 1)) as usize,
response_channel: tx,
}.into()).await;
}).await;
match rx.await {
Ok(Ok(a)) => a,
......@@ -1994,7 +1998,7 @@ fn process_wakeup(
// spawned. When the background work is no longer needed, the `AbortHandle` should be dropped
// to cancel the background work and any requests it has spawned.
async fn launch_approval(
ctx: &mut impl SubsystemContext,
ctx: &mut (impl SubsystemContext<Message = ApprovalVotingMessage> + overseer::SubsystemContext<Message = ApprovalVotingMessage>),
metrics: Metrics,
session_index: SessionIndex,
candidate: CandidateReceipt,
......@@ -2043,7 +2047,7 @@ async fn launch_approval(
session_index,
Some(backing_group),
a_tx,
).into()).await;
)).await;
ctx.send_message(
RuntimeApiMessage::Request(
......@@ -2052,7 +2056,7 @@ async fn launch_approval(
candidate.descriptor.validation_code_hash,
code_tx,
),
).into()
)
).await;
let candidate = candidate.clone();
......
......@@ -37,7 +37,9 @@ use polkadot_node_primitives::{
ErasureChunk, AvailableData,
};
use polkadot_subsystem::{
FromOverseer, OverseerSignal, SubsystemError, Subsystem, SubsystemContext, SpawnedSubsystem,
FromOverseer, OverseerSignal, SubsystemError,
SubsystemContext, SpawnedSubsystem,
overseer,
ActiveLeavesUpdate,
errors::{ChainApiError, RuntimeApiError},
};
......@@ -522,9 +524,10 @@ impl KnownUnfinalizedBlocks {
}
}
impl<Context> Subsystem<Context> for AvailabilityStoreSubsystem
impl<Context> overseer::Subsystem<Context, SubsystemError> for AvailabilityStoreSubsystem
where
Context: SubsystemContext<Message = AvailabilityStoreMessage>,
Context: overseer::SubsystemContext<Message = AvailabilityStoreMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run(self, ctx)
......@@ -540,7 +543,8 @@ where
async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context)
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>,
Context: SubsystemContext<Message = AvailabilityStoreMessage>,
Context: overseer::SubsystemContext<Message = AvailabilityStoreMessage>,
{
let mut next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();
......@@ -570,7 +574,8 @@ async fn run_iteration<Context>(
)
-> Result<bool, Error>
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>,
Context: SubsystemContext<Message = AvailabilityStoreMessage>,
Context: overseer::SubsystemContext<Message = AvailabilityStoreMessage>,
{
select! {
incoming = ctx.recv().fuse() => {
......@@ -615,18 +620,22 @@ where
Ok(false)
}
async fn process_block_activated(
ctx: &mut impl SubsystemContext,
async fn process_block_activated<Context>(
ctx: &mut Context,
subsystem: &mut AvailabilityStoreSubsystem,
activated: Hash,
) -> Result<(), Error> {
) -> Result<(), Error>
where
Context: SubsystemContext<Message = AvailabilityStoreMessage>,
Context: overseer::SubsystemContext<Message = AvailabilityStoreMessage>,