From 77c78e1561bbe5ee0ecf414312bae82396ae6d11 Mon Sep 17 00:00:00 2001
From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
Date: Wed, 15 Jan 2025 18:50:42 +0200
Subject: [PATCH] litep2p: Provide partial results to speedup GetRecord queries
 (#7099)

This PR provides the partial results of the `GetRecord` kademlia query.

This significantly speeds up the discovery of authority records, from ~37
minutes to ~2-3 minutes.
For comparison, libp2p discovers authority records in ~10 minutes.

The authority discovery was slow because litep2p provided the records
only after the Kademlia query was completed. A normal Kademlia query
completes in around 40 seconds to a few minutes.
In this PR, partial records are provided as soon as they are discovered
from the network.

### Testing Done

Started a node in Kusama with `--validator` and litep2p backend.
The node discovered 996/1000 authority records in ~ 1 minute 45 seconds.

![Screenshot 2025-01-09 at 12 26
08](https://github.com/user-attachments/assets/b618bf7c-2bba-43a0-a021-4047e854c075)


### Before & After

In this image, libp2p is on the left, litep2p without this PR is in the
middle, and litep2p with this PR is on the right.

![Screenshot 2025-01-07 at 17 57
56](https://github.com/user-attachments/assets/a8d467f7-8dc7-461c-bcff-163b94d01ae8)



Closes: https://github.com/paritytech/polkadot-sdk/issues/7077

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
---
 Cargo.lock                                    |  4 +-
 Cargo.toml                                    |  2 +-
 prdoc/pr_7099.prdoc                           | 16 ++++
 .../client/network/src/litep2p/discovery.rs   | 33 +++++--
 substrate/client/network/src/litep2p/mod.rs   | 87 ++++++++-----------
 5 files changed, 79 insertions(+), 63 deletions(-)
 create mode 100644 prdoc/pr_7099.prdoc

diff --git a/Cargo.lock b/Cargo.lock
index 7725db743c4..0d71a770d38 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10446,9 +10446,9 @@ dependencies = [
 
 [[package]]
 name = "litep2p"
-version = "0.8.4"
+version = "0.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b0fef34af8847e816003bf7fdeac5ea50b9a7a88441ac927a6166b5e812ab79"
+checksum = "6ca6ee50a125dc4fc4e9a3ae3640010796d1d07bc517a0ac715fdf0b24a0b6ac"
 dependencies = [
  "async-trait",
  "bs58",
diff --git a/Cargo.toml b/Cargo.toml
index c30a9949e85..eb99b80e16f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -850,7 +850,7 @@ linked-hash-map = { version = "0.5.4" }
 linked_hash_set = { version = "0.1.4" }
 linregress = { version = "0.5.1" }
 lite-json = { version = "0.2.0", default-features = false }
-litep2p = { version = "0.8.4", features = ["websocket"] }
+litep2p = { version = "0.9.0", features = ["websocket"] }
 log = { version = "0.4.22", default-features = false }
 macro_magic = { version = "0.5.1" }
 maplit = { version = "1.0.2" }
diff --git a/prdoc/pr_7099.prdoc b/prdoc/pr_7099.prdoc
new file mode 100644
index 00000000000..58d809f3c09
--- /dev/null
+++ b/prdoc/pr_7099.prdoc
@@ -0,0 +1,16 @@
+title: Provide partial results to speedup GetRecord queries
+
+doc:
+  - audience: Node Dev
+    description: |
+      This PR provides the partial results of the GetRecord kademlia query.
+      
+      This significantly speeds up the discovery of authority records, from ~37 minutes to ~2-3 minutes.
+      For comparison, libp2p discovers authority records in ~10 minutes.
+      
+      The authority discovery was slow because litep2p provided the records only after the Kademlia query was completed. A normal Kademlia query completes in around 40 seconds to a few minutes.
+      In this PR, partial records are provided as soon as they are discovered from the network.
+
+crates:
+  - name: sc-network
+    bump: patch
diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs
index b55df374f60..eb571804f30 100644
--- a/substrate/client/network/src/litep2p/discovery.rs
+++ b/substrate/client/network/src/litep2p/discovery.rs
@@ -33,8 +33,8 @@ use litep2p::{
 			identify::{Config as IdentifyConfig, IdentifyEvent},
 			kademlia::{
 				Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, ContentProvider,
-				IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum,
-				Record, RecordKey, RecordsType,
+				IncomingRecordValidationMode, KademliaEvent, KademliaHandle, PeerRecord, QueryId,
+				Quorum, Record, RecordKey,
 			},
 			ping::{Config as PingConfig, PingEvent},
 		},
@@ -129,13 +129,19 @@ pub enum DiscoveryEvent {
 		address: Multiaddr,
 	},
 
-	/// Record was found from the DHT.
+	/// `GetRecord` query succeeded.
 	GetRecordSuccess {
 		/// Query ID.
 		query_id: QueryId,
+	},
 
-		/// Records.
-		records: RecordsType,
+	/// Record was found from the DHT.
+	GetRecordPartialResult {
+		/// Query ID.
+		query_id: QueryId,
+
+		/// Record.
+		record: PeerRecord,
 	},
 
 	/// Record was successfully stored on the DHT.
@@ -573,13 +579,24 @@ impl Stream for Discovery {
 					peers: peers.into_iter().collect(),
 				}))
 			},
-			Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id, records })) => {
+			Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id })) => {
 				log::trace!(
 					target: LOG_TARGET,
-					"`GET_RECORD` succeeded for {query_id:?}: {records:?}",
+					"`GET_RECORD` succeeded for {query_id:?}",
 				);
 
-				return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id, records }));
+				return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id }));
+			},
+			Poll::Ready(Some(KademliaEvent::GetRecordPartialResult { query_id, record })) => {
+				log::trace!(
+					target: LOG_TARGET,
+					"`GET_RECORD` intermediary succeeded for {query_id:?}: {record:?}",
+				);
+
+				return Poll::Ready(Some(DiscoveryEvent::GetRecordPartialResult {
+					query_id,
+					record,
+				}));
 			},
 			Poll::Ready(Some(KademliaEvent::PutRecordSuccess { query_id, key: _ })) =>
 				return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs
index 52b2970525d..fc4cce47628 100644
--- a/substrate/client/network/src/litep2p/mod.rs
+++ b/substrate/client/network/src/litep2p/mod.rs
@@ -58,7 +58,7 @@ use litep2p::{
 	protocol::{
 		libp2p::{
 			bitswap::Config as BitswapConfig,
-			kademlia::{QueryId, Record, RecordsType},
+			kademlia::{QueryId, Record},
 		},
 		request_response::ConfigBuilder as RequestResponseConfigBuilder,
 	},
@@ -836,23 +836,45 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
 							self.peerstore_handle.add_known_peer(peer.into());
 						}
 					}
-					Some(DiscoveryEvent::GetRecordSuccess { query_id, records }) => {
+					Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
+						if !self.pending_queries.contains_key(&query_id) {
+							log::error!(
+								target: LOG_TARGET,
+								"Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
+							);
+
+							continue
+						}
+
+						let peer_id: sc_network_types::PeerId = record.peer.into();
+						let record = PeerRecord {
+							record: P2PRecord {
+								key: record.record.key.to_vec().into(),
+								value: record.record.value,
+								publisher: record.record.publisher.map(|peer_id| {
+									let peer_id: sc_network_types::PeerId = peer_id.into();
+									peer_id.into()
+								}),
+								expires: record.record.expires,
+							},
+							peer: Some(peer_id.into()),
+						};
+
+						self.event_streams.send(
+							Event::Dht(
+								DhtEvent::ValueFound(
+									record.into()
+								)
+							)
+						);
+					}
+					Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
 						match self.pending_queries.remove(&query_id) {
 							Some(KadQuery::GetValue(key, started)) => {
 								log::trace!(
 									target: LOG_TARGET,
-									"`GET_VALUE` for {:?} ({query_id:?}) succeeded",
-									key,
+									"`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
 								);
-								for record in litep2p_to_libp2p_peer_record(records) {
-									self.event_streams.send(
-										Event::Dht(
-											DhtEvent::ValueFound(
-												record.into()
-											)
-										)
-									);
-								}
 
 								if let Some(ref metrics) = self.metrics {
 									metrics
@@ -1165,42 +1187,3 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
 		}
 	}
 }
-
-// Glue code to convert from a litep2p records type to a libp2p2 PeerRecord.
-fn litep2p_to_libp2p_peer_record(records: RecordsType) -> Vec<PeerRecord> {
-	match records {
-		litep2p::protocol::libp2p::kademlia::RecordsType::LocalStore(record) => {
-			vec![PeerRecord {
-				record: P2PRecord {
-					key: record.key.to_vec().into(),
-					value: record.value,
-					publisher: record.publisher.map(|peer_id| {
-						let peer_id: sc_network_types::PeerId = peer_id.into();
-						peer_id.into()
-					}),
-					expires: record.expires,
-				},
-				peer: None,
-			}]
-		},
-		litep2p::protocol::libp2p::kademlia::RecordsType::Network(records) => records
-			.into_iter()
-			.map(|record| {
-				let peer_id: sc_network_types::PeerId = record.peer.into();
-
-				PeerRecord {
-					record: P2PRecord {
-						key: record.record.key.to_vec().into(),
-						value: record.record.value,
-						publisher: record.record.publisher.map(|peer_id| {
-							let peer_id: sc_network_types::PeerId = peer_id.into();
-							peer_id.into()
-						}),
-						expires: record.record.expires,
-					},
-					peer: Some(peer_id.into()),
-				}
-			})
-			.collect::<Vec<_>>(),
-	}
-}
-- 
GitLab