Unverified Commit 5754204d authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Network bridge metrics (#2818)



* add metrics (unused) to network bridge

* fix test compilation

* trigger metrics messages

* add some more metrics

* track sent and received notifications

* restore metrics import

* integrate into service

* Update node/network/bridge/src/lib.rs

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* Update node/network/bridge/src/lib.rs

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
parent 72318d75
Pipeline #132701 failed with stages
in 31 minutes and 46 seconds
......@@ -15,13 +15,13 @@ sc-network = { git = "https://github.com/paritytech/substrate", branch = "master
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-network-protocol = { path = "../protocol" }
polkadot-node-subsystem-util = { path = "../../subsystem-util"}
strum = "0.20.0"
parking_lot = "0.11.1"
[dev-dependencies]
assert_matches = "1.4.0"
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
polkadot-node-subsystem-util = { path = "../../subsystem-util"}
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures-timer = "3"
......@@ -40,6 +40,7 @@ use polkadot_node_network_protocol::{
PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView, UnifiedReputationChange as Rep,
ObservedRole,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
/// Peer set infos for network initialization.
///
......@@ -77,6 +78,154 @@ const EMPTY_VIEW_COST: Rep = Rep::CostMajor("Peer sent us an empty view");
// network bridge log target
const LOG_TARGET: &'static str = "parachain::network-bridge";
/// Metrics for the network bridge.
#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_peer_connected(&self, peer_set: PeerSet) {
self.0.as_ref().map(|metrics| metrics
.connected_events
.with_label_values(&[peer_set.get_protocol_name_static()])
.inc()
);
}
fn on_peer_disconnected(&self, peer_set: PeerSet) {
self.0.as_ref().map(|metrics| metrics
.disconnected_events
.with_label_values(&[peer_set.get_protocol_name_static()])
.inc()
);
}
fn note_peer_count(&self, peer_set: PeerSet, count: usize) {
self.0.as_ref().map(|metrics| metrics
.peer_count
.with_label_values(&[peer_set.get_protocol_name_static()])
.set(count as u64)
);
}
fn on_notification_received(&self, peer_set: PeerSet, size: usize) {
if let Some(metrics) = self.0.as_ref() {
metrics.notifications_received
.with_label_values(&[peer_set.get_protocol_name_static()])
.inc();
metrics.bytes_received
.with_label_values(&[peer_set.get_protocol_name_static()])
.inc_by(size as u64);
}
}
fn on_notification_sent(&self, peer_set: PeerSet, size: usize, to_peers: usize) {
if let Some(metrics) = self.0.as_ref() {
metrics.notifications_sent
.with_label_values(&[peer_set.get_protocol_name_static()])
.inc_by(to_peers as u64);
metrics.bytes_sent
.with_label_values(&[peer_set.get_protocol_name_static()])
.inc_by((size * to_peers) as u64);
}
}
}
#[derive(Clone)]
struct MetricsInner {
peer_count: prometheus::GaugeVec<prometheus::U64>,
connected_events: prometheus::CounterVec<prometheus::U64>,
disconnected_events: prometheus::CounterVec<prometheus::U64>,
notifications_received: prometheus::CounterVec<prometheus::U64>,
notifications_sent: prometheus::CounterVec<prometheus::U64>,
bytes_received: prometheus::CounterVec<prometheus::U64>,
bytes_sent: prometheus::CounterVec<prometheus::U64>,
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry)
-> std::result::Result<Self, prometheus::PrometheusError>
{
let metrics = MetricsInner {
peer_count: prometheus::register(
prometheus::GaugeVec::new(
prometheus::Opts::new(
"parachain_peer_count",
"The number of peers on a parachain-related peer-set",
),
&["protocol"]
)?,
registry,
)?,
connected_events: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"parachain_peer_connect_events_total",
"The number of peer connect events on a parachain notifications protocol",
),
&["protocol"]
)?,
registry,
)?,
disconnected_events: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"parachain_peer_disconnect_events_total",
"The number of peer disconnect events on a parachain notifications protocol",
),
&["protocol"]
)?,
registry,
)?,
notifications_received: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"parachain_notifications_received_total",
"The number of notifications received on a parachain protocol",
),
&["protocol"]
)?,
registry,
)?,
notifications_sent: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"parachain_notifications_sent_total",
"The number of notifications sent on a parachain protocol",
),
&["protocol"]
)?,
registry,
)?,
bytes_received: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"parachain_notification_bytes_received_total",
"The number of bytes received on a parachain notification protocol",
),
&["protocol"]
)?,
registry,
)?,
bytes_sent: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"parachain_notification_bytes_sent_total",
"The number of bytes sent on a parachain notification protocol",
),
&["protocol"]
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
/// Messages from and to the network.
///
/// As transmitted to and received from subsystems.
......@@ -90,7 +239,6 @@ pub enum WireMessage<M> {
ViewUpdate(View),
}
/// The network bridge subsystem.
pub struct NetworkBridge<N, AD> {
/// `Network` trait implementing type.
......@@ -98,6 +246,7 @@ pub struct NetworkBridge<N, AD> {
authority_discovery_service: AD,
request_multiplexer: RequestMultiplexer,
sync_oracle: Box<dyn SyncOracle + Send>,
metrics: Metrics,
}
impl<N, AD> NetworkBridge<N, AD> {
......@@ -110,12 +259,14 @@ impl<N, AD> NetworkBridge<N, AD> {
authority_discovery_service: AD,
request_multiplexer: RequestMultiplexer,
sync_oracle: Box<dyn SyncOracle + Send>,
metrics: Metrics,
) -> Self {
NetworkBridge {
network_service,
authority_discovery_service,
request_multiplexer,
sync_oracle,
metrics,
}
}
}
......@@ -190,6 +341,7 @@ async fn handle_subsystem_messages<Context, N, AD>(
validator_discovery_notifications: mpsc::Receiver<ValidatorDiscoveryNotification>,
shared: Shared,
sync_oracle: Box<dyn SyncOracle + Send>,
metrics: Metrics,
) -> Result<(), UnexpectedAbort>
where
Context: SubsystemContext<Message = NetworkBridgeMessage>,
......@@ -243,6 +395,7 @@ where
&live_heads,
&shared,
finalized_number,
&metrics,
).await?;
}
}
......@@ -292,6 +445,7 @@ where
peers,
PeerSet::Validation,
WireMessage::ProtocolMessage(msg),
&metrics,
).await?
}
NetworkBridgeMessage::SendValidationMessages(msgs) => {
......@@ -307,6 +461,7 @@ where
peers,
PeerSet::Validation,
WireMessage::ProtocolMessage(msg),
&metrics,
).await?
}
}
......@@ -322,6 +477,7 @@ where
peers,
PeerSet::Collation,
WireMessage::ProtocolMessage(msg),
&metrics,
).await?
}
NetworkBridgeMessage::SendCollationMessages(msgs) => {
......@@ -337,6 +493,7 @@ where
peers,
PeerSet::Collation,
WireMessage::ProtocolMessage(msg),
&metrics,
).await?
}
}
......@@ -402,6 +559,7 @@ async fn handle_network_messages(
mut network_service: impl Network,
mut request_multiplexer: RequestMultiplexer,
mut validator_discovery_notifications: mpsc::Sender<ValidatorDiscoveryNotification>,
metrics: Metrics,
shared: Shared,
) -> Result<(), UnexpectedAbort> {
let mut network_stream = network_service.event_stream();
......@@ -442,6 +600,9 @@ async fn handle_network_messages(
}
}
metrics.on_peer_connected(peer_set);
metrics.note_peer_count(peer_set, peer_map.len());
shared.local_view.clone().unwrap_or(View::default())
};
......@@ -472,6 +633,7 @@ async fn handle_network_messages(
WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(
local_view,
),
&metrics,
).await?;
}
PeerSet::Collation => {
......@@ -493,6 +655,7 @@ async fn handle_network_messages(
WireMessage::<protocol_v1::CollationProtocol>::ViewUpdate(
local_view,
),
&metrics,
).await?;
}
}
......@@ -517,7 +680,12 @@ async fn handle_network_messages(
PeerSet::Collation => &mut shared.collation_peers,
};
peer_map.remove(&peer).is_some()
let w = peer_map.remove(&peer).is_some();
metrics.on_peer_disconnected(peer_set);
metrics.note_peer_count(peer_set, peer_map.len());
w
};
// Failure here means that the other side of the network bridge
......@@ -545,7 +713,10 @@ async fn handle_network_messages(
.filter(|(protocol, _)| {
protocol == &PeerSet::Validation.into_protocol_name()
})
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
.map(|(_, msg_bytes)| {
WireMessage::decode(&mut msg_bytes.as_ref())
.map(|m| (m, msg_bytes.len()))
})
.collect();
let v_messages = match v_messages {
......@@ -566,7 +737,10 @@ async fn handle_network_messages(
.filter(|(protocol, _)| {
protocol == &PeerSet::Collation.into_protocol_name()
})
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
.map(|(_, msg_bytes)| {
WireMessage::decode(&mut msg_bytes.as_ref())
.map(|m| (m, msg_bytes.len()))
})
.collect();
match c_messages {
......@@ -594,8 +768,10 @@ async fn handle_network_messages(
if !v_messages.is_empty() {
let (events, reports) = handle_peer_messages(
remote.clone(),
PeerSet::Validation,
&mut shared.0.lock().validation_peers,
v_messages,
&metrics,
);
for report in reports {
......@@ -608,8 +784,10 @@ async fn handle_network_messages(
if !c_messages.is_empty() {
let (events, reports) = handle_peer_messages(
remote.clone(),
PeerSet::Collation,
&mut shared.0.lock().collation_peers,
c_messages,
&metrics,
);
for report in reports {
......@@ -666,6 +844,7 @@ where
network_service,
request_multiplexer,
authority_discovery_service,
metrics,
sync_oracle,
} = bridge;
......@@ -676,6 +855,7 @@ where
network_service.clone(),
request_multiplexer,
validation_worker_tx,
metrics.clone(),
shared.clone(),
).remote_handle();
......@@ -688,6 +868,7 @@ where
validation_worker_rx,
shared,
sync_oracle,
metrics,
);
futures::pin_mut!(subsystem_event_handler);
......@@ -738,13 +919,14 @@ fn construct_view(live_heads: impl DoubleEndedIterator<Item = Hash>, finalized_n
)
}
#[tracing::instrument(level = "trace", skip(net, ctx, shared), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(net, ctx, shared, metrics), fields(subsystem = LOG_TARGET))]
async fn update_our_view(
net: &mut impl Network,
ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
live_heads: &[ActivatedLeaf],
shared: &Shared,
finalized_number: BlockNumber,
metrics: &Metrics,
) -> SubsystemResult<()> {
let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number);
......@@ -780,12 +962,14 @@ async fn update_our_view(
net,
validation_peers,
WireMessage::ViewUpdate(new_view.clone()),
metrics,
).await?;
send_collation_message(
net,
collation_peers,
WireMessage::ViewUpdate(new_view),
metrics,
).await?;
let our_view = OurView::new(
......@@ -808,11 +992,13 @@ async fn update_our_view(
// Handle messages on a specific peer-set. The peer is expected to be connected on that
// peer-set.
#[tracing::instrument(level = "trace", skip(peers, messages), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(peers, messages, metrics), fields(subsystem = LOG_TARGET))]
fn handle_peer_messages<M>(
peer: PeerId,
peer_set: PeerSet,
peers: &mut HashMap<PeerId, PeerData>,
messages: Vec<WireMessage<M>>,
messages: Vec<(WireMessage<M>, usize)>,
metrics: &Metrics,
) -> (Vec<NetworkBridgeEvent<M>>, Vec<Rep>) {
let peer_data = match peers.get_mut(&peer) {
None => {
......@@ -824,7 +1010,9 @@ fn handle_peer_messages<M>(
let mut outgoing_messages = Vec::with_capacity(messages.len());
let mut reports = Vec::new();
for message in messages {
for (message, size_bytes) in messages {
metrics.on_notification_received(peer_set, size_bytes);
outgoing_messages.push(match message {
WireMessage::ViewUpdate(new_view) => {
if new_view.len() > MAX_VIEW_HEADS ||
......@@ -855,30 +1043,32 @@ fn handle_peer_messages<M>(
(outgoing_messages, reports)
}
#[tracing::instrument(level = "trace", skip(net, peers), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(net, peers, metrics), fields(subsystem = LOG_TARGET))]
async fn send_validation_message<I>(
net: &mut impl Network,
peers: I,
message: WireMessage<protocol_v1::ValidationProtocol>,
metrics: &Metrics,
) -> SubsystemResult<()>
where
I: IntoIterator<Item=PeerId>,
I::IntoIter: ExactSizeIterator,
{
send_message(net, peers, PeerSet::Validation, message).await
send_message(net, peers, PeerSet::Validation, message, metrics).await
}
#[tracing::instrument(level = "trace", skip(net, peers), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(net, peers, metrics), fields(subsystem = LOG_TARGET))]
async fn send_collation_message<I>(
net: &mut impl Network,
peers: I,
message: WireMessage<protocol_v1::CollationProtocol>,
metrics: &Metrics,
) -> SubsystemResult<()>
where
I: IntoIterator<Item=PeerId>,
I::IntoIter: ExactSizeIterator,
{
send_message(net, peers, PeerSet::Collation, message).await
send_message(net, peers, PeerSet::Collation, message, metrics).await
}
......@@ -1193,6 +1383,7 @@ mod tests {
network_service: network,
authority_discovery_service: discovery,
request_multiplexer,
metrics: Metrics(None),
sync_oracle,
};
......
......@@ -50,6 +50,7 @@ pub(crate) async fn send_message<M, I>(
peers: I,
peer_set: PeerSet,
message: M,
metrics: &super::Metrics,
) -> SubsystemResult<()>
where
M: Encode + Clone,
......@@ -59,7 +60,12 @@ where
let mut message_producer = stream::iter({
let peers = peers.into_iter();
let n_peers = peers.len();
let mut message = Some(message.encode());
let mut message = {
let encoded = message.encode();
metrics.on_notification_sent(peer_set, encoded.len(), n_peers);
Some(encoded)
};
peers.enumerate().map(move |(i, peer)| {
// optimization: avoid cloning the message for the last peer in the
......
......@@ -63,12 +63,12 @@ impl Metrics {
}
/// Provide a timer for handling `ConnectionRequest` which observes on drop.
fn time_handle_connection_request(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
fn time_handle_connection_request(&self) -> Option<prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.handle_connection_request.start_timer())
}
/// Provide a timer for `process_msg` which observes on drop.
fn time_process_msg(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
fn time_process_msg(&self) -> Option<prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
}
}
......
......@@ -548,6 +548,7 @@ where
authority_discovery,
request_multiplexer,
Box::new(network_service.clone()),
Metrics::register(registry)?,
),
provisioner: ProvisionerSubsystem::new(
spawner.clone(),
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment