Unverified Commit 798f781f authored by Peter Goodspeed-Niklaus, committed by GitHub

start working on building the real overseer (#1795)



* start working on building the real overseer

Unfortunately, this fails to compile right now due to an upstream
compilation failure, probably brought on by a recent upgrade to
rustc v1.47.

* fill in AllSubsystems internal constructors

* replace fn make_metrics with Metrics::attempt_to_register

* update to account for #1740

* remove Metrics::register, rename Metrics::attempt_to_register

* add 'static bounds to real_overseer type params

* pass authority_discovery and network_service to real_overseer

It's not obvious that this is the best way to handle the case where
there is no authority discovery service, but it seems to be the best
option available at the moment.
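
As a rough sketch of the shape this takes (every name here is an illustrative stand-in, not the real `real_overseer` signature), the service builder passes the network service unconditionally and the authority discovery service as an `Option`, so nodes that don't run one simply pass `None`:

```rust
// Hypothetical, heavily simplified sketch; the real constructor takes many
// more parameters and different concrete types.
use std::sync::Arc;

struct NetworkService;
struct AuthorityDiscoveryService;

struct RealOverseer {
    network_service: Arc<NetworkService>,
    // `None` on nodes that don't run authority discovery.
    authority_discovery: Option<AuthorityDiscoveryService>,
}

fn real_overseer(
    network_service: Arc<NetworkService>,
    authority_discovery: Option<AuthorityDiscoveryService>,
) -> RealOverseer {
    RealOverseer { network_service, authority_discovery }
}

fn main() {
    let network = Arc::new(NetworkService);
    // A non-authority node has no authority discovery service to hand over.
    let _overseer = real_overseer(network, None);
}
```

Modelling the absent service as an `Option` keeps a single constructor for authorities and non-authorities alike, instead of splitting it into two variants.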

* select a proper database configuration for the availability store db

* use subdirectory for av-store database path

* apply Basti's patch which avoids needing to parameterize everything on Block

* simplify path extraction

* get all tests to compile

* Fix Prometheus double-registry error

for debugging purposes, added this to node/subsystem-util/src/lib.rs:472-476:

```rust
Some(registry) => Self::try_register(registry).map_err(|err| {
	eprintln!("PrometheusError calling {}::register: {:?}", std::any::type_name::<Self>(), err);
	err
}),
```

That pointed out where the registration was failing, which led to
this fix. The test still doesn't pass, but it now fails in a new
and different way!

* authorities must have authority discovery, but not necessarily overseer handlers

* fix broken SpawnedSubsystem impls

detailed logging determined that, with the `Box::new` style of
future generation, the `self.run` method was never being called,
leading to dropped receivers / closed senders for those subsystems
and causing the overseer to shut down immediately.

This is not the final fix needed to get things working properly,
but it's a good start.
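
A toy reproduction of the failure mode, using simplified stand-ins for the subsystem types (only the `futures` crate is assumed): wrapping a call to an async fn in an outer `async` block without awaiting it yields a future whose output is *another* future, which is then dropped unpolled, so the body never runs.

```rust
use futures::FutureExt;

async fn run() {
    println!("subsystem running");
}

fn main() {
    // Broken: the async block evaluates `run()` to a future and hands it to
    // `map`, which discards it without ever polling it.
    let broken = async move { run() }.map(|_| ());

    // Fixed: box and poll the future returned by `run()` itself.
    let fixed = run().map(|_| ()).boxed();

    futures::executor::block_on(broken); // prints nothing
    futures::executor::block_on(fixed);  // prints "subsystem running"
}
```

This is the same shape as the change further down in the diff, where `Box::new`/`Box::pin`-wrapped async blocks give way to `self.run(ctx).map(|_| ()).boxed()`.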

* use prometheus properly

Prometheus lets us register simple counters, which aren't very
interesting. It also allows us to register CounterVecs, which are.
With a CounterVec, you can provide a set of labels, which can
later be used to filter the counts.

We were using them wrong, though. This pattern was repeated in a
variety of places in the code:

```rust
// panics with a cardinality mismatch: two label names are declared here,
// but only one label value is supplied at increment time
let my_counter = register(CounterVec::new(opts, &["succeeded", "failed"])?, registry)?;
my_counter.with_label_values(&["succeeded"]).inc();
```

The problem is that the labels provided in the constructor are not
the set of legal values which can be recorded; they are individual
label names, each of which can later take an arbitrary value.

This commit fixes that.
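
For reference, a minimal sketch of the corrected pattern against the `prometheus` crate (this standalone example registers through `Registry::register` directly rather than the project's `register` helper; the metric name and label come from the diff below): the constructor declares label *names*, and each increment supplies a *value* for every declared name.

```rust
use prometheus::{CounterVec, Opts, Registry};

fn main() -> Result<(), prometheus::Error> {
    let registry = Registry::new();

    // Declare ONE label, named "validity"; its values vary per observation.
    let requests = CounterVec::new(
        Opts::new(
            "parachain_validation_requests_total",
            "Number of validation requests served.",
        ),
        &["validity"],
    )?;
    registry.register(Box::new(requests.clone()))?;

    // One value per declared label name -- cardinality matches, no panic.
    requests.with_label_values(&["valid"]).inc();
    requests.with_label_values(&["invalid"]).inc();
    Ok(())
}
```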

* get av-store subsystem to actually run properly and not die on first signal

* typo fix: incomming -> incoming

* don't disable authority discovery in test nodes

* Fix rococo-v1 missing session keys

* Update node/core/av-store/Cargo.toml

* try dummying out av-store on non-full-nodes

* overseer and subsystems are required only for full nodes

* Reduce the amount of warnings on browser target

* Fix two more warnings

* InclusionInherent should actually have an Inherent module on rococo

* Ancestry: don't return genesis' parent hash

* Update Cargo.lock

* fix broken test

* update test script: specify chainspec as script argument

* Apply suggestions from code review
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Update node/service/src/lib.rs
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* node/service/src/lib: Return error via ? operator

* post-merge blues

* add is_collator flag

* prevent occasional av-store test panic

* simplify fix; expand application

* run authority_discovery in Role::Discover when collating

* distinguish between proposer closed channel errors

* add IsCollator enum, remove is_collator CLI flag

* improve formatting

* remove nop loop

* Fix some stuff
Co-authored-by: Andronik Ordian <write@reusable.software>
Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>
Co-authored-by: Robert Habermeier <robert@Roberts-MBP.lan1>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: Max Inden <mail@max-inden.de>
parent 80e3a7e0
@@ -4922,6 +4922,7 @@ dependencies = [
"polkadot-node-subsystem-util",
"polkadot-overseer",
"polkadot-primitives",
"sc-service",
"smallvec 1.4.2",
"sp-core",
"thiserror",
@@ -5108,6 +5109,7 @@ dependencies = [
"sc-network",
"smallvec 1.4.2",
"sp-core",
"substrate-prometheus-endpoint",
"thiserror",
]
@@ -5454,13 +5456,29 @@ dependencies = [
"pallet-transaction-payment-rpc-runtime-api",
"parity-scale-codec",
"parking_lot 0.9.0",
"polkadot-availability-bitfield-distribution",
"polkadot-availability-distribution",
"polkadot-collator-protocol",
"polkadot-network-bridge",
"polkadot-node-collation-generation",
"polkadot-node-core-av-store",
"polkadot-node-core-backing",
"polkadot-node-core-bitfield-signing",
"polkadot-node-core-candidate-selection",
"polkadot-node-core-candidate-validation",
"polkadot-node-core-chain-api",
"polkadot-node-core-proposer",
"polkadot-node-core-provisioner",
"polkadot-node-core-runtime-api",
"polkadot-node-subsystem",
"polkadot-node-subsystem-util",
"polkadot-overseer",
"polkadot-parachain",
"polkadot-pov-distribution",
"polkadot-primitives",
"polkadot-rpc",
"polkadot-runtime",
"polkadot-statement-distribution",
"polkadot-test-client",
"rococo-v1-runtime",
"sc-authority-discovery",
@@ -5472,7 +5490,6 @@ dependencies = [
"sc-consensus-babe",
"sc-executor",
"sc-finality-grandpa",
"sc-keystore",
"sc-network",
"sc-service",
"sc-telemetry",
@@ -5489,6 +5506,7 @@ dependencies = [
"sp-finality-grandpa",
"sp-inherents",
"sp-io",
"sp-keystore",
"sp-offchain",
"sp-runtime",
"sp-session",
@@ -20,7 +20,6 @@ use browser_utils::{
Client,
browser_configuration, set_console_error_panic_hook, init_console_log,
};
use std::str::FromStr;
/// Starts the client.
#[wasm_bindgen]
@@ -148,6 +148,7 @@ pub fn run() -> Result<()> {
_ => service::build_full(
config,
authority_discovery_disabled,
service::IsCollator::No,
grandpa_pause,
).map(|full| full.task_manager),
}
@@ -19,6 +19,8 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-overseer = { path = "../../overseer" }
polkadot-primitives = { path = "../../../primitives" }
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
[dev-dependencies]
env_logger = "0.7.1"
assert_matches = "1.3.0"
@@ -397,6 +397,23 @@ pub struct Config {
pub path: PathBuf,
}
impl std::convert::TryFrom<sc_service::config::DatabaseConfig> for Config {
type Error = &'static str;
fn try_from(config: sc_service::config::DatabaseConfig) -> Result<Self, Self::Error> {
let path = config.path().ok_or("custom databases are not supported")?;
Ok(Self {
// substrate cache size is improper here; just use the default
cache_size: None,
// DB path is a sub-directory of substrate db path to give two properties:
// 1: column numbers don't conflict with substrate
// 2: commands like purge-chain work without further changes
path: path.join("parachains").join("av-store"),
})
}
}
impl AvailabilityStoreSubsystem {
/// Create a new `AvailabilityStoreSubsystem` with a given config on disk.
pub fn new_on_disk(config: Config, metrics: Metrics) -> io::Result<Self> {
@@ -449,7 +466,6 @@ async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Contex
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{
let ctx = &mut ctx;
loop {
// Every time the following two methods are called a read from DB is performed.
// But given that these are very small values which are essentially a newtype
@@ -470,16 +486,19 @@ where
ActiveLeavesUpdate { activated, .. })
)) => {
for activated in activated.into_iter() {
process_block_activated(ctx, &subsystem.inner, activated).await?;
process_block_activated(&mut ctx, &subsystem.inner, activated).await?;
}
}
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(hash))) => {
process_block_finalized(&subsystem, ctx, &subsystem.inner, hash).await?;
process_block_finalized(&subsystem, &mut ctx, &subsystem.inner, hash).await?;
}
Ok(FromOverseer::Communication { msg }) => {
process_message(&mut subsystem, ctx, msg).await?;
process_message(&mut subsystem, &mut ctx, msg).await?;
}
Err(_) => break,
Err(e) => {
log::error!("AvailabilityStoreSubsystem err: {:#?}", e);
break
},
}
}
pov_pruning_time = pov_pruning_time => {
@@ -945,15 +964,15 @@ fn query_inner<D: Decode>(db: &Arc<dyn KeyValueDB>, column: u32, key: &[u8]) ->
}
impl<Context> Subsystem<Context> for AvailabilityStoreSubsystem
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>,
where
Context: SubsystemContext<Message = AvailabilityStoreMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = Box::pin(async move {
if let Err(e) = run(self, ctx).await {
log::error!(target: LOG_TARGET, "Subsystem exited with an error {:?}", e);
}
});
let future = run(self, ctx)
.map(|r| if let Err(e) = r {
log::error!(target: "availabilitystore", "Subsystem exited with an error {:?}", e);
})
.boxed();
SpawnedSubsystem {
name: "availability-store-subsystem",
@@ -420,10 +420,10 @@ impl metrics::Metrics for Metrics {
seconds: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"candidate_selection_invalid_selections_total",
"Number of Candidate Selection subsystem seconding selections which proved to be invalid.",
"candidate_selection_seconds_total",
"Number of Candidate Selection subsystem seconding events.",
),
&["succeeded", "failed"],
&["success"],
)?,
registry,
)?,
@@ -433,7 +433,7 @@ impl metrics::Metrics for Metrics {
"candidate_selection_invalid_selections_total",
"Number of Candidate Selection subsystem seconding selections which proved to be invalid.",
),
&["succeeded", "failed"],
&["success"],
)?,
registry,
)?,
@@ -82,7 +82,7 @@ impl Metrics {
metrics.validation_requests.with_label_values(&["invalid"]).inc();
},
Err(_) => {
metrics.validation_requests.with_label_values(&["failed"]).inc();
metrics.validation_requests.with_label_values(&["validation failure"]).inc();
},
}
}
@@ -98,7 +98,7 @@ impl metrics::Metrics for Metrics {
"parachain_validation_requests_total",
"Number of validation requests served.",
),
&["valid", "invalid", "failed"],
&["validity"],
)?,
registry,
)?,
@@ -40,18 +40,19 @@ use polkadot_node_subsystem_util::{
};
use polkadot_primitives::v1::{Block, BlockId};
use sp_blockchain::HeaderBackend;
use std::sync::Arc;
use futures::prelude::*;
/// The Chain API Subsystem implementation.
pub struct ChainApiSubsystem<Client> {
client: Client,
client: Arc<Client>,
metrics: Metrics,
}
impl<Client> ChainApiSubsystem<Client> {
/// Create a new Chain API subsystem with the given client.
pub fn new(client: Client, metrics: Metrics) -> Self {
pub fn new(client: Arc<Client>, metrics: Metrics) -> Self {
ChainApiSubsystem {
client,
metrics,
@@ -126,8 +127,13 @@ where
// fewer than `k` ancestors are available
Ok(None) => None,
Ok(Some(header)) => {
hash = header.parent_hash;
Some(Ok(hash))
// stop at the genesis header.
if header.number == 1 {
None
} else {
hash = header.parent_hash;
Some(Ok(hash))
}
}
}
});
@@ -171,7 +177,7 @@ impl metrics::Metrics for Metrics {
"parachain_chain_api_requests_total",
"Number of Chain API requests served.",
),
&["succeeded", "failed"],
&["success"],
)?,
registry,
)?,
@@ -300,11 +306,11 @@ mod tests {
}
fn test_harness(
test: impl FnOnce(TestClient, TestSubsystemContextHandle<ChainApiMessage>)
test: impl FnOnce(Arc<TestClient>, TestSubsystemContextHandle<ChainApiMessage>)
-> BoxFuture<'static, ()>,
) {
let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new());
let client = TestClient::default();
let client = Arc::new(TestClient::default());
let subsystem = ChainApiSubsystem::new(client.clone(), Metrics(None));
let chain_api_task = run(ctx, subsystem).map(|x| x.unwrap());
@@ -145,7 +145,7 @@ where
let (sender, receiver) = futures::channel::oneshot::channel();
overseer.wait_for_activation(parent_header_hash, sender).await?;
receiver.await.map_err(Error::ClosedChannelFromProvisioner)??;
receiver.await.map_err(|_| Error::ClosedChannelAwaitingActivation)??;
let (sender, receiver) = futures::channel::oneshot::channel();
// strictly speaking, we don't _have_ to .await this send_msg before opening the
@@ -156,7 +156,7 @@ where
ProvisionerMessage::RequestInherentData(parent_header_hash, sender),
)).await?;
receiver.await.map_err(Error::ClosedChannelFromProvisioner)
receiver.await.map_err(|_| Error::ClosedChannelAwaitingInherentData)
}
.boxed()
.fuse();
@@ -236,7 +236,8 @@ pub enum Error {
Blockchain(sp_blockchain::Error),
Inherent(sp_inherents::Error),
Timeout,
ClosedChannelFromProvisioner(futures::channel::oneshot::Canceled),
ClosedChannelAwaitingActivation,
ClosedChannelAwaitingInherentData,
Subsystem(SubsystemError)
}
@@ -271,7 +272,8 @@ impl fmt::Display for Error {
Self::Blockchain(err) => write!(f, "blockchain error: {}", err),
Self::Inherent(err) => write!(f, "inherent error: {:?}", err),
Self::Timeout => write!(f, "timeout: provisioner did not return inherent data after {:?}", PROPOSE_TIMEOUT),
Self::ClosedChannelFromProvisioner(err) => write!(f, "provisioner closed inherent data channel before sending: {}", err),
Self::ClosedChannelAwaitingActivation => write!(f, "closed channel from overseer when awaiting activation"),
Self::ClosedChannelAwaitingInherentData => write!(f, "closed channel from provisioner when awaiting inherent data"),
Self::Subsystem(err) => write!(f, "subsystem error: {:?}", err),
}
}
@@ -282,7 +284,6 @@ impl std::error::Error for Error {
match self {
Self::Consensus(err) => Some(err),
Self::Blockchain(err) => Some(err),
Self::ClosedChannelFromProvisioner(err) => Some(err),
Self::Subsystem(err) => Some(err),
_ => None
}
@@ -482,7 +482,7 @@ impl Metrics {
fn on_inherent_data_request(&self, response: Result<(), ()>) {
if let Some(metrics) = &self.0 {
match response {
Ok(()) => metrics.inherent_data_requests.with_label_values(&["succeded"]).inc(),
Ok(()) => metrics.inherent_data_requests.with_label_values(&["succeeded"]).inc(),
Err(()) => metrics.inherent_data_requests.with_label_values(&["failed"]).inc(),
}
}
@@ -498,7 +498,7 @@ impl metrics::Metrics for Metrics {
"parachain_inherent_data_requests_total",
"Number of InherentData requests served by provisioner.",
),
&["succeeded", "failed"],
&["success"],
)?,
registry,
)?,
@@ -34,6 +34,7 @@ use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
};
use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost};
use std::sync::Arc;
use sp_api::{ProvideRuntimeApi};
@@ -41,19 +42,19 @@ use futures::prelude::*;
/// The `RuntimeApiSubsystem`. See module docs for more details.
pub struct RuntimeApiSubsystem<Client> {
client: Client,
client: Arc<Client>,
metrics: Metrics,
}
impl<Client> RuntimeApiSubsystem<Client> {
/// Create a new Runtime API subsystem wrapping the given client and metrics.
pub fn new(client: Client, metrics: Metrics) -> Self {
pub fn new(client: Arc<Client>, metrics: Metrics) -> Self {
RuntimeApiSubsystem { client, metrics }
}
}
impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
Client: ProvideRuntimeApi<Block> + Send + 'static,
Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
Client::Api: ParachainHost<Block>,
Context: SubsystemContext<Message = RuntimeApiMessage>
{
@@ -79,7 +80,7 @@ async fn run<Client>(
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {},
FromOverseer::Communication { msg } => match msg {
RuntimeApiMessage::Request(relay_parent, request) => make_runtime_api_request(
&subsystem.client,
&*subsystem.client,
&subsystem.metrics,
relay_parent,
request,
@@ -159,7 +160,7 @@ impl metrics::Metrics for Metrics {
"parachain_runtime_api_requests_total",
"Number of Runtime API requests served.",
),
&["succeeded", "failed"],
&["success"],
)?,
registry,
)?,
@@ -288,7 +289,7 @@ mod tests {
#[test]
fn requests_validators() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = MockRuntimeApi::default();
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
@@ -311,7 +312,7 @@
#[test]
fn requests_validator_groups() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = MockRuntimeApi::default();
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
@@ -334,7 +335,7 @@
#[test]
fn requests_availability_cores() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = MockRuntimeApi::default();
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
@@ -357,12 +358,12 @@
#[test]
fn requests_persisted_validation_data() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default();
let mut runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
runtime_api.validation_data.insert(para_a, Default::default());
Arc::get_mut(&mut runtime_api).unwrap().validation_data.insert(para_a, Default::default());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
@@ -397,12 +398,12 @@
#[test]
fn requests_full_validation_data() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default();
let mut runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
runtime_api.validation_data.insert(para_a, Default::default());
Arc::get_mut(&mut runtime_api).unwrap().validation_data.insert(para_a, Default::default());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
@@ -446,6 +447,8 @@ mod tests {
runtime_api.validation_outputs_results.insert(para_a, false);
runtime_api.validation_outputs_results.insert(para_b, true);
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
@@ -491,7 +494,7 @@
#[test]
fn requests_session_index_for_child() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = MockRuntimeApi::default();
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
@@ -514,12 +517,12 @@
#[test]
fn requests_validation_code() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default();
let mut runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
runtime_api.validation_code.insert(para_a, Default::default());
Arc::get_mut(&mut runtime_api).unwrap().validation_code.insert(para_a, Default::default());
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
@@ -561,6 +564,8 @@ mod tests {
runtime_api.candidate_pending_availability.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
@@ -595,7 +600,7 @@
#[test]
fn requests_candidate_events() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = MockRuntimeApi::default();
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
@@ -796,7 +796,7 @@ impl AvailabilityDistributionSubsystem {
{
warn!(
target: TARGET,
"Failed to handle incomming network messages: {:?}", e
"Failed to handle incoming network messages: {:?}", e
);
}
}
@@ -1023,7 +1023,7 @@
}
)).await
.map_err(|e| Error::StoreChunkSendQuery(e))?;
rx.await.map_err(|e| Error::StoreChunkResponseChannel(e))
}
@@ -169,7 +169,7 @@ impl BitfieldDistribution {
trace!(target: TARGET, "Processing NetworkMessage");
// a network message was received
if let Err(e) = handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await {
warn!(target: TARGET, "Failed to handle incomming network messages: {:?}", e);
warn!(target: TARGET, "Failed to handle incoming network messages: {:?}", e);
}
}
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => {
@@ -45,7 +45,7 @@ use polkadot_node_subsystem_util::{
};
#[derive(Clone, Default)]
pub(super) struct Metrics(Option<MetricsInner>);
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_advertisment_made(&self) {
@@ -37,7 +37,7 @@ use polkadot_node_network_protocol::{
use polkadot_primitives::v1::CollatorId;
use polkadot_node_subsystem_util::{
self as util,
metrics::{self, prometheus},
metrics::prometheus,
};
mod collator_side;
......@@ -72,28 +72,25 @@ impl From<util::validator_discovery::Error> for Error {
type Result<T> = std::result::Result<T, Error>;
enum ProtocolSide {
/// What side of the collator protocol is being engaged
pub enum ProtocolSide {
/// Validators operate on the relay chain.
Validator(validator_side::Metrics),
/// Collators operate on a parachain.
Collator(CollatorId, collator_side::Metrics),
}
/// The collator protocol subsystem.
pub struct CollatorProtocolSubsystem {
protocol_side: ProtocolSide,
protocol_side: ProtocolSide,
}
impl CollatorProtocolSubsystem {
/// Start the collator protocol.
/// If `id` is `Some` this is a collator side of the protocol.
/// If `id` is `None` this is a validator side of the protocol.
/// If `id` is `None` this is a validator side of the protocol.
/// Caller must provide a registry for prometheus metrics.
pub fn new(id: Option<CollatorId>, registry: Option<&prometheus::Registry>) -> Self {
use metrics::Metrics;
let protocol_side = match id {
Some(id) => ProtocolSide::Collator(id, collator_side::Metrics::register(registry)),
None => ProtocolSide::Validator(validator_side::Metrics::register(registry)),
};
pub fn new(protocol_side: ProtocolSide) -> Self {
Self {
protocol_side,
}
@@ -127,7 +124,7 @@ where
fn start(self, ctx: Context) -> SpawnedSubsystem {
SpawnedSubsystem {
name: "collator-protocol-subsystem",
future: Box::pin(async move { self.run(ctx) }.map(|_| ())),
future: self.run(ctx).map(|_| ()).boxed(),
}
}
}
@@ -52,7 +52,7 @@ const COST_REPORT_BAD: Rep = Rep::new(-50, "A collator was reported by another s
const BENEFIT_NOTIFY_GOOD: Rep = Rep::new(50, "A collator was noted good by another subsystem");
#[derive(Clone, Default)]
pub(super) struct Metrics(Option<MetricsInner>);
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_request(&self, succeeded: std::result::Result<(), ()>) {
@@ -81,7 +81,7 @@ impl metrics::Metrics for Metrics {
"parachain_collation_requests_total",
"Number of collations requested from Collators.",
),
&["succeeded", "failed"],
&["success"],
)?,
registry,
)?
@@ -86,6 +86,15 @@ impl<C> Subsystem<C> for StatementDistribution
}
}