From fa22eb73610df0af1f6c46cf16d60273ce7cfd37 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bastian=20K=C3=B6cher?= <bkchr@users.noreply.github.com>
Date: Mon, 31 May 2021 20:17:15 +0200
Subject: [PATCH] Optimize `next_storage_key` (#8956)

* Optimize `next_storage_key`

- Do not rely on recursion
- Use an iterator over the overlay to not always call the same method

* Fix bug
---
 substrate/primitives/state-machine/src/ext.rs | 116 ++++++++++++++----
 .../src/overlayed_changes/changeset.rs        |  46 +++----
 .../src/overlayed_changes/mod.rs              |  42 +++----
 3 files changed, 136 insertions(+), 68 deletions(-)

diff --git a/substrate/primitives/state-machine/src/ext.rs b/substrate/primitives/state-machine/src/ext.rs
index 471674580d2..2649c320e14 100644
--- a/substrate/primitives/state-machine/src/ext.rs
+++ b/substrate/primitives/state-machine/src/ext.rs
@@ -32,7 +32,7 @@ use sp_externalities::{
 };
 use codec::{Decode, Encode, EncodeAppend};
 
-use sp_std::{fmt, any::{Any, TypeId}, vec::Vec, vec, boxed::Box};
+use sp_std::{fmt, any::{Any, TypeId}, vec::Vec, vec, boxed::Box, cmp::Ordering};
 use crate::{warn, trace, log_error};
 #[cfg(feature = "std")]
 use crate::changes_trie::State as ChangesTrieState;
@@ -323,16 +323,37 @@ where
 	}
 
 	fn next_storage_key(&self, key: &[u8]) -> Option<StorageKey> {
-		let next_backend_key = self.backend.next_storage_key(key).expect(EXT_NOT_ALLOWED_TO_FAIL);
-		let next_overlay_key_change = self.overlay.next_storage_key_change(key);
-
-		match (next_backend_key, next_overlay_key_change) {
-			(Some(backend_key), Some(overlay_key)) if &backend_key[..] < overlay_key.0 => Some(backend_key),
-			(backend_key, None) => backend_key,
-			(_, Some(overlay_key)) => if overlay_key.1.value().is_some() {
-				Some(overlay_key.0.to_vec())
-			} else {
-				self.next_storage_key(&overlay_key.0[..])
+		let mut next_backend_key = self.backend.next_storage_key(key).expect(EXT_NOT_ALLOWED_TO_FAIL);
+		let mut overlay_changes = self.overlay.iter_after(key).peekable();
+
+		match (&next_backend_key, overlay_changes.peek()) {
+			(_, None) => next_backend_key,
+			(Some(_), Some(_)) => {
+				while let Some(overlay_key) = overlay_changes.next() {
+					let cmp = next_backend_key.as_deref().map(|v| v.cmp(&overlay_key.0));
+
+					// If `backend_key` is less than the `overlay_key`, we found out next key.
+					if cmp == Some(Ordering::Less) {
+						return next_backend_key
+					} else if overlay_key.1.value().is_some() {
+						// If there exists a value for the `overlay_key` in the overlay
+						// (aka the key is still valid), it means we have found our next key.
+						return Some(overlay_key.0.to_vec())
+					} else if cmp == Some(Ordering::Equal) {
+						// If the `backend_key` and `overlay_key` are equal, it means that we need
+						// to search for the next backend key, because the overlay has overwritten
+						// this key.
+						next_backend_key = self.backend.next_storage_key(
+							&overlay_key.0,
+						).expect(EXT_NOT_ALLOWED_TO_FAIL);
+					}
+				}
+
+				next_backend_key
+			},
+			(None, Some(_)) => {
+				// Find the next overlay key that has a value attached.
+				overlay_changes.find_map(|k| k.1.value().as_ref().map(|_| k.0.to_vec()))
 			},
 		}
 	}
@@ -342,24 +363,43 @@ where
 		child_info: &ChildInfo,
 		key: &[u8],
 	) -> Option<StorageKey> {
-		let next_backend_key = self.backend
+		let mut next_backend_key = self.backend
 			.next_child_storage_key(child_info, key)
 			.expect(EXT_NOT_ALLOWED_TO_FAIL);
-		let next_overlay_key_change = self.overlay.next_child_storage_key_change(
+		let mut overlay_changes = self.overlay.child_iter_after(
 			child_info.storage_key(),
 			key
-		);
+		).peekable();
+
+		match (&next_backend_key, overlay_changes.peek()) {
+			(_, None) => next_backend_key,
+			(Some(_), Some(_)) => {
+				while let Some(overlay_key) = overlay_changes.next() {
+					let cmp = next_backend_key.as_deref().map(|v| v.cmp(&overlay_key.0));
+
+					// If `backend_key` is less than the `overlay_key`, we found out next key.
+					if cmp == Some(Ordering::Less) {
+						return next_backend_key
+					} else if overlay_key.1.value().is_some() {
+						// If there exists a value for the `overlay_key` in the overlay
+						// (aka the key is still valid), it means we have found our next key.
+						return Some(overlay_key.0.to_vec())
+					} else if cmp == Some(Ordering::Equal) {
+						// If the `backend_key` and `overlay_key` are equal, it means that we need
+						// to search for the next backend key, because the overlay has overwritten
+						// this key.
+						next_backend_key = self.backend.next_child_storage_key(
+							child_info,
+							&overlay_key.0,
+						).expect(EXT_NOT_ALLOWED_TO_FAIL);
+					}
+				}
 
-		match (next_backend_key, next_overlay_key_change) {
-			(Some(backend_key), Some(overlay_key)) if &backend_key[..] < overlay_key.0 => Some(backend_key),
-			(backend_key, None) => backend_key,
-			(_, Some(overlay_key)) => if overlay_key.1.value().is_some() {
-				Some(overlay_key.0.to_vec())
-			} else {
-				self.next_child_storage_key(
-					child_info,
-					&overlay_key.0[..],
-				)
+				next_backend_key
+			},
+			(None, Some(_)) => {
+				// Find the next overlay key that has a value attached.
+				overlay_changes.find_map(|k| k.1.value().as_ref().map(|_| k.0.to_vec()))
 			},
 		}
 	}
@@ -971,6 +1011,34 @@ mod tests {
 		assert_eq!(ext.next_storage_key(&[40]), Some(vec![50]));
 	}
 
+	#[test]
+	fn next_storage_key_works_with_a_lot_empty_values_in_overlay() {
+		let mut cache = StorageTransactionCache::default();
+		let mut overlay = OverlayedChanges::default();
+		overlay.set_storage(vec![20], None);
+		overlay.set_storage(vec![21], None);
+		overlay.set_storage(vec![22], None);
+		overlay.set_storage(vec![23], None);
+		overlay.set_storage(vec![24], None);
+		overlay.set_storage(vec![25], None);
+		overlay.set_storage(vec![26], None);
+		overlay.set_storage(vec![27], None);
+		overlay.set_storage(vec![28], None);
+		overlay.set_storage(vec![29], None);
+		let backend = Storage {
+			top: map![
+				vec![30] => vec![30]
+			],
+			children_default: map![]
+		}.into();
+
+		let ext = TestExt::new(&mut overlay, &mut cache, &backend, None, None);
+
+		assert_eq!(ext.next_storage_key(&[5]), Some(vec![30]));
+
+		drop(ext);
+	}
+
 	#[test]
 	fn next_child_storage_key_works() {
 		let child_info = ChildInfo::new_default(b"Child1");
diff --git a/substrate/primitives/state-machine/src/overlayed_changes/changeset.rs b/substrate/primitives/state-machine/src/overlayed_changes/changeset.rs
index d25f4807aa9..ae9584990e5 100644
--- a/substrate/primitives/state-machine/src/overlayed_changes/changeset.rs
+++ b/substrate/primitives/state-machine/src/overlayed_changes/changeset.rs
@@ -426,11 +426,11 @@ impl OverlayedChangeSet {
 		}
 	}
 
-	/// Get the change that is next to the supplied key.
-	pub fn next_change(&self, key: &[u8]) -> Option<(&[u8], &OverlayedValue)> {
+	/// Get the iterator over all changes that follow the supplied `key`.
+	pub fn changes_after(&self, key: &[u8]) -> impl Iterator<Item = (&[u8], &OverlayedValue)> {
 		use sp_std::ops::Bound;
 		let range = (Bound::Excluded(key), Bound::Unbounded);
-		self.changes.range::<[u8], _>(range).next().map(|(k, v)| (&k[..], v))
+		self.changes.range::<[u8], _>(range).map(|(k, v)| (k.as_slice(), v))
 	}
 }
 
@@ -707,29 +707,29 @@ mod test {
 		changeset.set(b"key4".to_vec(), Some(b"val4".to_vec()), Some(4));
 		changeset.set(b"key11".to_vec(), Some(b"val11".to_vec()), Some(11));
 
-		assert_eq!(changeset.next_change(b"key0").unwrap().0, b"key1");
-		assert_eq!(changeset.next_change(b"key0").unwrap().1.value(), Some(&b"val1".to_vec()));
-		assert_eq!(changeset.next_change(b"key1").unwrap().0, b"key11");
-		assert_eq!(changeset.next_change(b"key1").unwrap().1.value(), Some(&b"val11".to_vec()));
-		assert_eq!(changeset.next_change(b"key11").unwrap().0, b"key2");
-		assert_eq!(changeset.next_change(b"key11").unwrap().1.value(), Some(&b"val2".to_vec()));
-		assert_eq!(changeset.next_change(b"key2").unwrap().0, b"key3");
-		assert_eq!(changeset.next_change(b"key2").unwrap().1.value(), Some(&b"val3".to_vec()));
-		assert_eq!(changeset.next_change(b"key3").unwrap().0, b"key4");
-		assert_eq!(changeset.next_change(b"key3").unwrap().1.value(), Some(&b"val4".to_vec()));
-		assert_eq!(changeset.next_change(b"key4"), None);
+		assert_eq!(changeset.changes_after(b"key0").next().unwrap().0, b"key1");
+		assert_eq!(changeset.changes_after(b"key0").next().unwrap().1.value(), Some(&b"val1".to_vec()));
+		assert_eq!(changeset.changes_after(b"key1").next().unwrap().0, b"key11");
+		assert_eq!(changeset.changes_after(b"key1").next().unwrap().1.value(), Some(&b"val11".to_vec()));
+		assert_eq!(changeset.changes_after(b"key11").next().unwrap().0, b"key2");
+		assert_eq!(changeset.changes_after(b"key11").next().unwrap().1.value(), Some(&b"val2".to_vec()));
+		assert_eq!(changeset.changes_after(b"key2").next().unwrap().0, b"key3");
+		assert_eq!(changeset.changes_after(b"key2").next().unwrap().1.value(), Some(&b"val3".to_vec()));
+		assert_eq!(changeset.changes_after(b"key3").next().unwrap().0, b"key4");
+		assert_eq!(changeset.changes_after(b"key3").next().unwrap().1.value(), Some(&b"val4".to_vec()));
+		assert_eq!(changeset.changes_after(b"key4").next(), None);
 
 		changeset.rollback_transaction().unwrap();
 
-		assert_eq!(changeset.next_change(b"key0").unwrap().0, b"key1");
-		assert_eq!(changeset.next_change(b"key0").unwrap().1.value(), Some(&b"val1".to_vec()));
-		assert_eq!(changeset.next_change(b"key1").unwrap().0, b"key2");
-		assert_eq!(changeset.next_change(b"key1").unwrap().1.value(), Some(&b"val2".to_vec()));
-		assert_eq!(changeset.next_change(b"key11").unwrap().0, b"key2");
-		assert_eq!(changeset.next_change(b"key11").unwrap().1.value(), Some(&b"val2".to_vec()));
-		assert_eq!(changeset.next_change(b"key2"), None);
-		assert_eq!(changeset.next_change(b"key3"), None);
-		assert_eq!(changeset.next_change(b"key4"), None);
+		assert_eq!(changeset.changes_after(b"key0").next().unwrap().0, b"key1");
+		assert_eq!(changeset.changes_after(b"key0").next().unwrap().1.value(), Some(&b"val1".to_vec()));
+		assert_eq!(changeset.changes_after(b"key1").next().unwrap().0, b"key2");
+		assert_eq!(changeset.changes_after(b"key1").next().unwrap().1.value(), Some(&b"val2".to_vec()));
+		assert_eq!(changeset.changes_after(b"key11").next().unwrap().0, b"key2");
+		assert_eq!(changeset.changes_after(b"key11").next().unwrap().1.value(), Some(&b"val2".to_vec()));
+		assert_eq!(changeset.changes_after(b"key2").next(), None);
+		assert_eq!(changeset.changes_after(b"key3").next(), None);
+		assert_eq!(changeset.changes_after(b"key4").next(), None);
 
 	}
 
diff --git a/substrate/primitives/state-machine/src/overlayed_changes/mod.rs b/substrate/primitives/state-machine/src/overlayed_changes/mod.rs
index 1d3cbb59ba0..2a3495a4e1c 100644
--- a/substrate/primitives/state-machine/src/overlayed_changes/mod.rs
+++ b/substrate/primitives/state-machine/src/overlayed_changes/mod.rs
@@ -669,24 +669,24 @@ impl OverlayedChanges {
 		})
 	}
 
-	/// Returns the next (in lexicographic order) storage key in the overlayed alongside its value.
-	/// If no value is next then `None` is returned.
-	pub fn next_storage_key_change(&self, key: &[u8]) -> Option<(&[u8], &OverlayedValue)> {
-		self.top.next_change(key)
+	/// Returns an iterator over the keys (in lexicographic order) following `key` (excluding `key`)
+	/// alongside its value.
+	pub fn iter_after(&self, key: &[u8]) -> impl Iterator<Item = (&[u8], &OverlayedValue)> {
+		self.top.changes_after(key)
 	}
 
-	/// Returns the next (in lexicographic order) child storage key in the overlayed alongside its
-	/// value.  If no value is next then `None` is returned.
-	pub fn next_child_storage_key_change(
+	/// Returns an iterator over the keys (in lexicographic order) following `key` (excluding `key`)
+	/// alongside its value for the given `storage_key` child.
+	pub fn child_iter_after(
 		&self,
 		storage_key: &[u8],
 		key: &[u8]
-	) -> Option<(&[u8], &OverlayedValue)> {
+	) -> impl Iterator<Item = (&[u8], &OverlayedValue)> {
 		self.children
 			.get(storage_key)
-			.and_then(|(overlay, _)|
-				overlay.next_change(key)
-			)
+			.map(|(overlay, _)| overlay.changes_after(key))
+			.into_iter()
+			.flatten()
 	}
 
 	/// Read only access ot offchain overlay.
@@ -988,28 +988,28 @@ mod tests {
 		overlay.set_storage(vec![30], None);
 
 		// next_prospective < next_committed
-		let next_to_5 = overlay.next_storage_key_change(&[5]).unwrap();
+		let next_to_5 = overlay.iter_after(&[5]).next().unwrap();
 		assert_eq!(next_to_5.0.to_vec(), vec![10]);
 		assert_eq!(next_to_5.1.value(), Some(&vec![10]));
 
 		// next_committed < next_prospective
-		let next_to_10 = overlay.next_storage_key_change(&[10]).unwrap();
+		let next_to_10 = overlay.iter_after(&[10]).next().unwrap();
 		assert_eq!(next_to_10.0.to_vec(), vec![20]);
 		assert_eq!(next_to_10.1.value(), Some(&vec![20]));
 
 		// next_committed == next_prospective
-		let next_to_20 = overlay.next_storage_key_change(&[20]).unwrap();
+		let next_to_20 = overlay.iter_after(&[20]).next().unwrap();
 		assert_eq!(next_to_20.0.to_vec(), vec![30]);
 		assert_eq!(next_to_20.1.value(), None);
 
 		// next_committed, no next_prospective
-		let next_to_30 = overlay.next_storage_key_change(&[30]).unwrap();
+		let next_to_30 = overlay.iter_after(&[30]).next().unwrap();
 		assert_eq!(next_to_30.0.to_vec(), vec![40]);
 		assert_eq!(next_to_30.1.value(), Some(&vec![40]));
 
 		overlay.set_storage(vec![50], Some(vec![50]));
 		// next_prospective, no next_committed
-		let next_to_40 = overlay.next_storage_key_change(&[40]).unwrap();
+		let next_to_40 = overlay.iter_after(&[40]).next().unwrap();
 		assert_eq!(next_to_40.0.to_vec(), vec![50]);
 		assert_eq!(next_to_40.1.value(), Some(&vec![50]));
 	}
@@ -1029,28 +1029,28 @@ mod tests {
 		overlay.set_child_storage(child_info, vec![30], None);
 
 		// next_prospective < next_committed
-		let next_to_5 = overlay.next_child_storage_key_change(child, &[5]).unwrap();
+		let next_to_5 = overlay.child_iter_after(child, &[5]).next().unwrap();
 		assert_eq!(next_to_5.0.to_vec(), vec![10]);
 		assert_eq!(next_to_5.1.value(), Some(&vec![10]));
 
 		// next_committed < next_prospective
-		let next_to_10 = overlay.next_child_storage_key_change(child, &[10]).unwrap();
+		let next_to_10 = overlay.child_iter_after(child, &[10]).next().unwrap();
 		assert_eq!(next_to_10.0.to_vec(), vec![20]);
 		assert_eq!(next_to_10.1.value(), Some(&vec![20]));
 
 		// next_committed == next_prospective
-		let next_to_20 = overlay.next_child_storage_key_change(child, &[20]).unwrap();
+		let next_to_20 = overlay.child_iter_after(child, &[20]).next().unwrap();
 		assert_eq!(next_to_20.0.to_vec(), vec![30]);
 		assert_eq!(next_to_20.1.value(), None);
 
 		// next_committed, no next_prospective
-		let next_to_30 = overlay.next_child_storage_key_change(child, &[30]).unwrap();
+		let next_to_30 = overlay.child_iter_after(child, &[30]).next().unwrap();
 		assert_eq!(next_to_30.0.to_vec(), vec![40]);
 		assert_eq!(next_to_30.1.value(), Some(&vec![40]));
 
 		overlay.set_child_storage(child_info, vec![50], Some(vec![50]));
 		// next_prospective, no next_committed
-		let next_to_40 = overlay.next_child_storage_key_change(child, &[40]).unwrap();
+		let next_to_40 = overlay.child_iter_after(child, &[40]).next().unwrap();
 		assert_eq!(next_to_40.0.to_vec(), vec![50]);
 		assert_eq!(next_to_40.1.value(), Some(&vec![50]));
 	}
-- 
GitLab