Skip to content
Snippets Groups Projects
Unverified Commit 8795ae66 authored by Andrei Eres's avatar Andrei Eres Committed by GitHub
Browse files

Add networking benchmarks for libp2p (#6077)


# Description

Implemented benchmarks for Notifications and RequestResponse protocols
with libp2p implementation. These benchmarks allow us to monitor
regressions and implement fixes before they are observed in real chain.
In the future, they can be used for targeted optimizations of litep2p
compared to libp2p.

Part of https://github.com/paritytech/polkadot-sdk/issues/5220

Next steps:
- Add benchmarks for litep2p implementation
- Optimize load to get better results
- Add benchmarks to CI to catch regressions



## Integration

Benchmarks don't affect downstream projects.

---------

Co-authored-by: default avataralvicsam <alvicsam@gmail.com>
Co-authored-by: default avatarGitHub Action <action@github.com>
parent 65e79720
No related merge requests found
Pipeline #504759 waiting for manual action with stages
in 15 minutes and 39 seconds
......@@ -19393,6 +19393,7 @@ dependencies = [
"asynchronous-codec",
"bytes",
"cid 0.9.0",
"criterion",
"either",
"fnv",
"futures",
......@@ -19414,6 +19415,7 @@ dependencies = [
"rand",
"sc-block-builder",
"sc-client-api",
"sc-consensus",
"sc-network-common",
"sc-network-light",
"sc-network-sync",
......
title: Add networking benchmarks for libp2p
doc:
- audience: node_dev
description: |-
Adds benchmarks for Notifications and RequestResponse protocols with libp2p implementation
crates:
- name: sc-network
validate: false
......@@ -70,7 +70,7 @@ mockall = { workspace = true }
multistream-select = { workspace = true }
rand = { workspace = true, default-features = true }
tempfile = { workspace = true }
tokio = { features = ["macros"], workspace = true, default-features = true }
tokio = { features = ["macros", "rt-multi-thread"], workspace = true, default-features = true }
tokio-util = { features = ["compat"], workspace = true }
tokio-test = { workspace = true }
sc-block-builder = { workspace = true, default-features = true }
......@@ -83,5 +83,17 @@ sp-tracing = { workspace = true, default-features = true }
substrate-test-runtime = { workspace = true }
substrate-test-runtime-client = { workspace = true }
criterion = { workspace = true, default-features = true, features = ["async_tokio"] }
sc-consensus = { workspace = true, default-features = true }
[features]
default = []
[[bench]]
name = "notifications_protocol"
harness = false
[[bench]]
name = "request_response_protocol"
harness = false
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use criterion::{
criterion_group, criterion_main, AxisScale, BenchmarkId, Criterion, PlotConfiguration,
Throughput,
};
use sc_network::{
config::{
FullNetworkConfiguration, MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig,
NonReservedPeerMode, NotificationHandshake, Params, ProtocolId, Role, SetConfig,
},
service::traits::NotificationEvent,
NetworkWorker, NotificationMetrics, NotificationService, Roles,
};
use sc_network_common::sync::message::BlockAnnouncesHandshake;
use sc_network_types::build_multiaddr;
use sp_runtime::traits::Zero;
use std::{
net::{IpAddr, Ipv4Addr, TcpListener},
str::FromStr,
};
use substrate_test_runtime_client::runtime;
const MAX_SIZE: u64 = 2u64.pow(30);
const SAMPLE_SIZE: usize = 50;
const NOTIFICATIONS: usize = 50;
const EXPONENTS: &[(u32, &'static str)] = &[
(6, "64B"),
(9, "512B"),
(12, "4KB"),
(15, "64KB"),
(18, "256KB"),
(21, "2MB"),
(24, "16MB"),
(27, "128MB"),
];
// TODO: It's be better to bind system-provided port when initializing the worker
fn get_listen_address() -> sc_network::Multiaddr {
let ip = Ipv4Addr::from_str("127.0.0.1").unwrap();
let listener = TcpListener::bind((IpAddr::V4(ip), 0)).unwrap(); // Bind to a random port
let local_addr = listener.local_addr().unwrap();
let port = local_addr.port();
build_multiaddr!(Ip4(ip), Tcp(port))
}
pub fn create_network_worker(
listen_addr: sc_network::Multiaddr,
) -> (NetworkWorker<runtime::Block, runtime::Hash>, Box<dyn NotificationService>) {
let role = Role::Full;
let genesis_hash = runtime::Hash::zero();
let (block_announce_config, notification_service) = NonDefaultSetConfig::new(
"/block-announces/1".into(),
vec!["/bench-notifications-protocol/block-announces/1".into()],
MAX_SIZE,
Some(NotificationHandshake::new(BlockAnnouncesHandshake::<runtime::Block>::build(
Roles::from(&role),
Zero::zero(),
genesis_hash,
genesis_hash,
))),
SetConfig {
in_peers: 1,
out_peers: 1,
reserved_nodes: vec![],
non_reserved_mode: NonReservedPeerMode::Accept,
},
);
let mut net_conf = NetworkConfiguration::new_local();
net_conf.listen_addresses = vec![listen_addr];
let worker = NetworkWorker::<runtime::Block, runtime::Hash>::new(Params::<
runtime::Block,
runtime::Hash,
NetworkWorker<_, _>,
> {
block_announce_config,
role,
executor: Box::new(|f| {
tokio::spawn(f);
}),
genesis_hash,
network_config: FullNetworkConfiguration::new(&net_conf, None),
protocol_id: ProtocolId::from("bench-protocol-name"),
fork_id: None,
metrics_registry: None,
bitswap_config: None,
notification_metrics: NotificationMetrics::new(None),
})
.unwrap();
(worker, notification_service)
}
async fn run_serially(size: usize, limit: usize) {
let listen_address1 = get_listen_address();
let listen_address2 = get_listen_address();
let (worker1, mut notification_service1) = create_network_worker(listen_address1);
let (worker2, mut notification_service2) = create_network_worker(listen_address2.clone());
let peer_id2: sc_network::PeerId = (*worker2.local_peer_id()).into();
worker1
.add_reserved_peer(MultiaddrWithPeerId { multiaddr: listen_address2, peer_id: peer_id2 })
.unwrap();
let network1_run = worker1.run();
let network2_run = worker2.run();
let (tx, rx) = async_channel::bounded(10);
let network1 = tokio::spawn(async move {
tokio::pin!(network1_run);
loop {
tokio::select! {
_ = &mut network1_run => {},
event = notification_service1.next_event() => {
match event {
Some(NotificationEvent::NotificationStreamOpened { .. }) => {
notification_service1
.send_async_notification(&peer_id2, vec![0; size])
.await
.unwrap();
},
event => panic!("Unexpected event {:?}", event),
};
},
message = rx.recv() => {
match message {
Ok(Some(_)) => {
notification_service1
.send_async_notification(&peer_id2, vec![0; size])
.await
.unwrap();
},
Ok(None) => break,
Err(err) => panic!("Unexpected error {:?}", err),
}
}
}
}
});
let network2 = tokio::spawn(async move {
let mut received_counter = 0;
tokio::pin!(network2_run);
loop {
tokio::select! {
_ = &mut network2_run => {},
event = notification_service2.next_event() => {
match event {
Some(NotificationEvent::ValidateInboundSubstream { result_tx, .. }) => {
result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap();
},
Some(NotificationEvent::NotificationStreamOpened { .. }) => {},
Some(NotificationEvent::NotificationReceived { .. }) => {
received_counter += 1;
if received_counter >= limit {
let _ = tx.send(None).await;
break
}
let _ = tx.send(Some(())).await;
},
event => panic!("Unexpected event {:?}", event),
};
},
}
}
});
let _ = tokio::join!(network1, network2);
}
async fn run_with_backpressure(size: usize, limit: usize) {
let listen_address1 = get_listen_address();
let listen_address2 = get_listen_address();
let (worker1, mut notification_service1) = create_network_worker(listen_address1);
let (worker2, mut notification_service2) = create_network_worker(listen_address2.clone());
let peer_id2: sc_network::PeerId = (*worker2.local_peer_id()).into();
worker1
.add_reserved_peer(MultiaddrWithPeerId { multiaddr: listen_address2, peer_id: peer_id2 })
.unwrap();
let network1_run = worker1.run();
let network2_run = worker2.run();
let network1 = tokio::spawn(async move {
let mut sent_counter = 0;
tokio::pin!(network1_run);
loop {
tokio::select! {
_ = &mut network1_run => {},
event = notification_service1.next_event() => {
match event {
Some(NotificationEvent::NotificationStreamOpened { .. }) => {
while sent_counter < limit {
sent_counter += 1;
notification_service1
.send_async_notification(&peer_id2, vec![0; size])
.await
.unwrap();
}
},
Some(NotificationEvent::NotificationStreamClosed { .. }) => {
if sent_counter != limit { panic!("Stream closed unexpectedly") }
break
},
event => panic!("Unexpected event {:?}", event),
};
},
}
}
});
let network2 = tokio::spawn(async move {
let mut received_counter = 0;
tokio::pin!(network2_run);
loop {
tokio::select! {
_ = &mut network2_run => {},
event = notification_service2.next_event() => {
match event {
Some(NotificationEvent::ValidateInboundSubstream { result_tx, .. }) => {
result_tx.send(sc_network::service::traits::ValidationResult::Accept).unwrap();
},
Some(NotificationEvent::NotificationStreamOpened { .. }) => {},
Some(NotificationEvent::NotificationStreamClosed { .. }) => {
if received_counter != limit { panic!("Stream closed unexpectedly") }
break
},
Some(NotificationEvent::NotificationReceived { .. }) => {
received_counter += 1;
if received_counter >= limit { break }
},
event => panic!("Unexpected event {:?}", event),
};
},
}
}
});
let _ = tokio::join!(network1, network2);
}
fn run_benchmark(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic);
let mut group = c.benchmark_group("notifications_benchmark");
group.plot_config(plot_config);
for &(exponent, label) in EXPONENTS.iter() {
let size = 2usize.pow(exponent);
group.throughput(Throughput::Bytes(NOTIFICATIONS as u64 * size as u64));
group.bench_with_input(
BenchmarkId::new("consistently", label),
&(size, NOTIFICATIONS),
|b, &(size, limit)| {
b.to_async(&rt).iter(|| run_serially(size, limit));
},
);
group.bench_with_input(
BenchmarkId::new("with_backpressure", label),
&(size, NOTIFICATIONS),
|b, &(size, limit)| {
b.to_async(&rt).iter(|| run_with_backpressure(size, limit));
},
);
}
}
criterion_group! {
name = benches;
config = Criterion::default().sample_size(SAMPLE_SIZE);
targets = run_benchmark
}
criterion_main!(benches);
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use criterion::{
criterion_group, criterion_main, AxisScale, BenchmarkId, Criterion, PlotConfiguration,
Throughput,
};
use sc_network::{
config::{
FullNetworkConfiguration, IncomingRequest, NetworkConfiguration, NonDefaultSetConfig,
NonReservedPeerMode, NotificationHandshake, OutgoingResponse, Params, ProtocolId, Role,
SetConfig,
},
IfDisconnected, NetworkBackend, NetworkRequest, NetworkWorker, NotificationMetrics,
NotificationService, Roles,
};
use sc_network_common::sync::message::BlockAnnouncesHandshake;
use sc_network_types::build_multiaddr;
use sp_runtime::traits::Zero;
use std::{
net::{IpAddr, Ipv4Addr, TcpListener},
str::FromStr,
time::Duration,
};
use substrate_test_runtime_client::runtime;
const MAX_SIZE: u64 = 2u64.pow(30);
const SAMPLE_SIZE: usize = 50;
const REQUESTS: usize = 50;
const EXPONENTS: &[(u32, &'static str)] = &[
(6, "64B"),
(9, "512B"),
(12, "4KB"),
(15, "64KB"),
(18, "256KB"),
(21, "2MB"),
(24, "16MB"),
(27, "128MB"),
];
fn get_listen_address() -> sc_network::Multiaddr {
let ip = Ipv4Addr::from_str("127.0.0.1").unwrap();
let listener = TcpListener::bind((IpAddr::V4(ip), 0)).unwrap(); // Bind to a random port
let local_addr = listener.local_addr().unwrap();
let port = local_addr.port();
build_multiaddr!(Ip4(ip), Tcp(port))
}
pub fn create_network_worker(
listen_addr: sc_network::Multiaddr,
) -> (
NetworkWorker<runtime::Block, runtime::Hash>,
async_channel::Receiver<IncomingRequest>,
Box<dyn NotificationService>,
) {
let (tx, rx) = async_channel::bounded(10);
let request_response_config =
NetworkWorker::<runtime::Block, runtime::Hash>::request_response_config(
"/request-response/1".into(),
vec![],
MAX_SIZE,
MAX_SIZE,
Duration::from_secs(2),
Some(tx),
);
let mut net_conf = NetworkConfiguration::new_local();
net_conf.listen_addresses = vec![listen_addr];
let mut network_config = FullNetworkConfiguration::new(&net_conf, None);
network_config.add_request_response_protocol(request_response_config);
let (block_announce_config, notification_service) = NonDefaultSetConfig::new(
"/block-announces/1".into(),
vec![],
1024,
Some(NotificationHandshake::new(BlockAnnouncesHandshake::<runtime::Block>::build(
Roles::from(&Role::Full),
Zero::zero(),
runtime::Hash::zero(),
runtime::Hash::zero(),
))),
SetConfig {
in_peers: 1,
out_peers: 1,
reserved_nodes: vec![],
non_reserved_mode: NonReservedPeerMode::Accept,
},
);
let worker = NetworkWorker::<runtime::Block, runtime::Hash>::new(Params::<
runtime::Block,
runtime::Hash,
NetworkWorker<_, _>,
> {
block_announce_config,
role: Role::Full,
executor: Box::new(|f| {
tokio::spawn(f);
}),
genesis_hash: runtime::Hash::zero(),
network_config,
protocol_id: ProtocolId::from("bench-request-response-protocol"),
fork_id: None,
metrics_registry: None,
bitswap_config: None,
notification_metrics: NotificationMetrics::new(None),
})
.unwrap();
(worker, rx, notification_service)
}
async fn run_serially(size: usize, limit: usize) {
let listen_address1 = get_listen_address();
let listen_address2 = get_listen_address();
let (mut worker1, _rx1, _notification_service1) = create_network_worker(listen_address1);
let service1 = worker1.service().clone();
let (worker2, rx2, _notification_service2) = create_network_worker(listen_address2.clone());
let peer_id2 = *worker2.local_peer_id();
worker1.add_known_address(peer_id2, listen_address2.into());
let network1_run = worker1.run();
let network2_run = worker2.run();
let (break_tx, break_rx) = async_channel::bounded(10);
let requests = async move {
let mut sent_counter = 0;
while sent_counter < limit {
let _ = service1
.request(
peer_id2.into(),
"/request-response/1".into(),
vec![0; 2],
None,
IfDisconnected::TryConnect,
)
.await
.unwrap();
sent_counter += 1;
}
let _ = break_tx.send(()).await;
};
let network1 = tokio::spawn(async move {
tokio::pin!(requests);
tokio::pin!(network1_run);
loop {
tokio::select! {
_ = &mut network1_run => {},
_ = &mut requests => break,
}
}
});
let network2 = tokio::spawn(async move {
tokio::pin!(network2_run);
loop {
tokio::select! {
_ = &mut network2_run => {},
res = rx2.recv() => {
let IncomingRequest { pending_response, .. } = res.unwrap();
pending_response.send(OutgoingResponse {
result: Ok(vec![0; size]),
reputation_changes: vec![],
sent_feedback: None,
}).unwrap();
},
_ = break_rx.recv() => break,
}
}
});
let _ = tokio::join!(network1, network2);
}
// The libp2p request-response implementation does not provide any backpressure feedback.
// So this benchmark is useless until we implement it for litep2p.
#[allow(dead_code)]
async fn run_with_backpressure(size: usize, limit: usize) {
let listen_address1 = get_listen_address();
let listen_address2 = get_listen_address();
let (mut worker1, _rx1, _notification_service1) = create_network_worker(listen_address1);
let service1 = worker1.service().clone();
let (worker2, rx2, _notification_service2) = create_network_worker(listen_address2.clone());
let peer_id2 = *worker2.local_peer_id();
worker1.add_known_address(peer_id2, listen_address2.into());
let network1_run = worker1.run();
let network2_run = worker2.run();
let (break_tx, break_rx) = async_channel::bounded(10);
let requests = futures::future::join_all((0..limit).into_iter().map(|_| {
let (tx, rx) = futures::channel::oneshot::channel();
service1.start_request(
peer_id2.into(),
"/request-response/1".into(),
vec![0; 8],
None,
tx,
IfDisconnected::TryConnect,
);
rx
}));
let network1 = tokio::spawn(async move {
tokio::pin!(requests);
tokio::pin!(network1_run);
loop {
tokio::select! {
_ = &mut network1_run => {},
responses = &mut requests => {
for res in responses {
res.unwrap().unwrap();
}
let _ = break_tx.send(()).await;
break;
},
}
}
});
let network2 = tokio::spawn(async move {
tokio::pin!(network2_run);
loop {
tokio::select! {
_ = &mut network2_run => {},
res = rx2.recv() => {
let IncomingRequest { pending_response, .. } = res.unwrap();
pending_response.send(OutgoingResponse {
result: Ok(vec![0; size]),
reputation_changes: vec![],
sent_feedback: None,
}).unwrap();
},
_ = break_rx.recv() => break,
}
}
});
let _ = tokio::join!(network1, network2);
}
fn run_benchmark(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic);
let mut group = c.benchmark_group("request_response_benchmark");
group.plot_config(plot_config);
for &(exponent, label) in EXPONENTS.iter() {
let size = 2usize.pow(exponent);
group.throughput(Throughput::Bytes(REQUESTS as u64 * size as u64));
group.bench_with_input(
BenchmarkId::new("consistently", label),
&(size, REQUESTS),
|b, &(size, limit)| {
b.to_async(&rt).iter(|| run_serially(size, limit));
},
);
}
}
criterion_group! {
name = benches;
config = Criterion::default().sample_size(SAMPLE_SIZE);
targets = run_benchmark
}
criterion_main!(benches);
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment