diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index 5d7adfdcc3f52c331b6a930cbb89503619b1ccad..c19a230769c961861ccffa9553e0f5fc5be19892 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -38,6 +38,7 @@ use sp_arithmetic::traits::SaturatedConversion; use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId}; use message::generic::Message as GenericMessage; use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData}; +use prometheus_endpoint::{Registry, Gauge, register, PrometheusError, U64}; use sync::{ChainSync, SyncState}; use crate::service::{TransactionPool, ExHashT}; use crate::config::{BoxFinalityProofRequestBuilder, Roles}; @@ -135,6 +136,105 @@ mod rep { pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response"); } +struct Metrics { + handshaking_peers: Gauge<U64>, + obsolete_requests: Gauge<U64>, + peers: Gauge<U64>, + queued_blocks: Gauge<U64>, + fork_targets: Gauge<U64>, + finality_proofs_pending: Gauge<U64>, + finality_proofs_active: Gauge<U64>, + finality_proofs_failed: Gauge<U64>, + finality_proofs_importing: Gauge<U64>, + justifications_pending: Gauge<U64>, + justifications_active: Gauge<U64>, + justifications_failed: Gauge<U64>, + justifications_importing: Gauge<U64> +} + +impl Metrics { + fn register(r: &Registry) -> Result<Self, PrometheusError> { + Ok(Metrics { + handshaking_peers: { + let g = Gauge::new("sync_handshaking_peers", "number of newly connected peers")?; + register(g, r)? + }, + obsolete_requests: { + let g = Gauge::new("sync_obsolete_requests", "total number of obsolete requests")?; + register(g, r)? + }, + peers: { + let g = Gauge::new("sync_peers", "number of peers we sync with")?; + register(g, r)? + }, + queued_blocks: { + let g = Gauge::new("sync_queued_blocks", "number of blocks in import queue")?; + register(g, r)? + }, + fork_targets: { + let g = Gauge::new("sync_fork_targets", "fork sync targets")?; + register(g, r)? + }, + justifications_pending: { + let g = Gauge::new( + "sync_extra_justifications_pending", + "number of pending extra justifications requests" + )?; + register(g, r)? + }, + justifications_active: { + let g = Gauge::new( + "sync_extra_justifications_active", + "number of active extra justifications requests" + )?; + register(g, r)? + }, + justifications_failed: { + let g = Gauge::new( + "sync_extra_justifications_failed", + "number of failed extra justifications requests" + )?; + register(g, r)? + }, + justifications_importing: { + let g = Gauge::new( + "sync_extra_justifications_importing", + "number of importing extra justifications requests" + )?; + register(g, r)? + }, + finality_proofs_pending: { + let g = Gauge::new( + "sync_extra_finality_proofs_pending", + "number of pending extra finality proof requests" + )?; + register(g, r)? + }, + finality_proofs_active: { + let g = Gauge::new( + "sync_extra_finality_proofs_active", + "number of active extra finality proof requests" + )?; + register(g, r)? + }, + finality_proofs_failed: { + let g = Gauge::new( + "sync_extra_finality_proofs_failed", + "number of failed extra finality proof requests" + )?; + register(g, r)? + }, + finality_proofs_importing: { + let g = Gauge::new( + "sync_extra_finality_proofs_importing", + "number of importing extra finality proof requests" + )?; + register(g, r)? + }, + }) + } +} + // Lock must always be taken in order declared here. pub struct Protocol<B: BlockT, H: ExHashT> { /// Interval at which we call `tick`. @@ -163,6 +263,8 @@ pub struct Protocol<B: BlockT, H: ExHashT> { protocol_name_by_engine: HashMap<ConsensusEngineId, Cow<'static, [u8]>>, /// For each protocol name, the legacy gossiping engine ID. protocol_engine_by_name: HashMap<Cow<'static, [u8]>, ConsensusEngineId>, + /// Prometheus metrics. + metrics: Option<Metrics>, } #[derive(Default)] @@ -371,7 +473,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> { finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<B>>, protocol_id: ProtocolId, peerset_config: sc_peerset::PeersetConfig, - block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send> + block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>, + metrics_registry: Option<&Registry> ) -> error::Result<(Protocol<B, H>, sc_peerset::PeersetHandle)> { let info = chain.info(); let sync = ChainSync::new( @@ -416,6 +519,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> { behaviour, protocol_name_by_engine: HashMap::new(), protocol_engine_by_name: HashMap::new(), + metrics: if let Some(r) = metrics_registry { + Some(Metrics::register(r)?) + } else { + None + } }; Ok((protocol, peerset_handle)) @@ -859,6 +967,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }); + self.report_metrics() } fn maintain_peers(&mut self) { @@ -1767,6 +1876,40 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> { } out } + + fn report_metrics(&self) { + use std::convert::TryInto; + + if let Some(metrics) = &self.metrics { + let mut obsolete_requests: u64 = 0; + for peer in self.context_data.peers.values() { + let n = peer.obsolete_requests.len().try_into().unwrap_or(std::u64::MAX); + obsolete_requests = obsolete_requests.saturating_add(n); + } + metrics.obsolete_requests.set(obsolete_requests); + + let n = self.handshaking_peers.len().try_into().unwrap_or(std::u64::MAX); + metrics.handshaking_peers.set(n); + + let n = self.context_data.peers.len().try_into().unwrap_or(std::u64::MAX); + metrics.peers.set(n); + + let m = self.sync.metrics(); + + metrics.fork_targets.set(m.fork_targets.into()); + metrics.queued_blocks.set(m.queued_blocks.into()); + + metrics.justifications_pending.set(m.justifications.pending_requests.into()); + metrics.justifications_active.set(m.justifications.active_requests.into()); + metrics.justifications_failed.set(m.justifications.failed_requests.into()); + metrics.justifications_importing.set(m.justifications.importing_requests.into()); + + metrics.finality_proofs_pending.set(m.finality_proofs.pending_requests.into()); + metrics.finality_proofs_active.set(m.finality_proofs.active_requests.into()); + metrics.finality_proofs_failed.set(m.finality_proofs.failed_requests.into()); + metrics.finality_proofs_importing.set(m.finality_proofs.importing_requests.into()); + } + } } /// Outcome of an incoming custom message. diff --git a/substrate/client/network/src/protocol/sync.rs b/substrate/client/network/src/protocol/sync.rs index d0427e61a818c6ad2596c1bdd6de5ef9a8ba3b8b..04afc5d918471514ac2ece465f305531273f6452 100644 --- a/substrate/client/network/src/protocol/sync.rs +++ b/substrate/client/network/src/protocol/sync.rs @@ -1202,6 +1202,27 @@ impl<B: BlockT> ChainSync<B> { fn is_already_downloading(&self, hash: &B::Hash) -> bool { self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) } + + /// Return some key metrics. + pub(crate) fn metrics(&self) -> Metrics { + use std::convert::TryInto; + Metrics { + queued_blocks: self.queue_blocks.len().try_into().unwrap_or(std::u32::MAX), + fork_targets: self.fork_targets.len().try_into().unwrap_or(std::u32::MAX), + finality_proofs: self.extra_finality_proofs.metrics(), + justifications: self.extra_justifications.metrics(), + _priv: () + } + } +} + +#[derive(Debug)] +pub(crate) struct Metrics { + pub(crate) queued_blocks: u32, + pub(crate) fork_targets: u32, + pub(crate) finality_proofs: extra_requests::Metrics, + pub(crate) justifications: extra_requests::Metrics, + _priv: () } /// Request the ancestry for a block. Sends a request for header and justification for the given diff --git a/substrate/client/network/src/protocol/sync/extra_requests.rs b/substrate/client/network/src/protocol/sync/extra_requests.rs index 38c250cddf26d920d9e2f2e2e08eed19ac53a3e4..81b12a1a704aab9b17737c4a03190bef519ff3c5 100644 --- a/substrate/client/network/src/protocol/sync/extra_requests.rs +++ b/substrate/client/network/src/protocol/sync/extra_requests.rs @@ -53,6 +53,15 @@ pub(crate) struct ExtraRequests<B: BlockT> { request_type_name: &'static str, } +#[derive(Debug)] +pub(crate) struct Metrics { + pub(crate) pending_requests: u32, + pub(crate) active_requests: u32, + pub(crate) importing_requests: u32, + pub(crate) failed_requests: u32, + _priv: () +} + impl<B: BlockT> ExtraRequests<B> { pub(crate) fn new(request_type_name: &'static str) -> Self { ExtraRequests { @@ -240,6 +249,18 @@ impl<B: BlockT> ExtraRequests<B> { pub(crate) fn pending_requests(&self) -> impl Iterator<Item = &ExtraRequest<B>> { self.pending_requests.iter() } + + /// Get some key metrics. + pub(crate) fn metrics(&self) -> Metrics { + use std::convert::TryInto; + Metrics { + pending_requests: self.pending_requests.len().try_into().unwrap_or(std::u32::MAX), + active_requests: self.active_requests.len().try_into().unwrap_or(std::u32::MAX), + failed_requests: self.failed_requests.len().try_into().unwrap_or(std::u32::MAX), + importing_requests: self.importing_requests.len().try_into().unwrap_or(std::u32::MAX), + _priv: () + } + } } /// Matches peers with pending extra requests. diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 821847add1df172ade73f47a85ca54109eab2519..5c618ac4ecab1794386afd622cfe85e64d87afa8 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -210,7 +210,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> { params.finality_proof_request_builder, params.protocol_id.clone(), peerset_config, - params.block_announce_validator + params.block_announce_validator, + params.metrics_registry.as_ref() )?; // Build the swarm. @@ -858,7 +859,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> { }; this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed); - + if let Some(metrics) = this.metrics.as_ref() { metrics.is_major_syncing.set(is_major_syncing as u64); metrics.peers_count.set(num_connected_peers as u64);