diff --git a/Cargo.lock b/Cargo.lock index 2a8f570fabe475e76e5564bb276247fdf579e5bb..43d7e66b8d21da01f78cdf5637f29c1b8561e404 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4665,6 +4665,7 @@ dependencies = [ "pin-project", "polkadot-overseer", "portpicker", + "prometheus", "rand", "sc-client-api", "sc-rpc-api", @@ -4682,6 +4683,7 @@ dependencies = [ "sp-state-machine 0.35.0", "sp-storage 19.0.0", "sp-version 29.0.0", + "substrate-prometheus-endpoint", "thiserror", "tokio", "tokio-util", @@ -4819,6 +4821,7 @@ dependencies = [ "polkadot-service", "polkadot-test-service", "portpicker", + "prometheus", "rand", "sc-basic-authorship", "sc-block-builder", diff --git a/cumulus/client/relay-chain-minimal-node/src/lib.rs b/cumulus/client/relay-chain-minimal-node/src/lib.rs index cea7e6e4a0355d389398e20c54b5d4960d24078a..a3d858ea40c92fa6ffebdb54cd52b05905380721 100644 --- a/cumulus/client/relay-chain-minimal-node/src/lib.rs +++ b/cumulus/client/relay-chain-minimal-node/src/lib.rs @@ -96,19 +96,20 @@ async fn build_interface( client: RelayChainRpcClient, ) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option<CollatorPair>)> { let collator_pair = CollatorPair::generate().0; + let blockchain_rpc_client = Arc::new(BlockChainRpcClient::new(client.clone())); let collator_node = match polkadot_config.network.network_backend { sc_network::config::NetworkBackendType::Libp2p => new_minimal_relay_chain::<RelayBlock, sc_network::NetworkWorker<RelayBlock, RelayHash>>( polkadot_config, collator_pair.clone(), - Arc::new(BlockChainRpcClient::new(client.clone())), + blockchain_rpc_client, ) .await?, sc_network::config::NetworkBackendType::Litep2p => new_minimal_relay_chain::<RelayBlock, sc_network::Litep2pNetworkBackend>( polkadot_config, collator_pair.clone(), - Arc::new(BlockChainRpcClient::new(client.clone())), + blockchain_rpc_client, ) .await?, }; @@ -120,17 +121,19 @@ async fn build_interface( } pub async fn build_minimal_relay_chain_node_with_rpc( - polkadot_config: Configuration, + relay_chain_config: Configuration, + parachain_prometheus_registry: Option<&Registry>, task_manager: &mut TaskManager, relay_chain_url: Vec<Url>, ) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option<CollatorPair>)> { let client = cumulus_relay_chain_rpc_interface::create_client_and_start_worker( relay_chain_url, task_manager, + parachain_prometheus_registry, ) .await?; - build_interface(polkadot_config, task_manager, client).await + build_interface(relay_chain_config, task_manager, client).await } pub async fn build_minimal_relay_chain_node_light_client( diff --git a/cumulus/client/relay-chain-rpc-interface/Cargo.toml b/cumulus/client/relay-chain-rpc-interface/Cargo.toml index c2deddc5341de0d235e9949140762db39600226d..fb4cb4ceed4ec9a6ec796fa7b00c4df353e90ad6 100644 --- a/cumulus/client/relay-chain-rpc-interface/Cargo.toml +++ b/cumulus/client/relay-chain-rpc-interface/Cargo.toml @@ -29,6 +29,7 @@ sp-version = { workspace = true, default-features = true } sc-client-api = { workspace = true, default-features = true } sc-rpc-api = { workspace = true, default-features = true } sc-service = { workspace = true, default-features = true } +prometheus-endpoint = { workspace = true, default-features = true } tokio = { features = ["sync"], workspace = true, default-features = true } tokio-util = { features = ["compat"], workspace = true } @@ -49,3 +50,4 @@ either = { workspace = true, default-features = true } thiserror = { workspace = true } rand = { workspace = true, default-features = true } pin-project = { workspace = true } +prometheus = { workspace = true } diff --git a/cumulus/client/relay-chain-rpc-interface/src/lib.rs b/cumulus/client/relay-chain-rpc-interface/src/lib.rs index e32ec6a41a4bf00db5e5db07235bf43bbad1dbf6..3698938bfd8f6634f7aa26a5f61d340c21c49e0c 100644 --- a/cumulus/client/relay-chain-rpc-interface/src/lib.rs +++ b/cumulus/client/relay-chain-rpc-interface/src/lib.rs @@ -39,6 +39,7 @@ use cumulus_primitives_core::relay_chain::BlockId; pub use url::Url; mod light_client_worker; +mod metrics; mod reconnecting_ws_client; mod rpc_client; mod tokio_platform; @@ -87,12 +88,13 @@ impl RelayChainInterface for RelayChainRpcInterface { async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>> { let hash = match block_id { BlockId::Hash(hash) => hash, - BlockId::Number(num) => + BlockId::Number(num) => { if let Some(hash) = self.rpc_client.chain_get_block_hash(Some(num)).await? { hash } else { return Ok(None) - }, + } + }, }; let header = self.rpc_client.chain_get_header(Some(hash)).await?; diff --git a/cumulus/client/relay-chain-rpc-interface/src/metrics.rs b/cumulus/client/relay-chain-rpc-interface/src/metrics.rs new file mode 100644 index 0000000000000000000000000000000000000000..4d09464d237c69faffa7f168dd9647c82adba285 --- /dev/null +++ b/cumulus/client/relay-chain-rpc-interface/src/metrics.rs @@ -0,0 +1,49 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see <http://www.gnu.org/licenses/>. + +use prometheus::{Error as PrometheusError, HistogramTimer, Registry}; +use prometheus_endpoint::{HistogramOpts, HistogramVec, Opts}; + +/// Gathers metrics about the blockchain RPC client. +#[derive(Clone)] +pub(crate) struct RelaychainRpcMetrics { + rpc_request: HistogramVec, +} + +impl RelaychainRpcMetrics { + pub(crate) fn register(registry: &Registry) -> Result<Self, PrometheusError> { + Ok(Self { + rpc_request: prometheus_endpoint::register( + HistogramVec::new( + HistogramOpts { + common_opts: Opts::new( + "relay_chain_rpc_interface", + "Tracks stats about cumulus relay chain RPC interface", + ), + buckets: prometheus::exponential_buckets(0.001, 4.0, 9) + .expect("function parameters are constant and always valid; qed"), + }, + &["method"], + )?, + registry, + )?, + }) + } + + pub(crate) fn start_request_timer(&self, method: &str) -> HistogramTimer { + self.rpc_request.with_label_values(&[method]).start_timer() + } +} diff --git a/cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs b/cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs index c7eaa45958b0b002a00ec8210d5044bd56aa0569..6e282281de69aafdb521c4217ec18e2a5a9c1756 100644 --- a/cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -22,6 +22,7 @@ use jsonrpsee::{ core::{params::ArrayParams, ClientError as JsonRpseeError}, rpc_params, }; +use prometheus::Registry; use serde::de::DeserializeOwned; use serde_json::Value as JsonValue; use std::collections::{btree_map::BTreeMap, VecDeque}; @@ -52,6 +53,7 @@ use sp_version::RuntimeVersion; use crate::{ light_client_worker::{build_smoldot_client, LightClientRpcWorker}, + metrics::RelaychainRpcMetrics, reconnecting_ws_client::ReconnectingWebsocketWorker, }; pub use url::Url; @@ -87,6 +89,7 @@ pub enum RpcDispatcherMessage { pub async fn create_client_and_start_worker( urls: Vec<Url>, task_manager: &mut TaskManager, + prometheus_registry: Option<&Registry>, ) -> RelayChainResult<RelayChainRpcClient> { let (worker, sender) = ReconnectingWebsocketWorker::new(urls).await; @@ -94,7 +97,7 @@ pub async fn create_client_and_start_worker( .spawn_essential_handle() .spawn("relay-chain-rpc-worker", None, worker.run()); - let client = RelayChainRpcClient::new(sender); + let client = RelayChainRpcClient::new(sender, prometheus_registry); Ok(client) } @@ -113,7 +116,8 @@ pub async fn create_client_and_start_light_client_worker( .spawn_essential_handle() .spawn("relay-light-client-worker", None, worker.run()); - let client = RelayChainRpcClient::new(sender); + // We'll not setup prometheus exporter metrics for the light client worker. + let client = RelayChainRpcClient::new(sender, None); Ok(client) } @@ -123,6 +127,7 @@ pub async fn create_client_and_start_light_client_worker( pub struct RelayChainRpcClient { /// Sender to send messages to the worker. worker_channel: TokioSender<RpcDispatcherMessage>, + metrics: Option<RelaychainRpcMetrics>, } impl RelayChainRpcClient { @@ -130,8 +135,17 @@ impl RelayChainRpcClient { /// /// This client expects a channel connected to a worker that processes /// requests sent via this channel. - pub(crate) fn new(worker_channel: TokioSender<RpcDispatcherMessage>) -> Self { - RelayChainRpcClient { worker_channel } + pub(crate) fn new( + worker_channel: TokioSender<RpcDispatcherMessage>, + prometheus_registry: Option<&Registry>, + ) -> Self { + RelayChainRpcClient { + worker_channel, + metrics: prometheus_registry + .and_then(|inner| RelaychainRpcMetrics::register(inner).map_err(|err| { + tracing::warn!(target: LOG_TARGET, error = %err, "Unable to instantiate the RPC client metrics, continuing w/o metrics setup."); + }).ok()), + } } /// Call a call to `state_call` rpc method. @@ -148,6 +162,7 @@ impl RelayChainRpcClient { payload_bytes, hash }; + let res = self .request_tracing::<sp_core::Bytes, _>("state_call", params, |err| { tracing::trace!( @@ -190,6 +205,8 @@ impl RelayChainRpcClient { R: DeserializeOwned + std::fmt::Debug, OR: Fn(&RelayChainError), { + let _timer = self.metrics.as_ref().map(|inner| inner.start_request_timer(method)); + let (tx, rx) = futures::channel::oneshot::channel(); let message = RpcDispatcherMessage::Request(method.into(), params, tx); diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs index c95c72c370a1c093d1401a0b5d41c0e8893f81e0..92dc64371f34a9f147e7c5c5c2d48ff3ff4a547c 100644 --- a/cumulus/client/service/src/lib.rs +++ b/cumulus/client/service/src/lib.rs @@ -373,6 +373,7 @@ pub async fn build_relay_chain_interface( cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) => build_minimal_relay_chain_node_with_rpc( relay_chain_config, + parachain_config.prometheus_registry(), task_manager, rpc_target_urls, ) diff --git a/cumulus/test/service/Cargo.toml b/cumulus/test/service/Cargo.toml index f766d123632096a4af578fcf87bdb0b299008028..a1b70c5239525a641da43fdf328897a727d26d1f 100644 --- a/cumulus/test/service/Cargo.toml +++ b/cumulus/test/service/Cargo.toml @@ -18,6 +18,7 @@ clap = { features = ["derive"], workspace = true } codec = { workspace = true, default-features = true } criterion = { features = ["async_tokio"], workspace = true, default-features = true } jsonrpsee = { features = ["server"], workspace = true } +prometheus = { workspace = true } rand = { workspace = true, default-features = true } serde = { features = ["derive"], workspace = true, default-features = true } serde_json = { workspace = true, default-features = true } diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs index a600dcce3d66e789cdcd8164fa9c51aa3d69d306..db771f5fe533fe8b2b9b3e5db1a0872e0a822930 100644 --- a/cumulus/test/service/src/lib.rs +++ b/cumulus/test/service/src/lib.rs @@ -32,6 +32,7 @@ use cumulus_client_consensus_aura::{ ImportQueueParams, }; use cumulus_client_consensus_proposer::Proposer; +use prometheus::Registry; use runtime::AccountId; use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY}; use sp_consensus_aura::sr25519::AuthorityPair; @@ -264,11 +265,12 @@ pub fn new_partial( async fn build_relay_chain_interface( relay_chain_config: Configuration, + parachain_prometheus_registry: Option<&Registry>, collator_key: Option<CollatorPair>, collator_options: CollatorOptions, task_manager: &mut TaskManager, ) -> RelayChainResult<Arc<dyn RelayChainInterface + 'static>> { - let relay_chain_full_node = match collator_options.relay_chain_mode { + let relay_chain_node = match collator_options.relay_chain_mode { cumulus_client_cli::RelayChainMode::Embedded => polkadot_test_service::new_full( relay_chain_config, if let Some(ref key) = collator_key { @@ -283,6 +285,7 @@ async fn build_relay_chain_interface( cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) => return build_minimal_relay_chain_node_with_rpc( relay_chain_config, + parachain_prometheus_registry, task_manager, rpc_target_urls, ) @@ -294,13 +297,13 @@ async fn build_relay_chain_interface( .map(|r| r.0), }; - task_manager.add_child(relay_chain_full_node.task_manager); + task_manager.add_child(relay_chain_node.task_manager); tracing::info!("Using inprocess node."); Ok(Arc::new(RelayChainInProcessInterface::new( - relay_chain_full_node.client.clone(), - relay_chain_full_node.backend.clone(), - relay_chain_full_node.sync_service.clone(), - relay_chain_full_node.overseer_handle.ok_or(RelayChainError::GenericError( + relay_chain_node.client.clone(), + relay_chain_node.backend.clone(), + relay_chain_node.sync_service.clone(), + relay_chain_node.overseer_handle.ok_or(RelayChainError::GenericError( "Overseer should be running in full node.".to_string(), ))?, ))) @@ -344,9 +347,9 @@ where let backend = params.backend.clone(); let block_import = params.other; - let relay_chain_interface = build_relay_chain_interface( relay_chain_config, + parachain_config.prometheus_registry(), collator_key.clone(), collator_options.clone(), &mut task_manager, @@ -494,7 +497,7 @@ where slot_drift: Duration::from_secs(1), }; - let (collation_future, block_builer_future) = + let (collation_future, block_builder_future) = slot_based::run::<Block, AuthorityPair, _, _, _, _, _, _, _, _>(params); task_manager.spawn_essential_handle().spawn( "collation-task", @@ -504,7 +507,7 @@ where task_manager.spawn_essential_handle().spawn( "block-builder-task", None, - block_builer_future, + block_builder_future, ); } else { tracing::info!(target: LOG_TARGET, "Starting block authoring with lookahead collator."); diff --git a/cumulus/test/service/src/main.rs b/cumulus/test/service/src/main.rs index 9357978b769a4e8ec6c4f850d37290e3dee1b1e5..caa672e611f7cd47d42955fc1a8a33b4643f3097 100644 --- a/cumulus/test/service/src/main.rs +++ b/cumulus/test/service/src/main.rs @@ -61,36 +61,39 @@ fn main() -> Result<(), sc_cli::Error> { let collator_options = cli.run.collator_options(); let tokio_runtime = sc_cli::build_runtime()?; let tokio_handle = tokio_runtime.handle(); - let config = cli + let parachain_config = cli .run .normalize() .create_configuration(&cli, tokio_handle.clone()) .expect("Should be able to generate config"); - let polkadot_cli = RelayChainCli::new( - &config, + let relay_chain_cli = RelayChainCli::new( + ¶chain_config, [RelayChainCli::executable_name()].iter().chain(cli.relaychain_args.iter()), ); - - let tokio_handle = config.tokio_handle.clone(); - let polkadot_config = - SubstrateCli::create_configuration(&polkadot_cli, &polkadot_cli, tokio_handle) - .map_err(|err| format!("Relay chain argument error: {}", err))?; - - let parachain_id = chain_spec::Extensions::try_get(&*config.chain_spec) + let tokio_handle = parachain_config.tokio_handle.clone(); + let relay_chain_config = SubstrateCli::create_configuration( + &relay_chain_cli, + &relay_chain_cli, + tokio_handle, + ) + .map_err(|err| format!("Relay chain argument error: {}", err))?; + + let parachain_id = chain_spec::Extensions::try_get(&*parachain_config.chain_spec) .map(|e| e.para_id) .ok_or("Could not find parachain extension in chain-spec.")?; tracing::info!("Parachain id: {:?}", parachain_id); tracing::info!( "Is collating: {}", - if config.role.is_authority() { "yes" } else { "no" } + if parachain_config.role.is_authority() { "yes" } else { "no" } ); if cli.fail_pov_recovery { tracing::info!("PoV recovery failure enabled"); } - let collator_key = config.role.is_authority().then(|| CollatorPair::generate().0); + let collator_key = + parachain_config.role.is_authority().then(|| CollatorPair::generate().0); let consensus = cli .use_null_consensus @@ -102,15 +105,15 @@ fn main() -> Result<(), sc_cli::Error> { let (mut task_manager, _, _, _, _, _) = tokio_runtime .block_on(async move { - match polkadot_config.network.network_backend { + match relay_chain_config.network.network_backend { sc_network::config::NetworkBackendType::Libp2p => cumulus_test_service::start_node_impl::< _, sc_network::NetworkWorker<_, _>, >( - config, + parachain_config, collator_key, - polkadot_config, + relay_chain_config, parachain_id.into(), cli.disable_block_announcements.then(wrap_announce_block), cli.fail_pov_recovery, @@ -126,9 +129,9 @@ fn main() -> Result<(), sc_cli::Error> { _, sc_network::Litep2pNetworkBackend, >( - config, + parachain_config, collator_key, - polkadot_config, + relay_chain_config, parachain_id.into(), cli.disable_block_announcements.then(wrap_announce_block), cli.fail_pov_recovery, diff --git a/prdoc/pr_5572.prdoc b/prdoc/pr_5572.prdoc new file mode 100644 index 0000000000000000000000000000000000000000..c0707e4b7eba4cd18716d96be96fe6f4192070b8 --- /dev/null +++ b/prdoc/pr_5572.prdoc @@ -0,0 +1,21 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: added RPC metrics for the collator + +doc: + - audience: [ Node Dev, Node Operator ] + description: | + The metric is named `relay_chain_rpc_interface` and can be scraped by prometheus agents from the parachain prometheus exporter. The metric provide information about `count`, `sum` and `duration` in seconds (with exponential buckets with parameters as start = 0.001, factor = 4, count = 9) for all RPC requests made with the `relay-chain-rpc-interface`. +crates: + - name: cumulus-relay-chain-rpc-interface + bump: major + - name: cumulus-relay-chain-minimal-node + bump: major + - name: cumulus-test-service + bump: patch + - name: substrate-prometheus-endpoint + bump: patch + - name: cumulus-client-service + bump: patch + diff --git a/substrate/utils/prometheus/src/lib.rs b/substrate/utils/prometheus/src/lib.rs index 7a8c65590605265aebc91f02b9f17880d3ab926f..460640bcd8e410452106de2655cf1a4f357689dc 100644 --- a/substrate/utils/prometheus/src/lib.rs +++ b/substrate/utils/prometheus/src/lib.rs @@ -86,9 +86,10 @@ async fn request_metrics( /// Initializes the metrics context, and starts an HTTP server /// to serve metrics. pub async fn init_prometheus(prometheus_addr: SocketAddr, registry: Registry) -> Result<(), Error> { - let listener = tokio::net::TcpListener::bind(&prometheus_addr) - .await - .map_err(|_| Error::PortInUse(prometheus_addr))?; + let listener = tokio::net::TcpListener::bind(&prometheus_addr).await.map_err(|e| { + log::error!(target: "prometheus", "Error binding to '{:#?}': {:#?}", prometheus_addr, e); + Error::PortInUse(prometheus_addr) + })?; init_prometheus_with_listener(listener, registry).await }