From ad8620922bd7c0477b25c7dfd6fc233641cb27ae Mon Sep 17 00:00:00 2001
From: cheme <emericchevalier.pro@gmail.com>
Date: Tue, 11 Jun 2024 22:15:05 +0000
Subject: [PATCH] Append overlay optimization. (#1223)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This branch propose to avoid clones in append by storing offset and size
in previous overlay depth.
That way on rollback we can just truncate and change size of existing
value.
To avoid copy it also means that :

- append on new overlay layer if there is an existing value: create a
new Append entry with previous offsets, and take memory of previous
overlay value.
- rollback on append: restore value by applying offsets and put it back
in previous overlay value
- commit on append: appended value overwrite previous value (is an empty
vec as the memory was taken). offsets of commited layer are dropped, if
there is offset in previous overlay layer they are maintained.
- set value (or remove) when append offsets are present: current
appended value is moved back to previous overlay value with offset
applied and current empty entry is overwrite (no offsets kept).

The modify mechanism is not needed anymore.
This branch lacks testing and break some existing genericity (bit of
duplicated code), but good to have to check direction.

Generally I am not sure if it is worth or we just should favor
differents directions (transients blob storage for instance), as the
current append mechanism is a bit tricky (having a variable length in
first position means we sometime need to insert in front of a vector).

Fix #30.

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: EgorPopelyaev <egor@parity.io>
Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
Co-authored-by: joe petrowski <25483142+joepetrowski@users.noreply.github.com>
Co-authored-by: Liam Aharon <liam.aharon@hotmail.com>
Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com>
Co-authored-by: Branislav Kontur <bkontur@gmail.com>
Co-authored-by: Bastian Köcher <info@kchr.de>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
---
 Cargo.lock                                    |  15 +
 .../core/pvf/common/src/executor_interface.rs |  12 +-
 prdoc/pr_1223.prdoc                           |  13 +
 .../executor/src/integration_tests/mod.rs     |   8 +-
 substrate/primitives/externalities/src/lib.rs |  16 +-
 substrate/primitives/io/src/lib.rs            |  12 +-
 substrate/primitives/state-machine/Cargo.toml |   3 +
 .../primitives/state-machine/fuzz/Cargo.toml  |  30 +
 .../fuzz/fuzz_targets/fuzz_append.rs          |  26 +
 .../primitives/state-machine/src/basic.rs     |  57 +-
 substrate/primitives/state-machine/src/ext.rs |  71 +-
 .../primitives/state-machine/src/fuzzing.rs   | 319 ++++++++
 .../state-machine/src/in_memory_backend.rs    |   1 +
 substrate/primitives/state-machine/src/lib.rs |  79 +-
 .../src/overlayed_changes/changeset.rs        | 744 ++++++++++++++++--
 .../src/overlayed_changes/mod.rs              | 130 +--
 .../src/overlayed_changes/offchain.rs         |  14 +-
 .../primitives/state-machine/src/read_only.rs |  12 +-
 .../primitives/state-machine/src/testing.rs   |  16 +-
 .../frame/remote-externalities/src/lib.rs     |   4 +-
 20 files changed, 1352 insertions(+), 230 deletions(-)
 create mode 100644 prdoc/pr_1223.prdoc
 create mode 100644 substrate/primitives/state-machine/fuzz/Cargo.toml
 create mode 100644 substrate/primitives/state-machine/fuzz/fuzz_targets/fuzz_append.rs
 create mode 100644 substrate/primitives/state-machine/src/fuzzing.rs

diff --git a/Cargo.lock b/Cargo.lock
index d2b7a47f84c..fba768c653c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -294,6 +294,9 @@ name = "arbitrary"
 version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7d5a26814d8dcb93b0e5a0ff3c6d80a8843bafb21b39e8e18a6f05471870e110"
+dependencies = [
+ "derive_arbitrary",
+]
 
 [[package]]
 name = "ark-bls12-377"
@@ -4729,6 +4732,17 @@ dependencies = [
  "syn 2.0.61",
 ]
 
+[[package]]
+name = "derive_arbitrary"
+version = "1.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611"
+dependencies = [
+ "proc-macro2 1.0.82",
+ "quote 1.0.35",
+ "syn 2.0.61",
+]
+
 [[package]]
 name = "derive_more"
 version = "0.99.17"
@@ -20296,6 +20310,7 @@ dependencies = [
 name = "sp-state-machine"
 version = "0.35.0"
 dependencies = [
+ "arbitrary",
  "array-bytes",
  "assert_matches",
  "hash-db",
diff --git a/polkadot/node/core/pvf/common/src/executor_interface.rs b/polkadot/node/core/pvf/common/src/executor_interface.rs
index 87491e70c5f..47f9ed1604e 100644
--- a/polkadot/node/core/pvf/common/src/executor_interface.rs
+++ b/polkadot/node/core/pvf/common/src/executor_interface.rs
@@ -215,19 +215,19 @@ type HostFunctions = (
 struct ValidationExternalities(sp_externalities::Extensions);
 
 impl sp_externalities::Externalities for ValidationExternalities {
-	fn storage(&self, _: &[u8]) -> Option<Vec<u8>> {
+	fn storage(&mut self, _: &[u8]) -> Option<Vec<u8>> {
 		panic!("storage: unsupported feature for parachain validation")
 	}
 
-	fn storage_hash(&self, _: &[u8]) -> Option<Vec<u8>> {
+	fn storage_hash(&mut self, _: &[u8]) -> Option<Vec<u8>> {
 		panic!("storage_hash: unsupported feature for parachain validation")
 	}
 
-	fn child_storage_hash(&self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
+	fn child_storage_hash(&mut self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
 		panic!("child_storage_hash: unsupported feature for parachain validation")
 	}
 
-	fn child_storage(&self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
+	fn child_storage(&mut self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
 		panic!("child_storage: unsupported feature for parachain validation")
 	}
 
@@ -275,11 +275,11 @@ impl sp_externalities::Externalities for ValidationExternalities {
 		panic!("child_storage_root: unsupported feature for parachain validation")
 	}
 
-	fn next_child_storage_key(&self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
+	fn next_child_storage_key(&mut self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
 		panic!("next_child_storage_key: unsupported feature for parachain validation")
 	}
 
-	fn next_storage_key(&self, _: &[u8]) -> Option<Vec<u8>> {
+	fn next_storage_key(&mut self, _: &[u8]) -> Option<Vec<u8>> {
 		panic!("next_storage_key: unsupported feature for parachain validation")
 	}
 
diff --git a/prdoc/pr_1223.prdoc b/prdoc/pr_1223.prdoc
new file mode 100644
index 00000000000..08b18557b70
--- /dev/null
+++ b/prdoc/pr_1223.prdoc
@@ -0,0 +1,13 @@
+title: Optimize storage append operation
+
+doc:
+  - audience: [Node Dev, Runtime Dev]
+    description: |
+      This pull request optimizes the storage append operation in the `OverlayedChanges`.
+      Before the internal buffer was cloned every time a new transaction was created. Cloning
+      the internal buffer is now only done when there is no other possibility. This should
+      improve the performance in situations like when depositing events from batched calls.
+
+crates:
+  - name: sp-state-machine
+    bump: major
diff --git a/substrate/client/executor/src/integration_tests/mod.rs b/substrate/client/executor/src/integration_tests/mod.rs
index 7f91b3ffe76..5d94ec6dcd3 100644
--- a/substrate/client/executor/src/integration_tests/mod.rs
+++ b/substrate/client/executor/src/integration_tests/mod.rs
@@ -178,7 +178,7 @@ fn storage_should_work(wasm_method: WasmExecutionMethod) {
 		assert_eq!(output, b"all ok!".to_vec().encode());
 	}
 
-	let expected = TestExternalities::new(sp_core::storage::Storage {
+	let mut expected = TestExternalities::new(sp_core::storage::Storage {
 		top: map![
 			b"input".to_vec() => value,
 			b"foo".to_vec() => b"bar".to_vec(),
@@ -186,7 +186,7 @@ fn storage_should_work(wasm_method: WasmExecutionMethod) {
 		],
 		children_default: map![],
 	});
-	assert_eq!(ext, expected);
+	assert!(ext.eq(&mut expected));
 }
 
 test_wasm_execution!(clear_prefix_should_work);
@@ -208,7 +208,7 @@ fn clear_prefix_should_work(wasm_method: WasmExecutionMethod) {
 		assert_eq!(output, b"all ok!".to_vec().encode());
 	}
 
-	let expected = TestExternalities::new(sp_core::storage::Storage {
+	let mut expected = TestExternalities::new(sp_core::storage::Storage {
 		top: map![
 			b"aaa".to_vec() => b"1".to_vec(),
 			b"aab".to_vec() => b"2".to_vec(),
@@ -216,7 +216,7 @@ fn clear_prefix_should_work(wasm_method: WasmExecutionMethod) {
 		],
 		children_default: map![],
 	});
-	assert_eq!(expected, ext);
+	assert!(expected.eq(&mut ext));
 }
 
 test_wasm_execution!(blake2_256_should_work);
diff --git a/substrate/primitives/externalities/src/lib.rs b/substrate/primitives/externalities/src/lib.rs
index 142200f614a..bcc46ee4f1b 100644
--- a/substrate/primitives/externalities/src/lib.rs
+++ b/substrate/primitives/externalities/src/lib.rs
@@ -83,24 +83,24 @@ pub trait Externalities: ExtensionStore {
 	fn set_offchain_storage(&mut self, key: &[u8], value: Option<&[u8]>);
 
 	/// Read runtime storage.
-	fn storage(&self, key: &[u8]) -> Option<Vec<u8>>;
+	fn storage(&mut self, key: &[u8]) -> Option<Vec<u8>>;
 
 	/// Get storage value hash.
 	///
 	/// This may be optimized for large values.
-	fn storage_hash(&self, key: &[u8]) -> Option<Vec<u8>>;
+	fn storage_hash(&mut self, key: &[u8]) -> Option<Vec<u8>>;
 
 	/// Get child storage value hash.
 	///
 	/// This may be optimized for large values.
 	///
 	/// Returns an `Option` that holds the SCALE encoded hash.
-	fn child_storage_hash(&self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>>;
+	fn child_storage_hash(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>>;
 
 	/// Read child runtime storage.
 	///
 	/// Returns an `Option` that holds the SCALE encoded hash.
-	fn child_storage(&self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>>;
+	fn child_storage(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>>;
 
 	/// Set storage entry `key` of current contract being called (effective immediately).
 	fn set_storage(&mut self, key: Vec<u8>, value: Vec<u8>) {
@@ -124,20 +124,20 @@ pub trait Externalities: ExtensionStore {
 	}
 
 	/// Whether a storage entry exists.
-	fn exists_storage(&self, key: &[u8]) -> bool {
+	fn exists_storage(&mut self, key: &[u8]) -> bool {
 		self.storage(key).is_some()
 	}
 
 	/// Whether a child storage entry exists.
-	fn exists_child_storage(&self, child_info: &ChildInfo, key: &[u8]) -> bool {
+	fn exists_child_storage(&mut self, child_info: &ChildInfo, key: &[u8]) -> bool {
 		self.child_storage(child_info, key).is_some()
 	}
 
 	/// Returns the key immediately following the given key, if it exists.
-	fn next_storage_key(&self, key: &[u8]) -> Option<Vec<u8>>;
+	fn next_storage_key(&mut self, key: &[u8]) -> Option<Vec<u8>>;
 
 	/// Returns the key immediately following the given key, if it exists, in child storage.
-	fn next_child_storage_key(&self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>>;
+	fn next_child_storage_key(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>>;
 
 	/// Clear an entire child storage.
 	///
diff --git a/substrate/primitives/io/src/lib.rs b/substrate/primitives/io/src/lib.rs
index c8675a9a90b..8ef1f41ce01 100644
--- a/substrate/primitives/io/src/lib.rs
+++ b/substrate/primitives/io/src/lib.rs
@@ -181,7 +181,7 @@ impl From<MultiRemovalResults> for KillStorageResult {
 #[runtime_interface]
 pub trait Storage {
 	/// Returns the data for `key` in the storage or `None` if the key can not be found.
-	fn get(&self, key: &[u8]) -> Option<bytes::Bytes> {
+	fn get(&mut self, key: &[u8]) -> Option<bytes::Bytes> {
 		self.storage(key).map(bytes::Bytes::from)
 	}
 
@@ -190,7 +190,7 @@ pub trait Storage {
 	/// doesn't exist at all.
 	/// If `value_out` length is smaller than the returned length, only `value_out` length bytes
 	/// are copied into `value_out`.
-	fn read(&self, key: &[u8], value_out: &mut [u8], value_offset: u32) -> Option<u32> {
+	fn read(&mut self, key: &[u8], value_out: &mut [u8], value_offset: u32) -> Option<u32> {
 		self.storage(key).map(|value| {
 			let value_offset = value_offset as usize;
 			let data = &value[value_offset.min(value.len())..];
@@ -211,7 +211,7 @@ pub trait Storage {
 	}
 
 	/// Check whether the given `key` exists in storage.
-	fn exists(&self, key: &[u8]) -> bool {
+	fn exists(&mut self, key: &[u8]) -> bool {
 		self.exists_storage(key)
 	}
 
@@ -387,7 +387,7 @@ pub trait DefaultChildStorage {
 	///
 	/// Parameter `storage_key` is the unprefixed location of the root of the child trie in the
 	/// parent trie. Result is `None` if the value for `key` in the child storage can not be found.
-	fn get(&self, storage_key: &[u8], key: &[u8]) -> Option<Vec<u8>> {
+	fn get(&mut self, storage_key: &[u8], key: &[u8]) -> Option<Vec<u8>> {
 		let child_info = ChildInfo::new_default(storage_key);
 		self.child_storage(&child_info, key).map(|s| s.to_vec())
 	}
@@ -400,7 +400,7 @@ pub trait DefaultChildStorage {
 	/// If `value_out` length is smaller than the returned length, only `value_out` length bytes
 	/// are copied into `value_out`.
 	fn read(
-		&self,
+		&mut self,
 		storage_key: &[u8],
 		key: &[u8],
 		value_out: &mut [u8],
@@ -478,7 +478,7 @@ pub trait DefaultChildStorage {
 	/// Check a child storage key.
 	///
 	/// Check whether the given `key` exists in default child defined at `storage_key`.
-	fn exists(&self, storage_key: &[u8], key: &[u8]) -> bool {
+	fn exists(&mut self, storage_key: &[u8], key: &[u8]) -> bool {
 		let child_info = ChildInfo::new_default(storage_key);
 		self.exists_child_storage(&child_info, key)
 	}
diff --git a/substrate/primitives/state-machine/Cargo.toml b/substrate/primitives/state-machine/Cargo.toml
index c383a17cb00..f6402eccf0d 100644
--- a/substrate/primitives/state-machine/Cargo.toml
+++ b/substrate/primitives/state-machine/Cargo.toml
@@ -30,6 +30,7 @@ sp-externalities = { path = "../externalities", default-features = false }
 sp-panic-handler = { path = "../panic-handler", optional = true }
 sp-trie = { path = "../trie", default-features = false }
 trie-db = { version = "0.29.0", default-features = false }
+arbitrary = { version = "1", features = ["derive"], optional = true }
 
 [dev-dependencies]
 array-bytes = "6.2.2"
@@ -37,9 +38,11 @@ pretty_assertions = "1.2.1"
 rand = "0.8.5"
 sp-runtime = { path = "../runtime" }
 assert_matches = "1.5"
+arbitrary = { version = "1", features = ["derive"] }
 
 [features]
 default = ["std"]
+fuzzing = ["arbitrary"]
 std = [
 	"codec/std",
 	"hash-db/std",
diff --git a/substrate/primitives/state-machine/fuzz/Cargo.toml b/substrate/primitives/state-machine/fuzz/Cargo.toml
new file mode 100644
index 00000000000..416c00c34fd
--- /dev/null
+++ b/substrate/primitives/state-machine/fuzz/Cargo.toml
@@ -0,0 +1,30 @@
+[package]
+name = "sp-state-machine-fuzz"
+version = "0.0.0"
+publish = false
+license = "Apache-2.0"
+edition = "2021"
+
+[package.metadata]
+cargo-fuzz = true
+
+[dependencies]
+libfuzzer-sys = "0.4"
+sp-runtime = { path = "../../runtime" }
+
+[dependencies.sp-state-machine]
+path = ".."
+features = ["fuzzing"]
+
+# Prevent this from interfering with workspaces
+[workspace]
+members = ["."]
+
+[profile.release]
+debug = 1
+
+[[bin]]
+name = "fuzz_append"
+path = "fuzz_targets/fuzz_append.rs"
+test = false
+doc = false
diff --git a/substrate/primitives/state-machine/fuzz/fuzz_targets/fuzz_append.rs b/substrate/primitives/state-machine/fuzz/fuzz_targets/fuzz_append.rs
new file mode 100644
index 00000000000..44847f53565
--- /dev/null
+++ b/substrate/primitives/state-machine/fuzz/fuzz_targets/fuzz_append.rs
@@ -0,0 +1,26 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#![no_main]
+
+use libfuzzer_sys::fuzz_target;
+use sp_state_machine::fuzzing::{fuzz_append, FuzzAppendPayload};
+use sp_runtime::traits::BlakeTwo256;
+
+fuzz_target!(|data: FuzzAppendPayload| {
+	fuzz_append::<BlakeTwo256>(data);
+});
diff --git a/substrate/primitives/state-machine/src/basic.rs b/substrate/primitives/state-machine/src/basic.rs
index 8b6f746eaba..6201d60abab 100644
--- a/substrate/primitives/state-machine/src/basic.rs
+++ b/substrate/primitives/state-machine/src/basic.rs
@@ -59,16 +59,17 @@ impl BasicExternalities {
 	}
 
 	/// Consume self and returns inner storages
-	pub fn into_storages(self) -> Storage {
+	#[cfg(feature = "std")]
+	pub fn into_storages(mut self) -> Storage {
 		Storage {
 			top: self
 				.overlay
-				.changes()
+				.changes_mut()
 				.filter_map(|(k, v)| v.value().map(|v| (k.to_vec(), v.to_vec())))
 				.collect(),
 			children_default: self
 				.overlay
-				.children()
+				.children_mut()
 				.map(|(iter, i)| {
 					(
 						i.storage_key().to_vec(),
@@ -87,6 +88,7 @@ impl BasicExternalities {
 	/// Execute the given closure `f` with the externalities set and initialized with `storage`.
 	///
 	/// Returns the result of the closure and updates `storage` with all changes.
+	#[cfg(feature = "std")]
 	pub fn execute_with_storage<R>(
 		storage: &mut sp_core::storage::Storage,
 		f: impl FnOnce() -> R,
@@ -118,19 +120,37 @@ impl BasicExternalities {
 	}
 }
 
+#[cfg(test)]
 impl PartialEq for BasicExternalities {
-	fn eq(&self, other: &BasicExternalities) -> bool {
-		self.overlay.changes().map(|(k, v)| (k, v.value())).collect::<BTreeMap<_, _>>() ==
-			other.overlay.changes().map(|(k, v)| (k, v.value())).collect::<BTreeMap<_, _>>() &&
+	fn eq(&self, other: &Self) -> bool {
+		self.overlay
+			.changes()
+			.map(|(k, v)| (k, v.value_ref().materialize()))
+			.collect::<BTreeMap<_, _>>() ==
+			other
+				.overlay
+				.changes()
+				.map(|(k, v)| (k, v.value_ref().materialize()))
+				.collect::<BTreeMap<_, _>>() &&
 			self.overlay
 				.children()
-				.map(|(iter, i)| (i, iter.map(|(k, v)| (k, v.value())).collect::<BTreeMap<_, _>>()))
+				.map(|(iter, i)| {
+					(
+						i,
+						iter.map(|(k, v)| (k, v.value_ref().materialize()))
+							.collect::<BTreeMap<_, _>>(),
+					)
+				})
 				.collect::<BTreeMap<_, _>>() ==
 				other
 					.overlay
 					.children()
 					.map(|(iter, i)| {
-						(i, iter.map(|(k, v)| (k, v.value())).collect::<BTreeMap<_, _>>())
+						(
+							i,
+							iter.map(|(k, v)| (k, v.value_ref().materialize()))
+								.collect::<BTreeMap<_, _>>(),
+						)
 					})
 					.collect::<BTreeMap<_, _>>()
 	}
@@ -159,27 +179,27 @@ impl From<BTreeMap<StorageKey, StorageValue>> for BasicExternalities {
 impl Externalities for BasicExternalities {
 	fn set_offchain_storage(&mut self, _key: &[u8], _value: Option<&[u8]>) {}
 
-	fn storage(&self, key: &[u8]) -> Option<StorageValue> {
+	fn storage(&mut self, key: &[u8]) -> Option<StorageValue> {
 		self.overlay.storage(key).and_then(|v| v.map(|v| v.to_vec()))
 	}
 
-	fn storage_hash(&self, key: &[u8]) -> Option<Vec<u8>> {
+	fn storage_hash(&mut self, key: &[u8]) -> Option<Vec<u8>> {
 		self.storage(key).map(|v| Blake2Hasher::hash(&v).encode())
 	}
 
-	fn child_storage(&self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageValue> {
+	fn child_storage(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageValue> {
 		self.overlay.child_storage(child_info, key).and_then(|v| v.map(|v| v.to_vec()))
 	}
 
-	fn child_storage_hash(&self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>> {
+	fn child_storage_hash(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>> {
 		self.child_storage(child_info, key).map(|v| Blake2Hasher::hash(&v).encode())
 	}
 
-	fn next_storage_key(&self, key: &[u8]) -> Option<StorageKey> {
+	fn next_storage_key(&mut self, key: &[u8]) -> Option<StorageKey> {
 		self.overlay.iter_after(key).find_map(|(k, v)| v.value().map(|_| k.to_vec()))
 	}
 
-	fn next_child_storage_key(&self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageKey> {
+	fn next_child_storage_key(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageKey> {
 		self.overlay
 			.child_iter_after(child_info.storage_key(), key)
 			.find_map(|(k, v)| v.value().map(|_| k.to_vec()))
@@ -243,15 +263,14 @@ impl Externalities for BasicExternalities {
 		MultiRemovalResults { maybe_cursor: None, backend: count, unique: count, loops: count }
 	}
 
-	fn storage_append(&mut self, key: Vec<u8>, value: Vec<u8>) {
-		let current_value = self.overlay.value_mut_or_insert_with(&key, || Default::default());
-		crate::ext::StorageAppend::new(current_value).append(value);
+	fn storage_append(&mut self, key: Vec<u8>, element: Vec<u8>) {
+		self.overlay.append_storage(key, element, Default::default);
 	}
 
 	fn storage_root(&mut self, state_version: StateVersion) -> Vec<u8> {
 		let mut top = self
 			.overlay
-			.changes()
+			.changes_mut()
 			.filter_map(|(k, v)| v.value().map(|v| (k.clone(), v.clone())))
 			.collect::<BTreeMap<_, _>>();
 		// Single child trie implementation currently allows using the same child
@@ -278,7 +297,7 @@ impl Externalities for BasicExternalities {
 		child_info: &ChildInfo,
 		state_version: StateVersion,
 	) -> Vec<u8> {
-		if let Some((data, child_info)) = self.overlay.child_changes(child_info.storage_key()) {
+		if let Some((data, child_info)) = self.overlay.child_changes_mut(child_info.storage_key()) {
 			let delta =
 				data.into_iter().map(|(k, v)| (k.as_ref(), v.value().map(|v| v.as_slice())));
 			crate::in_memory_backend::new_in_mem::<Blake2Hasher>()
diff --git a/substrate/primitives/state-machine/src/ext.rs b/substrate/primitives/state-machine/src/ext.rs
index 9aa32bc866c..7a79c4e8a1f 100644
--- a/substrate/primitives/state-machine/src/ext.rs
+++ b/substrate/primitives/state-machine/src/ext.rs
@@ -22,7 +22,7 @@ use crate::overlayed_changes::OverlayedExtensions;
 use crate::{
 	backend::Backend, IndexOperation, IterArgs, OverlayedChanges, StorageKey, StorageValue,
 };
-use codec::{Encode, EncodeAppend};
+use codec::{Compact, CompactLen, Decode, Encode};
 use hash_db::Hasher;
 #[cfg(feature = "std")]
 use sp_core::hexdisplay::HexDisplay;
@@ -31,8 +31,8 @@ use sp_core::storage::{
 };
 use sp_externalities::{Extension, ExtensionStore, Externalities, MultiRemovalResults};
 
-use crate::{log_error, trace, warn};
-use alloc::{boxed::Box, vec, vec::Vec};
+use crate::{trace, warn};
+use alloc::{boxed::Box, vec::Vec};
 use core::{
 	any::{Any, TypeId},
 	cmp::Ordering,
@@ -139,7 +139,7 @@ where
 	H::Out: Ord + 'static,
 	B: 'a + Backend<H>,
 {
-	pub fn storage_pairs(&self) -> Vec<(StorageKey, StorageValue)> {
+	pub fn storage_pairs(&mut self) -> Vec<(StorageKey, StorageValue)> {
 		use std::collections::HashMap;
 
 		self.backend
@@ -147,7 +147,7 @@ where
 			.expect("never fails in tests; qed.")
 			.map(|key_value| key_value.expect("never fails in tests; qed."))
 			.map(|(k, v)| (k, Some(v)))
-			.chain(self.overlay.changes().map(|(k, v)| (k.clone(), v.value().cloned())))
+			.chain(self.overlay.changes_mut().map(|(k, v)| (k.clone(), v.value().cloned())))
 			.collect::<HashMap<_, _>>()
 			.into_iter()
 			.filter_map(|(k, maybe_val)| maybe_val.map(|val| (k, val)))
@@ -165,7 +165,7 @@ where
 		self.overlay.set_offchain_storage(key, value)
 	}
 
-	fn storage(&self, key: &[u8]) -> Option<StorageValue> {
+	fn storage(&mut self, key: &[u8]) -> Option<StorageValue> {
 		let _guard = guard();
 		let result = self
 			.overlay
@@ -191,7 +191,7 @@ where
 		result
 	}
 
-	fn storage_hash(&self, key: &[u8]) -> Option<Vec<u8>> {
+	fn storage_hash(&mut self, key: &[u8]) -> Option<Vec<u8>> {
 		let _guard = guard();
 		let result = self
 			.overlay
@@ -209,7 +209,7 @@ where
 		result.map(|r| r.encode())
 	}
 
-	fn child_storage(&self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageValue> {
+	fn child_storage(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageValue> {
 		let _guard = guard();
 		let result = self
 			.overlay
@@ -231,7 +231,7 @@ where
 		result
 	}
 
-	fn child_storage_hash(&self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>> {
+	fn child_storage_hash(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>> {
 		let _guard = guard();
 		let result = self
 			.overlay
@@ -253,7 +253,7 @@ where
 		result.map(|r| r.encode())
 	}
 
-	fn exists_storage(&self, key: &[u8]) -> bool {
+	fn exists_storage(&mut self, key: &[u8]) -> bool {
 		let _guard = guard();
 		let result = match self.overlay.storage(key) {
 			Some(x) => x.is_some(),
@@ -271,7 +271,7 @@ where
 		result
 	}
 
-	fn exists_child_storage(&self, child_info: &ChildInfo, key: &[u8]) -> bool {
+	fn exists_child_storage(&mut self, child_info: &ChildInfo, key: &[u8]) -> bool {
 		let _guard = guard();
 
 		let result = match self.overlay.child_storage(child_info, key) {
@@ -293,7 +293,7 @@ where
 		result
 	}
 
-	fn next_storage_key(&self, key: &[u8]) -> Option<StorageKey> {
+	fn next_storage_key(&mut self, key: &[u8]) -> Option<StorageKey> {
 		let mut next_backend_key =
 			self.backend.next_storage_key(key).expect(EXT_NOT_ALLOWED_TO_FAIL);
 		let mut overlay_changes = self.overlay.iter_after(key).peekable();
@@ -331,7 +331,7 @@ where
 		}
 	}
 
-	fn next_child_storage_key(&self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageKey> {
+	fn next_child_storage_key(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageKey> {
 		let mut next_backend_key = self
 			.backend
 			.next_child_storage_key(child_info, key)
@@ -501,10 +501,9 @@ where
 		let _guard = guard();
 
 		let backend = &mut self.backend;
-		let current_value = self.overlay.value_mut_or_insert_with(&key, || {
+		self.overlay.append_storage(key.clone(), value, || {
 			backend.storage(&key).expect(EXT_NOT_ALLOWED_TO_FAIL).unwrap_or_default()
 		});
-		StorageAppend::new(current_value).append(value);
 	}
 
 	fn storage_root(&mut self, state_version: StateVersion) -> Vec<u8> {
@@ -731,10 +730,27 @@ impl<'a> StorageAppend<'a> {
 		Self(storage)
 	}
 
+	/// Extract the length of the list like data structure.
+	pub fn extract_length(&self) -> Option<u32> {
+		Compact::<u32>::decode(&mut &self.0[..]).map(|c| c.0).ok()
+	}
+
+	/// Replace the length in the encoded data.
+	///
+	/// If `old_length` is `None`, the previous length will be assumed to be `0`.
+	pub fn replace_length(&mut self, old_length: Option<u32>, new_length: u32) {
+		let old_len_encoded_len = old_length.map(|l| Compact::<u32>::compact_len(&l)).unwrap_or(0);
+		let new_len_encoded = Compact::<u32>(new_length).encode();
+		self.0.splice(0..old_len_encoded_len, new_len_encoded);
+	}
+
 	/// Append the given `value` to the storage item.
 	///
-	/// If appending fails, `[value]` is stored in the storage item.
-	pub fn append(&mut self, value: Vec<u8>) {
+	/// If appending fails, `[value]` is stored in the storage item and we return false.
+	#[cfg(any(test, feature = "fuzzing"))]
+	pub fn append(&mut self, value: Vec<u8>) -> bool {
+		use codec::EncodeAppend;
+		let mut result = true;
 		let value = vec![EncodeOpaqueValue(value)];
 
 		let item = core::mem::take(self.0);
@@ -742,13 +758,20 @@ impl<'a> StorageAppend<'a> {
 		*self.0 = match Vec::<EncodeOpaqueValue>::append_or_new(item, &value) {
 			Ok(item) => item,
 			Err(_) => {
-				log_error!(
+				result = false;
+				crate::log_error!(
 					target: "runtime",
 					"Failed to append value, resetting storage item to `[value]`.",
 				);
 				value.encode()
 			},
 		};
+		result
+	}
+
+	/// Append to current buffer, do not touch the prefixed length.
+	pub fn append_raw(&mut self, mut value: Vec<u8>) {
+		self.0.append(&mut value)
 	}
 }
 
@@ -849,7 +872,7 @@ mod tests {
 		)
 			.into();
 
-		let ext = TestExt::new(&mut overlay, &backend, None);
+		let mut ext = TestExt::new(&mut overlay, &backend, None);
 
 		// next_backend < next_overlay
 		assert_eq!(ext.next_storage_key(&[5]), Some(vec![10]));
@@ -865,7 +888,7 @@ mod tests {
 
 		drop(ext);
 		overlay.set_storage(vec![50], Some(vec![50]));
-		let ext = TestExt::new(&mut overlay, &backend, None);
+		let mut ext = TestExt::new(&mut overlay, &backend, None);
 
 		// next_overlay exist but next_backend doesn't exist
 		assert_eq!(ext.next_storage_key(&[40]), Some(vec![50]));
@@ -895,7 +918,7 @@ mod tests {
 		)
 			.into();
 
-		let ext = TestExt::new(&mut overlay, &backend, None);
+		let mut ext = TestExt::new(&mut overlay, &backend, None);
 
 		assert_eq!(ext.next_storage_key(&[5]), Some(vec![30]));
 
@@ -928,7 +951,7 @@ mod tests {
 		)
 			.into();
 
-		let ext = TestExt::new(&mut overlay, &backend, None);
+		let mut ext = TestExt::new(&mut overlay, &backend, None);
 
 		// next_backend < next_overlay
 		assert_eq!(ext.next_child_storage_key(child_info, &[5]), Some(vec![10]));
@@ -944,7 +967,7 @@ mod tests {
 
 		drop(ext);
 		overlay.set_child_storage(child_info, vec![50], Some(vec![50]));
-		let ext = TestExt::new(&mut overlay, &backend, None);
+		let mut ext = TestExt::new(&mut overlay, &backend, None);
 
 		// next_overlay exist but next_backend doesn't exist
 		assert_eq!(ext.next_child_storage_key(child_info, &[40]), Some(vec![50]));
@@ -975,7 +998,7 @@ mod tests {
 		)
 			.into();
 
-		let ext = TestExt::new(&mut overlay, &backend, None);
+		let mut ext = TestExt::new(&mut overlay, &backend, None);
 
 		assert_eq!(ext.child_storage(child_info, &[10]), Some(vec![10]));
 		assert_eq!(
diff --git a/substrate/primitives/state-machine/src/fuzzing.rs b/substrate/primitives/state-machine/src/fuzzing.rs
new file mode 100644
index 00000000000..e147e6e8800
--- /dev/null
+++ b/substrate/primitives/state-machine/src/fuzzing.rs
@@ -0,0 +1,319 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// 	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! State machine fuzzing implementation, behind `fuzzing` feature.
+
+use super::{ext::Ext, *};
+use crate::ext::StorageAppend;
+use arbitrary::Arbitrary;
+#[cfg(test)]
+use codec::Encode;
+use hash_db::Hasher;
+use sp_core::{storage::StateVersion, traits::Externalities};
+#[cfg(test)]
+use sp_runtime::traits::BlakeTwo256;
+use sp_trie::PrefixedMemoryDB;
+use std::collections::BTreeMap;
+
+#[derive(Arbitrary, Debug, Clone)]
+enum DataLength {
+	Zero = 0,
+	Small = 1,
+	Medium = 3,
+	Big = 300, // 2 byte scale encode length
+}
+
+#[derive(Arbitrary, Debug, Clone)]
+#[repr(u8)]
+enum DataValue {
+	A = b'a',
+	B = b'b',
+	C = b'c',
+	D = b'd',       // This can be read as a multiple byte compact length.
+	EasyBug = 20u8, // value compact len.
+}
+
+/// Action to fuzz
+#[derive(Arbitrary, Debug, Clone)]
+enum FuzzAppendItem {
+	Append(DataValue, DataLength),
+	Insert(DataValue, DataLength),
+	StartTransaction,
+	RollbackTransaction,
+	CommitTransaction,
+	Read,
+	Remove,
+	// To go over 256 items easily (different compact size then).
+	Append50(DataValue, DataLength),
+}
+
+/// Arbitrary payload for fuzzing append.
+#[derive(Arbitrary, Debug, Clone)]
+pub struct FuzzAppendPayload(Vec<FuzzAppendItem>, Option<(DataValue, DataLength)>);
+
+struct SimpleOverlay {
+	data: Vec<BTreeMap<Vec<u8>, Option<Vec<u8>>>>,
+}
+
+impl Default for SimpleOverlay {
+	fn default() -> Self {
+		Self { data: vec![BTreeMap::new()] }
+	}
+}
+
+impl SimpleOverlay {
+	fn insert(&mut self, key: Vec<u8>, value: Option<Vec<u8>>) {
+		self.data.last_mut().expect("always at least one item").insert(key, value);
+	}
+
+	fn append<H>(
+		&mut self,
+		key: Vec<u8>,
+		value: Vec<u8>,
+		backend: &mut TrieBackend<PrefixedMemoryDB<H>, H>,
+	) where
+		H: Hasher,
+		H::Out: codec::Decode + codec::Encode + 'static,
+	{
+		let current_value = self
+			.data
+			.last_mut()
+			.expect("always at least one item")
+			.entry(key.clone())
+			.or_insert_with(|| {
+				Some(backend.storage(&key).expect("Ext not allowed to fail").unwrap_or_default())
+			});
+		if current_value.is_none() {
+			*current_value = Some(vec![]);
+		}
+		StorageAppend::new(current_value.as_mut().expect("init above")).append(value);
+	}
+
+	fn get(&mut self, key: &[u8]) -> Option<&Vec<u8>> {
+		self.data
+			.last_mut()
+			.expect("always at least one item")
+			.get(key)
+			.and_then(|o| o.as_ref())
+	}
+
+	fn commit_transaction(&mut self) {
+		if let Some(to_commit) = self.data.pop() {
+			let dest = self.data.last_mut().expect("always at least one item");
+			for (k, v) in to_commit.into_iter() {
+				dest.insert(k, v);
+			}
+		}
+	}
+
+	fn rollback_transaction(&mut self) {
+		let _ = self.data.pop();
+	}
+
+	fn start_transaction(&mut self) {
+		let cloned = self.data.last().expect("always at least one item").clone();
+		self.data.push(cloned);
+	}
+}
+
+struct FuzzAppendState<H: Hasher> {
+	key: Vec<u8>,
+
+	// reference simple implementation
+	reference: SimpleOverlay,
+
+	// trie backend
+	backend: TrieBackend<PrefixedMemoryDB<H>, H>,
+	// Standard Overlay
+	overlay: OverlayedChanges<H>,
+
+	// block dropping/commiting too many transaction
+	transaction_depth: usize,
+}
+
+impl<H> FuzzAppendState<H>
+where
+	H: Hasher,
+	H::Out: codec::Decode + codec::Encode + 'static,
+{
+	fn process_item(&mut self, item: FuzzAppendItem) {
+		let mut ext = Ext::new(&mut self.overlay, &mut self.backend, None);
+		match item {
+			FuzzAppendItem::Append(value, length) => {
+				let value = vec![value as u8; length as usize];
+				ext.storage_append(self.key.clone(), value.clone());
+				self.reference.append(self.key.clone(), value, &mut self.backend);
+			},
+			FuzzAppendItem::Append50(value, length) => {
+				let value = vec![value as u8; length as usize];
+				for _ in 0..50 {
+					let mut ext = Ext::new(&mut self.overlay, &mut self.backend, None);
+					ext.storage_append(self.key.clone(), value.clone());
+					self.reference.append(self.key.clone(), value.clone(), &mut self.backend);
+				}
+			},
+			FuzzAppendItem::Insert(value, length) => {
+				let value = vec![value as u8; length as usize];
+				ext.set_storage(self.key.clone(), value.clone());
+				self.reference.insert(self.key.clone(), Some(value));
+			},
+			FuzzAppendItem::Remove => {
+				ext.clear_storage(&self.key);
+				self.reference.insert(self.key.clone(), None);
+			},
+			FuzzAppendItem::Read => {
+				let left = ext.storage(self.key.as_slice());
+				let right = self.reference.get(self.key.as_slice());
+				assert_eq!(left.as_ref(), right);
+			},
+			FuzzAppendItem::StartTransaction => {
+				self.transaction_depth += 1;
+				self.reference.start_transaction();
+				ext.storage_start_transaction();
+			},
+			FuzzAppendItem::RollbackTransaction => {
+				if self.transaction_depth == 0 {
+					return
+				}
+				self.transaction_depth -= 1;
+				self.reference.rollback_transaction();
+				ext.storage_rollback_transaction().unwrap();
+			},
+			FuzzAppendItem::CommitTransaction => {
+				if self.transaction_depth == 0 {
+					return
+				}
+				self.transaction_depth -= 1;
+				self.reference.commit_transaction();
+				ext.storage_commit_transaction().unwrap();
+			},
+		}
+	}
+
+	fn check_final_state(&mut self) {
+		let mut ext = Ext::new(&mut self.overlay, &mut self.backend, None);
+		let left = ext.storage(self.key.as_slice());
+		let right = self.reference.get(self.key.as_slice());
+		assert_eq!(left.as_ref(), right);
+	}
+}
+
+#[test]
+fn fuzz_scenarii() {
+	assert_eq!(codec::Compact(5u16).encode()[0], DataValue::EasyBug as u8);
+	let scenarii = vec![
+		(
+			vec![
+				FuzzAppendItem::Append(DataValue::A, DataLength::Small),
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Append50(DataValue::D, DataLength::Small),
+				FuzzAppendItem::Read,
+				FuzzAppendItem::RollbackTransaction,
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Append(DataValue::D, DataLength::Small),
+				FuzzAppendItem::Read,
+				FuzzAppendItem::RollbackTransaction,
+			],
+			Some((DataValue::D, DataLength::Small)),
+		),
+		(
+			vec![
+				FuzzAppendItem::Append(DataValue::B, DataLength::Small),
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Append(DataValue::A, DataLength::Small),
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Remove,
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Append(DataValue::A, DataLength::Zero),
+				FuzzAppendItem::CommitTransaction,
+				FuzzAppendItem::CommitTransaction,
+				FuzzAppendItem::Remove,
+			],
+			Some((DataValue::EasyBug, DataLength::Small)),
+		),
+		(
+			vec![
+				FuzzAppendItem::Append(DataValue::A, DataLength::Small),
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Append(DataValue::A, DataLength::Medium),
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Remove,
+				FuzzAppendItem::CommitTransaction,
+				FuzzAppendItem::RollbackTransaction,
+			],
+			Some((DataValue::B, DataLength::Big)),
+		),
+		(
+			vec![
+				FuzzAppendItem::Append(DataValue::A, DataLength::Big),
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Append(DataValue::A, DataLength::Medium),
+				FuzzAppendItem::Remove,
+				FuzzAppendItem::RollbackTransaction,
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Append(DataValue::A, DataLength::Zero),
+			],
+			None,
+		),
+		(
+			vec![
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::RollbackTransaction,
+				FuzzAppendItem::RollbackTransaction,
+				FuzzAppendItem::Append(DataValue::A, DataLength::Zero),
+			],
+			None,
+		),
+		(vec![FuzzAppendItem::StartTransaction], Some((DataValue::EasyBug, DataLength::Zero))),
+	];
+
+	for (scenario, init) in scenarii.into_iter() {
+		fuzz_append::<BlakeTwo256>(FuzzAppendPayload(scenario, init));
+	}
+}
+
+/// Test append operation for a given fuzzing payload.
+pub fn fuzz_append<H>(payload: FuzzAppendPayload)
+where
+	H: Hasher,
+	H::Out: codec::Decode + codec::Encode + 'static,
+{
+	let FuzzAppendPayload(to_fuzz, initial) = payload;
+	let key = b"k".to_vec();
+	let mut reference = SimpleOverlay::default();
+	let initial: BTreeMap<_, _> = initial
+		.into_iter()
+		.map(|(v, l)| (key.clone(), vec![v as u8; l as usize]))
+		.collect();
+	for (k, v) in initial.iter() {
+		reference.data[0].insert(k.clone(), Some(v.clone()));
+	}
+	reference.start_transaction(); // level 0 is backend, keep it untouched.
+	let overlay = OverlayedChanges::default();
+
+	let mut state = FuzzAppendState::<H> {
+		key,
+		reference,
+		overlay,
+		backend: (initial, StateVersion::default()).into(),
+		transaction_depth: 0,
+	};
+	for item in to_fuzz {
+		state.process_item(item);
+	}
+	state.check_final_state();
+}
diff --git a/substrate/primitives/state-machine/src/in_memory_backend.rs b/substrate/primitives/state-machine/src/in_memory_backend.rs
index 06fe6d4162a..7ba7457a6bf 100644
--- a/substrate/primitives/state-machine/src/in_memory_backend.rs
+++ b/substrate/primitives/state-machine/src/in_memory_backend.rs
@@ -132,6 +132,7 @@ where
 	}
 }
 
+#[cfg(feature = "std")]
 impl<H: Hasher> From<(Storage, StateVersion)> for TrieBackend<PrefixedMemoryDB<H>, H>
 where
 	H::Out: Codec + Ord,
diff --git a/substrate/primitives/state-machine/src/lib.rs b/substrate/primitives/state-machine/src/lib.rs
index 13087431d38..289b08755f6 100644
--- a/substrate/primitives/state-machine/src/lib.rs
+++ b/substrate/primitives/state-machine/src/lib.rs
@@ -27,6 +27,8 @@ pub mod backend;
 mod basic;
 mod error;
 mod ext;
+#[cfg(feature = "fuzzing")]
+pub mod fuzzing;
 #[cfg(feature = "std")]
 mod in_memory_backend;
 pub(crate) mod overlayed_changes;
@@ -1273,7 +1275,7 @@ mod tests {
 
 		assert_eq!(
 			overlay
-				.changes()
+				.changes_mut()
 				.map(|(k, v)| (k.clone(), v.value().cloned()))
 				.collect::<HashMap<_, _>>(),
 			map![
@@ -1299,7 +1301,7 @@ mod tests {
 
 		assert_eq!(
 			overlay
-				.changes()
+				.changes_mut()
 				.map(|(k, v)| (k.clone(), v.value().cloned()))
 				.collect::<HashMap<_, _>>(),
 			map![
@@ -1340,7 +1342,7 @@ mod tests {
 
 		assert_eq!(
 			overlay
-				.children()
+				.children_mut()
 				.flat_map(|(iter, _child_info)| iter)
 				.map(|(k, v)| (k.clone(), v.value()))
 				.collect::<BTreeMap<_, _>>(),
@@ -1440,11 +1442,78 @@ mod tests {
 		}
 		overlay.rollback_transaction().unwrap();
 		{
-			let ext = Ext::new(&mut overlay, backend, None);
+			let mut ext = Ext::new(&mut overlay, backend, None);
 			assert_eq!(ext.storage(key.as_slice()), Some(vec![reference_data[0].clone()].encode()));
 		}
 	}
 
+	// Test that we can append twice to a key, then perform a remove operation.
+	// The test checks specifically that the append is merged with its parent transaction
+	// on commit.
+	#[test]
+	fn commit_merges_append_with_parent() {
+		#[derive(codec::Encode, codec::Decode)]
+		enum Item {
+			Item1,
+			Item2,
+		}
+
+		let key = b"events".to_vec();
+		let state = new_in_mem::<BlakeTwo256>();
+		let backend = state.as_trie_backend();
+		let mut overlay = OverlayedChanges::default();
+
+		// Append first item
+		overlay.start_transaction();
+		{
+			let mut ext = Ext::new(&mut overlay, backend, None);
+			ext.clear_storage(key.as_slice());
+			ext.storage_append(key.clone(), Item::Item1.encode());
+		}
+
+		// Append second item
+		overlay.start_transaction();
+		{
+			let mut ext = Ext::new(&mut overlay, backend, None);
+
+			assert_eq!(ext.storage(key.as_slice()), Some(vec![Item::Item1].encode()));
+
+			ext.storage_append(key.clone(), Item::Item2.encode());
+
+			assert_eq!(ext.storage(key.as_slice()), Some(vec![Item::Item1, Item::Item2].encode()),);
+		}
+
+		// Remove item
+		overlay.start_transaction();
+		{
+			let mut ext = Ext::new(&mut overlay, backend, None);
+
+			ext.place_storage(key.clone(), None);
+
+			assert_eq!(ext.storage(key.as_slice()), None);
+		}
+
+		// Remove gets commited and merged into previous transaction
+		overlay.commit_transaction().unwrap();
+		{
+			let mut ext = Ext::new(&mut overlay, backend, None);
+			assert_eq!(ext.storage(key.as_slice()), None,);
+		}
+
+		// Remove gets rolled back, we should see the initial append again.
+		overlay.rollback_transaction().unwrap();
+		{
+			let mut ext = Ext::new(&mut overlay, backend, None);
+			assert_eq!(ext.storage(key.as_slice()), Some(vec![Item::Item1].encode()));
+		}
+
+		overlay.commit_transaction().unwrap();
+		{
+			let mut ext = Ext::new(&mut overlay, backend, None);
+			assert_eq!(ext.storage(key.as_slice()), Some(vec![Item::Item1].encode()));
+		}
+	}
+
 	#[test]
 	fn remove_with_append_then_rollback_appended_then_append_again() {
 		#[derive(codec::Encode, codec::Decode)]
@@ -1499,7 +1568,7 @@ mod tests {
 
 		// Then only initialization item and second (committed) item should persist.
 		{
-			let ext = Ext::new(&mut overlay, backend, None);
+			let mut ext = Ext::new(&mut overlay, backend, None);
 			assert_eq!(
 				ext.storage(key.as_slice()),
 				Some(vec![Item::InitializationItem, Item::CommittedItem].encode()),
diff --git a/substrate/primitives/state-machine/src/overlayed_changes/changeset.rs b/substrate/primitives/state-machine/src/overlayed_changes/changeset.rs
index 601bc2e2919..c478983e979 100644
--- a/substrate/primitives/state-machine/src/overlayed_changes/changeset.rs
+++ b/substrate/primitives/state-machine/src/overlayed_changes/changeset.rs
@@ -21,11 +21,15 @@ use super::{Extrinsics, StorageKey, StorageValue};
 
 #[cfg(not(feature = "std"))]
 use alloc::collections::btree_set::BTreeSet as Set;
+use codec::{Compact, CompactLen};
 #[cfg(feature = "std")]
 use std::collections::HashSet as Set;
 
-use crate::warn;
-use alloc::collections::{btree_map::BTreeMap, btree_set::BTreeSet};
+use crate::{ext::StorageAppend, warn};
+use alloc::{
+	collections::{btree_map::BTreeMap, btree_set::BTreeSet},
+	vec::Vec,
+};
 use core::hash::Hash;
 use smallvec::SmallVec;
 
@@ -86,10 +90,97 @@ impl<V> Default for OverlayedEntry<V> {
 }
 
 /// History of value, with removal support.
-pub type OverlayedValue = OverlayedEntry<Option<StorageValue>>;
+pub type OverlayedValue = OverlayedEntry<StorageEntry>;
+
+/// Content in an overlay for a given transactional depth.
+#[derive(Debug, Clone, Default)]
+#[cfg_attr(test, derive(PartialEq))]
+pub enum StorageEntry {
+	/// The storage entry should be set to the stored value.
+	Set(StorageValue),
+	/// The storage entry should be removed.
+	#[default]
+	Remove,
+	/// The storage entry was appended to.
+	///
+	/// This assumes that the storage entry is encoded as a SCALE list. This means that it is
+	/// prefixed with a `Compact<u32>` that reprensents the length, followed by all the encoded
+	/// elements.
+	Append {
+		/// The value of the storage entry.
+		///
+		/// This may or may not be prefixed by the length, depending on the materialized length.
+		data: StorageValue,
+		/// Current number of elements stored in data.
+		current_length: u32,
+		/// The number of elements as stored in the prefixed length in `data`.
+		///
+		/// If `None`, than `data` is not yet prefixed with the length.
+		materialized_length: Option<u32>,
+		/// The size of `data` in the parent transactional layer.
+		///
+		/// Only set when the parent layer is in  `Append` state.
+		parent_size: Option<usize>,
+	},
+}
+
+impl StorageEntry {
+	/// Convert to an [`Option<StorageValue>`].
+	pub(super) fn to_option(mut self) -> Option<StorageValue> {
+		self.materialize_in_place();
+		match self {
+			StorageEntry::Append { data, .. } | StorageEntry::Set(data) => Some(data),
+			StorageEntry::Remove => None,
+		}
+	}
+
+	/// Return as an [`Option<StorageValue>`].
+	fn as_option(&mut self) -> Option<&StorageValue> {
+		self.materialize_in_place();
+		match self {
+			StorageEntry::Append { data, .. } | StorageEntry::Set(data) => Some(data),
+			StorageEntry::Remove => None,
+		}
+	}
+
+	/// Materialize the internal state and cache the resulting materialized value.
+	fn materialize_in_place(&mut self) {
+		if let StorageEntry::Append { data, materialized_length, current_length, .. } = self {
+			let current_length = *current_length;
+			if materialized_length.map_or(false, |m| m == current_length) {
+				return
+			}
+			StorageAppend::new(data).replace_length(*materialized_length, current_length);
+			*materialized_length = Some(current_length);
+		}
+	}
+
+	/// Materialize the internal state.
+	#[cfg(test)]
+	pub(crate) fn materialize(&self) -> Option<alloc::borrow::Cow<[u8]>> {
+		use alloc::borrow::Cow;
+
+		match self {
+			StorageEntry::Append { data, materialized_length, current_length, .. } => {
+				let current_length = *current_length;
+				if materialized_length.map_or(false, |m| m == current_length) {
+					Some(Cow::Borrowed(data.as_ref()))
+				} else {
+					let mut data = data.clone();
+					StorageAppend::new(&mut data)
+						.replace_length(*materialized_length, current_length);
+
+					Some(data.into())
+				}
+			},
+			StorageEntry::Remove => None,
+			StorageEntry::Set(e) => Some(Cow::Borrowed(e.as_ref())),
+		}
+	}
+}
 
 /// Change set for basic key value with extrinsics index recording and removal support.
-pub type OverlayedChangeSet = OverlayedMap<StorageKey, Option<StorageValue>>;
+pub type OverlayedChangeSet = OverlayedMap<StorageKey, StorageEntry>;
 
 /// Holds a set of changes with the ability modify them using nested transactions.
 #[derive(Debug, Clone)]
@@ -120,7 +211,7 @@ impl<K, V> Default for OverlayedMap<K, V> {
 }
 
 #[cfg(feature = "std")]
-impl From<sp_core::storage::StorageMap> for OverlayedMap<StorageKey, Option<StorageValue>> {
+impl From<sp_core::storage::StorageMap> for OverlayedMap<StorageKey, StorageEntry> {
 	fn from(storage: sp_core::storage::StorageMap) -> Self {
 		Self {
 			changes: storage
@@ -130,7 +221,7 @@ impl From<sp_core::storage::StorageMap> for OverlayedMap<StorageKey, Option<Stor
 						k,
 						OverlayedEntry {
 							transactions: SmallVec::from_iter([InnerValue {
-								value: Some(v),
+								value: StorageEntry::Set(v),
 								extrinsics: Default::default(),
 							}]),
 						},
@@ -189,7 +280,7 @@ impl<V> OverlayedEntry<V> {
 	///
 	/// This makes sure that the old version is not overwritten and can be properly
 	/// rolled back when required.
-	fn set(&mut self, value: V, first_write_in_tx: bool, at_extrinsic: Option<u32>) {
+	fn set_offchain(&mut self, value: V, first_write_in_tx: bool, at_extrinsic: Option<u32>) {
 		if first_write_in_tx || self.transactions.is_empty() {
 			self.transactions.push(InnerValue { value, extrinsics: Default::default() });
 		} else {
@@ -202,10 +293,223 @@ impl<V> OverlayedEntry<V> {
 	}
 }
 
-impl OverlayedEntry<Option<StorageValue>> {
+/// Restore the `current_data` from an [`StorageEntry::Append`] back to the parent.
+///
+/// When creating a new transaction layer from an appended entry, the `data` will be moved to
+/// prevent extra allocations. So, we need to move back the `data` to the parent layer when there is
+/// a roll back or the entry is set to some different value. This functions puts back the data to
+/// the `parent` and truncates any extra elements that got added in the current layer.
+///
+/// The current and the `parent` layer need to be [`StorageEntry::Append`] or otherwise the function
+/// is a no-op.
+fn restore_append_to_parent(
+	parent: &mut StorageEntry,
+	mut current_data: Vec<u8>,
+	current_materialized: Option<u32>,
+	mut target_parent_size: usize,
+) {
+	match parent {
+		StorageEntry::Append {
+			data: parent_data,
+			materialized_length: parent_materialized,
+			..
+		} => {
+			// Forward the materialized length to the parent with the data. Next time when
+			// materializing the value, the length will be corrected. This prevents doing a
+			// potential allocation here.
+
+			let prev = parent_materialized.map(|l| Compact::<u32>::compact_len(&l)).unwrap_or(0);
+			let new = current_materialized.map(|l| Compact::<u32>::compact_len(&l)).unwrap_or(0);
+			let delta = new.abs_diff(prev);
+			if prev >= new {
+				target_parent_size -= delta;
+			} else {
+				target_parent_size += delta;
+			}
+			*parent_materialized = current_materialized;
+
+			// Truncate the data to remove any extra elements
+			current_data.truncate(target_parent_size);
+			*parent_data = current_data;
+		},
+		_ => {
+			// No value or a simple value, no need to restore
+		},
+	}
+}
+
+impl OverlayedEntry<StorageEntry> {
+	/// Writes a new version of a value.
+	///
+	/// This makes sure that the old version is not overwritten and can be properly
+	/// rolled back when required.
+	fn set(
+		&mut self,
+		value: Option<StorageValue>,
+		first_write_in_tx: bool,
+		at_extrinsic: Option<u32>,
+	) {
+		let value = value.map_or_else(|| StorageEntry::Remove, StorageEntry::Set);
+
+		if first_write_in_tx || self.transactions.is_empty() {
+			self.transactions.push(InnerValue { value, extrinsics: Default::default() });
+		} else {
+			let mut old_value = self.value_mut();
+
+			let set_prev = if let StorageEntry::Append {
+				data,
+				current_length: _,
+				materialized_length,
+				parent_size,
+			} = &mut old_value
+			{
+				parent_size
+					.map(|parent_size| (core::mem::take(data), *materialized_length, parent_size))
+			} else {
+				None
+			};
+
+			*old_value = value;
+
+			if let Some((data, current_materialized, parent_size)) = set_prev {
+				let transactions = self.transactions.len();
+
+				debug_assert!(transactions >= 2);
+				let parent = self
+					.transactions
+					.get_mut(transactions - 2)
+					.expect("`set_prev` is only `Some(_)`, if the value came from parent; qed");
+				restore_append_to_parent(
+					&mut parent.value,
+					data,
+					current_materialized,
+					parent_size,
+				);
+			}
+		}
+
+		if let Some(extrinsic) = at_extrinsic {
+			self.transaction_extrinsics_mut().insert(extrinsic);
+		}
+	}
+
+	/// Append content to a value, updating a prefixed compact encoded length.
+	///
+	/// This makes sure that the old version is not overwritten and can be properly
+	/// rolled back when required.
+	/// This avoid copying value from previous transaction.
+	fn append(
+		&mut self,
+		element: StorageValue,
+		first_write_in_tx: bool,
+		init: impl Fn() -> StorageValue,
+		at_extrinsic: Option<u32>,
+	) {
+		if self.transactions.is_empty() {
+			let mut init_value = init();
+
+			let mut append = StorageAppend::new(&mut init_value);
+
+			// Either the init value is a SCALE list like value to that the `element` gets appended
+			// or the value is reset to `[element]`.
+			let (data, current_length, materialized_length) =
+				if let Some(len) = append.extract_length() {
+					append.append_raw(element);
+
+					(init_value, len + 1, Some(len))
+				} else {
+					(element, 1, None)
+				};
+
+			self.transactions.push(InnerValue {
+				value: StorageEntry::Append {
+					data,
+					current_length,
+					materialized_length,
+					parent_size: None,
+				},
+				extrinsics: Default::default(),
+			});
+		} else if first_write_in_tx {
+			let parent = self.value_mut();
+			let (data, current_length, materialized_length, parent_size) = match parent {
+				StorageEntry::Remove => (element, 1, None, None),
+				StorageEntry::Append { data, current_length, materialized_length, .. } => {
+					let parent_len = data.len();
+					let mut data_buf = core::mem::take(data);
+					StorageAppend::new(&mut data_buf).append_raw(element);
+					(data_buf, *current_length + 1, *materialized_length, Some(parent_len))
+				},
+				StorageEntry::Set(prev) => {
+					// For compatibility: append if there is a encoded length, overwrite
+					// with value otherwhise.
+					if let Some(current_length) = StorageAppend::new(prev).extract_length() {
+						// The `prev` is cloned here, but it could be optimized to not do the clone
+						// here as it is done for `Append` above.
+						let mut data = prev.clone();
+						StorageAppend::new(&mut data).append_raw(element);
+						(data, current_length + 1, Some(current_length), None)
+					} else {
+						// overwrite, same as empty case.
+						(element, 1, None, None)
+					}
+				},
+			};
+
+			self.transactions.push(InnerValue {
+				value: StorageEntry::Append {
+					data,
+					current_length,
+					materialized_length,
+					parent_size,
+				},
+				extrinsics: Default::default(),
+			});
+		} else {
+			// not first transaction write
+			let old_value = self.value_mut();
+			let replace = match old_value {
+				StorageEntry::Remove => Some((element, 1, None)),
+				StorageEntry::Set(data) => {
+					// Note that when the data here is not initialized with append,
+					// and still starts with a valid compact u32 we can have totally broken
+					// encoding.
+					let mut append = StorageAppend::new(data);
+
+					// For compatibility: append if there is a encoded length, overwrite
+					// with value otherwhise.
+					if let Some(current_length) = append.extract_length() {
+						append.append_raw(element);
+						Some((core::mem::take(data), current_length + 1, Some(current_length)))
+					} else {
+						Some((element, 1, None))
+					}
+				},
+				StorageEntry::Append { data, current_length, .. } => {
+					StorageAppend::new(data).append_raw(element);
+					*current_length += 1;
+					None
+				},
+			};
+
+			if let Some((data, current_length, materialized_length)) = replace {
+				*old_value = StorageEntry::Append {
+					data,
+					current_length,
+					materialized_length,
+					parent_size: None,
+				};
+			}
+		}
+
+		if let Some(extrinsic) = at_extrinsic {
+			self.transaction_extrinsics_mut().insert(extrinsic);
+		}
+	}
+
 	/// The value as seen by the current transaction.
-	pub fn value(&self) -> Option<&StorageValue> {
-		self.value_ref().as_ref()
+	pub fn value(&mut self) -> Option<&StorageValue> {
+		self.value_mut().as_option()
 	}
 }
 
@@ -238,20 +542,20 @@ impl<K: Ord + Hash + Clone, V> OverlayedMap<K, V> {
 	}
 
 	/// Get an optional reference to the value stored for the specified key.
-	pub fn get<Q>(&self, key: &Q) -> Option<&OverlayedEntry<V>>
+	pub fn get<Q>(&mut self, key: &Q) -> Option<&mut OverlayedEntry<V>>
 	where
 		K: core::borrow::Borrow<Q>,
 		Q: Ord + ?Sized,
 	{
-		self.changes.get(key)
+		self.changes.get_mut(key)
 	}
 
 	/// Set a new value for the specified key.
 	///
 	/// Can be rolled back or committed when called inside a transaction.
-	pub fn set(&mut self, key: K, value: V, at_extrinsic: Option<u32>) {
+	pub fn set_offchain(&mut self, key: K, value: V, at_extrinsic: Option<u32>) {
 		let overlayed = self.changes.entry(key.clone()).or_default();
-		overlayed.set(value, insert_dirty(&mut self.dirty_keys, key), at_extrinsic);
+		overlayed.set_offchain(value, insert_dirty(&mut self.dirty_keys, key), at_extrinsic);
 	}
 
 	/// Get a list of all changes as seen by current transaction.
@@ -259,6 +563,11 @@ impl<K: Ord + Hash + Clone, V> OverlayedMap<K, V> {
 		self.changes.iter()
 	}
 
+	/// Get a list of all changes as seen by current transaction.
+	pub fn changes_mut(&mut self) -> impl Iterator<Item = (&K, &mut OverlayedEntry<V>)> {
+		self.changes.iter_mut()
+	}
+
 	/// Get a list of all changes as seen by current transaction, consumes
 	/// the overlay.
 	pub fn into_changes(self) -> impl Iterator<Item = (K, OverlayedEntry<V>)> {
@@ -298,7 +607,7 @@ impl<K: Ord + Hash + Clone, V> OverlayedMap<K, V> {
 	///
 	/// This rollbacks all dangling transaction left open by the runtime.
 	/// Calling this while already outside the runtime will return an error.
-	pub fn exit_runtime(&mut self) -> Result<(), NotInRuntime> {
+	pub fn exit_runtime_offchain(&mut self) -> Result<(), NotInRuntime> {
 		if let ExecutionMode::Client = self.execution_mode {
 			return Err(NotInRuntime)
 		}
@@ -310,7 +619,7 @@ impl<K: Ord + Hash + Clone, V> OverlayedMap<K, V> {
 			);
 		}
 		while self.has_open_runtime_transactions() {
-			self.rollback_transaction()
+			self.rollback_transaction_offchain()
 				.expect("The loop condition checks that the transaction depth is > 0; qed");
 		}
 		Ok(())
@@ -331,24 +640,24 @@ impl<K: Ord + Hash + Clone, V> OverlayedMap<K, V> {
 	///
 	/// Any changes made during that transaction are discarded. Returns an error if
 	/// there is no open transaction that can be rolled back.
-	pub fn rollback_transaction(&mut self) -> Result<(), NoOpenTransaction> {
-		self.close_transaction(true)
+	pub fn rollback_transaction_offchain(&mut self) -> Result<(), NoOpenTransaction> {
+		self.close_transaction_offchain(true)
 	}
 
 	/// Commit the last transaction started by `start_transaction`.
 	///
 	/// Any changes made during that transaction are committed. Returns an error if
 	/// there is no open transaction that can be committed.
-	pub fn commit_transaction(&mut self) -> Result<(), NoOpenTransaction> {
-		self.close_transaction(false)
+	pub fn commit_transaction_offchain(&mut self) -> Result<(), NoOpenTransaction> {
+		self.close_transaction_offchain(false)
 	}
 
-	fn close_transaction(&mut self, rollback: bool) -> Result<(), NoOpenTransaction> {
+	fn close_transaction_offchain(&mut self, rollback: bool) -> Result<(), NoOpenTransaction> {
 		// runtime is not allowed to close transactions started by the client
-		if let ExecutionMode::Runtime = self.execution_mode {
-			if !self.has_open_runtime_transactions() {
-				return Err(NoOpenTransaction)
-			}
+		if matches!(self.execution_mode, ExecutionMode::Runtime) &&
+			!self.has_open_runtime_transactions()
+		{
+			return Err(NoOpenTransaction)
 		}
 
 		for key in self.dirty_keys.pop().ok_or(NoOpenTransaction)? {
@@ -398,32 +707,176 @@ impl<K: Ord + Hash + Clone, V> OverlayedMap<K, V> {
 }
 
 impl OverlayedChangeSet {
-	/// Get a mutable reference for a value.
+	/// Rollback the last transaction started by `start_transaction`.
+	///
+	/// Any changes made during that transaction are discarded. Returns an error if
+	/// there is no open transaction that can be rolled back.
+	pub fn rollback_transaction(&mut self) -> Result<(), NoOpenTransaction> {
+		self.close_transaction(true)
+	}
+
+	/// Commit the last transaction started by `start_transaction`.
+	///
+	/// Any changes made during that transaction are committed. Returns an error if
+	/// there is no open transaction that can be committed.
+	pub fn commit_transaction(&mut self) -> Result<(), NoOpenTransaction> {
+		self.close_transaction(false)
+	}
+
+	fn close_transaction(&mut self, rollback: bool) -> Result<(), NoOpenTransaction> {
+		// runtime is not allowed to close transactions started by the client
+		if matches!(self.execution_mode, ExecutionMode::Runtime) &&
+			!self.has_open_runtime_transactions()
+		{
+			return Err(NoOpenTransaction)
+		}
+
+		for key in self.dirty_keys.pop().ok_or(NoOpenTransaction)? {
+			let overlayed = self.changes.get_mut(&key).expect(
+				"\
+				A write to an OverlayedValue is recorded in the dirty key set. Before an
+				OverlayedValue is removed, its containing dirty set is removed. This
+				function is only called for keys that are in the dirty set. qed\
+			",
+			);
+
+			if rollback {
+				match overlayed.pop_transaction().value {
+					StorageEntry::Append {
+						data,
+						materialized_length,
+						parent_size: Some(parent_size),
+						..
+					} => {
+						debug_assert!(!overlayed.transactions.is_empty());
+						restore_append_to_parent(
+							overlayed.value_mut(),
+							data,
+							materialized_length,
+							parent_size,
+						);
+					},
+					_ => (),
+				}
+
+				// We need to remove the key as an `OverlayValue` with no transactions
+				// violates its invariant of always having at least one transaction.
+				if overlayed.transactions.is_empty() {
+					self.changes.remove(&key);
+				}
+			} else {
+				let has_predecessor = if let Some(dirty_keys) = self.dirty_keys.last_mut() {
+					// Not the last tx: Did the previous tx write to this key?
+					!dirty_keys.insert(key)
+				} else {
+					// Last tx: Is there already a value in the committed set?
+					// Check against one rather than empty because the current tx is still
+					// in the list as it is popped later in this function.
+					overlayed.transactions.len() > 1
+				};
+
+				// We only need to merge if there is an pre-existing value. It may be a value from
+				// the previous transaction or a value committed without any open transaction.
+				if has_predecessor {
+					let mut committed_tx = overlayed.pop_transaction();
+					let mut merge_appends = false;
+
+					// consecutive appends need to keep past `parent_size` value.
+					if let StorageEntry::Append { parent_size, .. } = &mut committed_tx.value {
+						if parent_size.is_some() {
+							let parent = overlayed.value_mut();
+							if let StorageEntry::Append { parent_size: keep_me, .. } = parent {
+								merge_appends = true;
+								*parent_size = *keep_me;
+							}
+						}
+					}
+
+					if merge_appends {
+						*overlayed.value_mut() = committed_tx.value;
+					} else {
+						let removed = core::mem::replace(overlayed.value_mut(), committed_tx.value);
+						// The transaction being commited is not an append operation. However, the
+						// value being overwritten in the previous transaction might be an append
+						// that needs to be merged with its parent. We only need to handle `Append`
+						// here because `Set` and `Remove` can directly overwrite previous
+						// operations.
+						if let StorageEntry::Append {
+							parent_size, data, materialized_length, ..
+						} = removed
+						{
+							if let Some(parent_size) = parent_size {
+								let transactions = overlayed.transactions.len();
+
+								// info from replaced head so len is at least one
+								// and parent_size implies a parent transaction
+								// so length is at least two.
+								debug_assert!(transactions >= 2);
+								if let Some(parent) =
+									overlayed.transactions.get_mut(transactions - 2)
+								{
+									restore_append_to_parent(
+										&mut parent.value,
+										data,
+										materialized_length,
+										parent_size,
+									)
+								}
+							}
+						}
+					}
+
+					overlayed.transaction_extrinsics_mut().extend(committed_tx.extrinsics);
+				}
+			}
+		}
+
+		Ok(())
+	}
+
+	/// Call this when control returns from the runtime.
+	///
+	/// This commits all dangling transaction left open by the runtime.
+	/// Calling this while already outside the runtime will return an error.
+	pub fn exit_runtime(&mut self) -> Result<(), NotInRuntime> {
+		if matches!(self.execution_mode, ExecutionMode::Client) {
+			return Err(NotInRuntime)
+		}
+
+		self.execution_mode = ExecutionMode::Client;
+		if self.has_open_runtime_transactions() {
+			warn!(
+				"{} storage transactions are left open by the runtime. Those will be rolled back.",
+				self.transaction_depth() - self.num_client_transactions,
+			);
+		}
+		while self.has_open_runtime_transactions() {
+			self.rollback_transaction()
+				.expect("The loop condition checks that the transaction depth is > 0; qed");
+		}
+
+		Ok(())
+	}
+
+	/// Set a new value for the specified key.
 	///
 	/// Can be rolled back or committed when called inside a transaction.
-	#[must_use = "A change was registered, so this value MUST be modified."]
-	pub fn modify(
+	pub fn set(&mut self, key: StorageKey, value: Option<StorageValue>, at_extrinsic: Option<u32>) {
+		let overlayed = self.changes.entry(key.clone()).or_default();
+		overlayed.set(value, insert_dirty(&mut self.dirty_keys, key), at_extrinsic);
+	}
+
+	/// Append bytes to an existing content.
+	pub fn append_storage(
 		&mut self,
 		key: StorageKey,
+		value: StorageValue,
 		init: impl Fn() -> StorageValue,
 		at_extrinsic: Option<u32>,
-	) -> &mut Option<StorageValue> {
+	) {
 		let overlayed = self.changes.entry(key.clone()).or_default();
 		let first_write_in_tx = insert_dirty(&mut self.dirty_keys, key);
-		let clone_into_new_tx = if let Some(tx) = overlayed.transactions.last() {
-			if first_write_in_tx {
-				Some(tx.value.clone())
-			} else {
-				None
-			}
-		} else {
-			Some(Some(init()))
-		};
-
-		if let Some(cloned) = clone_into_new_tx {
-			overlayed.set(cloned, first_write_in_tx, at_extrinsic);
-		}
-		overlayed.value_mut()
+		overlayed.append(value, first_write_in_tx, init, at_extrinsic);
 	}
 
 	/// Set all values to deleted which are matched by the predicate.
@@ -436,7 +889,7 @@ impl OverlayedChangeSet {
 	) -> u32 {
 		let mut count = 0;
 		for (key, val) in self.changes.iter_mut().filter(|(k, v)| predicate(k, v)) {
-			if val.value_ref().is_some() {
+			if matches!(val.value_ref(), StorageEntry::Set(..) | StorageEntry::Append { .. }) {
 				count += 1;
 			}
 			val.set(None, insert_dirty(&mut self.dirty_keys, key.clone()), at_extrinsic);
@@ -445,10 +898,13 @@ impl OverlayedChangeSet {
 	}
 
 	/// Get the iterator over all changes that follow the supplied `key`.
-	pub fn changes_after(&self, key: &[u8]) -> impl Iterator<Item = (&[u8], &OverlayedValue)> {
+	pub fn changes_after(
+		&mut self,
+		key: &[u8],
+	) -> impl Iterator<Item = (&[u8], &mut OverlayedValue)> {
 		use core::ops::Bound;
 		let range = (Bound::Excluded(key), Bound::Unbounded);
-		self.changes.range::<[u8], _>(range).map(|(k, v)| (k.as_slice(), v))
+		self.changes.range_mut::<[u8], _>(range).map(|(k, v)| (k.as_slice(), v))
 	}
 }
 
@@ -460,18 +916,19 @@ mod test {
 	type Changes<'a> = Vec<(&'a [u8], (Option<&'a [u8]>, Vec<u32>))>;
 	type Drained<'a> = Vec<(&'a [u8], Option<&'a [u8]>)>;
 
-	fn assert_changes(is: &OverlayedChangeSet, expected: &Changes) {
+	fn assert_changes(is: &mut OverlayedChangeSet, expected: &Changes) {
 		let is: Changes = is
-			.changes()
+			.changes_mut()
 			.map(|(k, v)| {
-				(k.as_ref(), (v.value().map(AsRef::as_ref), v.extrinsics().into_iter().collect()))
+				let extrinsics = v.extrinsics().into_iter().collect();
+				(k.as_ref(), (v.value().map(AsRef::as_ref), extrinsics))
 			})
 			.collect();
 		assert_eq!(&is, expected);
 	}
 
 	fn assert_drained_changes(is: OverlayedChangeSet, expected: Changes) {
-		let is = is.drain_committed().collect::<Vec<_>>();
+		let is = is.drain_committed().map(|(k, v)| (k, v.to_option())).collect::<Vec<_>>();
 		let expected = expected
 			.iter()
 			.map(|(k, v)| (k.to_vec(), v.0.map(From::from)))
@@ -480,7 +937,7 @@ mod test {
 	}
 
 	fn assert_drained(is: OverlayedChangeSet, expected: Drained) {
-		let is = is.drain_committed().collect::<Vec<_>>();
+		let is = is.drain_committed().map(|(k, v)| (k, v.to_option())).collect::<Vec<_>>();
 		let expected = expected
 			.iter()
 			.map(|(k, v)| (k.to_vec(), v.map(From::from)))
@@ -535,7 +992,7 @@ mod test {
 			(b"key7", (Some(b"val7-rolled"), vec![77])),
 			(b"key99", (Some(b"val99"), vec![99])),
 		];
-		assert_changes(&changeset, &all_changes);
+		assert_changes(&mut changeset, &all_changes);
 
 		// this should be no-op
 		changeset.start_transaction();
@@ -546,7 +1003,7 @@ mod test {
 		assert_eq!(changeset.transaction_depth(), 3);
 		changeset.commit_transaction().unwrap();
 		assert_eq!(changeset.transaction_depth(), 2);
-		assert_changes(&changeset, &all_changes);
+		assert_changes(&mut changeset, &all_changes);
 
 		// roll back our first transactions that actually contains something
 		changeset.rollback_transaction().unwrap();
@@ -558,11 +1015,11 @@ mod test {
 			(b"key42", (Some(b"val42"), vec![42])),
 			(b"key99", (Some(b"val99"), vec![99])),
 		];
-		assert_changes(&changeset, &rolled_back);
+		assert_changes(&mut changeset, &rolled_back);
 
 		changeset.commit_transaction().unwrap();
 		assert_eq!(changeset.transaction_depth(), 0);
-		assert_changes(&changeset, &rolled_back);
+		assert_changes(&mut changeset, &rolled_back);
 
 		assert_drained_changes(changeset, rolled_back);
 	}
@@ -598,7 +1055,7 @@ mod test {
 			(b"key7", (Some(b"val7-rolled"), vec![77])),
 			(b"key99", (Some(b"val99"), vec![99])),
 		];
-		assert_changes(&changeset, &all_changes);
+		assert_changes(&mut changeset, &all_changes);
 
 		// this should be no-op
 		changeset.start_transaction();
@@ -609,35 +1066,46 @@ mod test {
 		assert_eq!(changeset.transaction_depth(), 3);
 		changeset.commit_transaction().unwrap();
 		assert_eq!(changeset.transaction_depth(), 2);
-		assert_changes(&changeset, &all_changes);
+		assert_changes(&mut changeset, &all_changes);
 
 		changeset.commit_transaction().unwrap();
 		assert_eq!(changeset.transaction_depth(), 1);
 
-		assert_changes(&changeset, &all_changes);
+		assert_changes(&mut changeset, &all_changes);
 
 		changeset.rollback_transaction().unwrap();
 		assert_eq!(changeset.transaction_depth(), 0);
 
 		let rolled_back: Changes =
 			vec![(b"key0", (Some(b"val0-1"), vec![1, 10])), (b"key1", (Some(b"val1"), vec![1]))];
-		assert_changes(&changeset, &rolled_back);
+		assert_changes(&mut changeset, &rolled_back);
 
 		assert_drained_changes(changeset, rolled_back);
 	}
 
 	#[test]
-	fn modify_works() {
+	fn append_works() {
+		use codec::Encode;
 		let mut changeset = OverlayedChangeSet::default();
 		assert_eq!(changeset.transaction_depth(), 0);
-		let init = || b"valinit".to_vec();
+		let init = || vec![b"valinit".to_vec()].encode();
 
 		// committed set
-		changeset.set(b"key0".to_vec(), Some(b"val0".to_vec()), Some(0));
+		let val0 = vec![b"val0".to_vec()].encode();
+		changeset.set(b"key0".to_vec(), Some(val0.clone()), Some(0));
 		changeset.set(b"key1".to_vec(), None, Some(1));
-		let val = changeset.modify(b"key3".to_vec(), init, Some(3));
-		assert_eq!(val, &Some(b"valinit".to_vec()));
-		val.as_mut().unwrap().extend_from_slice(b"-modified");
+		let all_changes: Changes =
+			vec![(b"key0", (Some(val0.as_slice()), vec![0])), (b"key1", (None, vec![1]))];
+
+		assert_changes(&mut changeset, &all_changes);
+		changeset.append_storage(b"key3".to_vec(), b"-modified".to_vec().encode(), init, Some(3));
+		let val3 = vec![b"valinit".to_vec(), b"-modified".to_vec()].encode();
+		let all_changes: Changes = vec![
+			(b"key0", (Some(val0.as_slice()), vec![0])),
+			(b"key1", (None, vec![1])),
+			(b"key3", (Some(val3.as_slice()), vec![3])),
+		];
+		assert_changes(&mut changeset, &all_changes);
 
 		changeset.start_transaction();
 		assert_eq!(changeset.transaction_depth(), 1);
@@ -645,39 +1113,75 @@ mod test {
 		assert_eq!(changeset.transaction_depth(), 2);
 
 		// non existing value -> init value should be returned
-		let val = changeset.modify(b"key2".to_vec(), init, Some(2));
-		assert_eq!(val, &Some(b"valinit".to_vec()));
-		val.as_mut().unwrap().extend_from_slice(b"-modified");
+		changeset.append_storage(b"key3".to_vec(), b"-twice".to_vec().encode(), init, Some(15));
 
-		// existing value should be returned by modify
-		let val = changeset.modify(b"key0".to_vec(), init, Some(10));
-		assert_eq!(val, &Some(b"val0".to_vec()));
-		val.as_mut().unwrap().extend_from_slice(b"-modified");
+		// non existing value -> init value should be returned
+		changeset.append_storage(b"key2".to_vec(), b"-modified".to_vec().encode(), init, Some(2));
+		// existing value should be reuse on append
+		changeset.append_storage(b"key0".to_vec(), b"-modified".to_vec().encode(), init, Some(10));
 
 		// should work for deleted keys
-		let val = changeset.modify(b"key1".to_vec(), init, Some(20));
-		assert_eq!(val, &None);
-		*val = Some(b"deleted-modified".to_vec());
+		changeset.append_storage(
+			b"key1".to_vec(),
+			b"deleted-modified".to_vec().encode(),
+			init,
+			Some(20),
+		);
+		let val0_2 = vec![b"val0".to_vec(), b"-modified".to_vec()].encode();
+		let val3_2 = vec![b"valinit".to_vec(), b"-modified".to_vec(), b"-twice".to_vec()].encode();
+		let val1 = vec![b"deleted-modified".to_vec()].encode();
+		let all_changes: Changes = vec![
+			(b"key0", (Some(val0_2.as_slice()), vec![0, 10])),
+			(b"key1", (Some(val1.as_slice()), vec![1, 20])),
+			(b"key2", (Some(val3.as_slice()), vec![2])),
+			(b"key3", (Some(val3_2.as_slice()), vec![3, 15])),
+		];
+		assert_changes(&mut changeset, &all_changes);
+
+		changeset.start_transaction();
+		let val3_3 =
+			vec![b"valinit".to_vec(), b"-modified".to_vec(), b"-twice".to_vec(), b"-2".to_vec()]
+				.encode();
+		changeset.append_storage(b"key3".to_vec(), b"-2".to_vec().encode(), init, Some(21));
+		let all_changes2: Changes = vec![
+			(b"key0", (Some(val0_2.as_slice()), vec![0, 10])),
+			(b"key1", (Some(val1.as_slice()), vec![1, 20])),
+			(b"key2", (Some(val3.as_slice()), vec![2])),
+			(b"key3", (Some(val3_3.as_slice()), vec![3, 15, 21])),
+		];
+		assert_changes(&mut changeset, &all_changes2);
+		changeset.rollback_transaction().unwrap();
 
+		assert_changes(&mut changeset, &all_changes);
+		changeset.start_transaction();
+		let val3_4 = vec![
+			b"valinit".to_vec(),
+			b"-modified".to_vec(),
+			b"-twice".to_vec(),
+			b"-thrice".to_vec(),
+		]
+		.encode();
+		changeset.append_storage(b"key3".to_vec(), b"-thrice".to_vec().encode(), init, Some(25));
 		let all_changes: Changes = vec![
-			(b"key0", (Some(b"val0-modified"), vec![0, 10])),
-			(b"key1", (Some(b"deleted-modified"), vec![1, 20])),
-			(b"key2", (Some(b"valinit-modified"), vec![2])),
-			(b"key3", (Some(b"valinit-modified"), vec![3])),
+			(b"key0", (Some(val0_2.as_slice()), vec![0, 10])),
+			(b"key1", (Some(val1.as_slice()), vec![1, 20])),
+			(b"key2", (Some(val3.as_slice()), vec![2])),
+			(b"key3", (Some(val3_4.as_slice()), vec![3, 15, 25])),
 		];
-		assert_changes(&changeset, &all_changes);
+		assert_changes(&mut changeset, &all_changes);
+		changeset.commit_transaction().unwrap();
 		changeset.commit_transaction().unwrap();
 		assert_eq!(changeset.transaction_depth(), 1);
-		assert_changes(&changeset, &all_changes);
+		assert_changes(&mut changeset, &all_changes);
 
 		changeset.rollback_transaction().unwrap();
 		assert_eq!(changeset.transaction_depth(), 0);
 		let rolled_back: Changes = vec![
-			(b"key0", (Some(b"val0"), vec![0])),
+			(b"key0", (Some(val0.as_slice()), vec![0])),
 			(b"key1", (None, vec![1])),
-			(b"key3", (Some(b"valinit-modified"), vec![3])),
+			(b"key3", (Some(val3.as_slice()), vec![3])),
 		];
-		assert_changes(&changeset, &rolled_back);
+		assert_changes(&mut changeset, &rolled_back);
 		assert_drained_changes(changeset, rolled_back);
 	}
 
@@ -695,7 +1199,7 @@ mod test {
 		changeset.clear_where(|k, _| k.starts_with(b"del"), Some(5));
 
 		assert_changes(
-			&changeset,
+			&mut changeset,
 			&vec![
 				(b"del1", (None, vec![3, 5])),
 				(b"del2", (None, vec![4, 5])),
@@ -707,7 +1211,7 @@ mod test {
 		changeset.rollback_transaction().unwrap();
 
 		assert_changes(
-			&changeset,
+			&mut changeset,
 			&vec![
 				(b"del1", (Some(b"delval1"), vec![3])),
 				(b"del2", (Some(b"delval2"), vec![4])),
@@ -850,4 +1354,72 @@ mod test {
 		assert_eq!(changeset.exit_runtime(), Ok(()));
 		assert_eq!(changeset.exit_runtime(), Err(NotInRuntime));
 	}
+
+	#[test]
+	fn restore_append_to_parent() {
+		use codec::{Compact, Encode};
+		let mut changeset = OverlayedChangeSet::default();
+		let key: Vec<u8> = b"akey".into();
+
+		let from = 50; // 1 byte len
+		let to = 100; // 2 byte len
+		for i in 0..from {
+			changeset.append_storage(key.clone(), vec![i], Default::default, None);
+		}
+
+		// materialized
+		let encoded = changeset.get(&key).unwrap().value().unwrap();
+		let encoded_from_len = Compact(from as u32).encode();
+		assert_eq!(encoded_from_len.len(), 1);
+		assert!(encoded.starts_with(&encoded_from_len[..]));
+		let encoded_from = encoded.clone();
+
+		changeset.start_transaction();
+
+		for i in from..to {
+			changeset.append_storage(key.clone(), vec![i], Default::default, None);
+		}
+
+		// materialized
+		let encoded = changeset.get(&key).unwrap().value().unwrap();
+		let encoded_to_len = Compact(to as u32).encode();
+		assert_eq!(encoded_to_len.len(), 2);
+		assert!(encoded.starts_with(&encoded_to_len[..]));
+
+		changeset.rollback_transaction().unwrap();
+
+		let encoded = changeset.get(&key).unwrap().value().unwrap();
+		assert_eq!(&encoded_from, encoded);
+	}
+
+	/// First we have some `Set` operation with a valid SCALE list. Then we append data and rollback
+	/// afterwards.
+	#[test]
+	fn restore_initial_set_after_append_to_parent() {
+		use codec::{Compact, Encode};
+		let mut changeset = OverlayedChangeSet::default();
+		let key: Vec<u8> = b"akey".into();
+
+		let initial_data = vec![1u8; 50].encode();
+
+		changeset.set(key.clone(), Some(initial_data.clone()), None);
+
+		changeset.start_transaction();
+
+		// Append until we require 2 bytes for the length prefix.
+		for i in 0..50 {
+			changeset.append_storage(key.clone(), vec![i], Default::default, None);
+		}
+
+		// Materialize the value.
+		let encoded = changeset.get(&key).unwrap().value().unwrap();
+		let encoded_to_len = Compact(100u32).encode();
+		assert_eq!(encoded_to_len.len(), 2);
+		assert!(encoded.starts_with(&encoded_to_len[..]));
+
+		changeset.rollback_transaction().unwrap();
+
+		let encoded = changeset.get(&key).unwrap().value().unwrap();
+		assert_eq!(&initial_data, encoded);
+	}
 }
diff --git a/substrate/primitives/state-machine/src/overlayed_changes/mod.rs b/substrate/primitives/state-machine/src/overlayed_changes/mod.rs
index d6fc404e84f..c2dc637bc71 100644
--- a/substrate/primitives/state-machine/src/overlayed_changes/mod.rs
+++ b/substrate/primitives/state-machine/src/overlayed_changes/mod.rs
@@ -289,7 +289,7 @@ impl<H: Hasher> OverlayedChanges<H> {
 	/// Returns a double-Option: None if the key is unknown (i.e. and the query should be referred
 	/// to the backend); Some(None) if the key has been deleted. Some(Some(...)) for a key whose
 	/// value has been set.
-	pub fn storage(&self, key: &[u8]) -> Option<Option<&[u8]>> {
+	pub fn storage(&mut self, key: &[u8]) -> Option<Option<&[u8]>> {
 		self.top.get(key).map(|x| {
 			let value = x.value();
 			let size_read = value.map(|x| x.len() as u64).unwrap_or(0);
@@ -304,30 +304,11 @@ impl<H: Hasher> OverlayedChanges<H> {
 		self.storage_transaction_cache = None;
 	}
 
-	/// Returns mutable reference to current value.
-	/// If there is no value in the overlay, the given callback is used to initiate the value.
-	/// Warning this function registers a change, so the mutable reference MUST be modified.
-	///
-	/// Can be rolled back or committed when called inside a transaction.
-	#[must_use = "A change was registered, so this value MUST be modified."]
-	pub fn value_mut_or_insert_with(
-		&mut self,
-		key: &[u8],
-		init: impl Fn() -> StorageValue,
-	) -> &mut StorageValue {
-		self.mark_dirty();
-
-		let value = self.top.modify(key.to_vec(), init, self.extrinsic_index());
-
-		// if the value was deleted initialise it back with an empty vec
-		value.get_or_insert_with(StorageValue::default)
-	}
-
 	/// Returns a double-Option: None if the key is unknown (i.e. and the query should be referred
 	/// to the backend); Some(None) if the key has been deleted. Some(Some(...)) for a key whose
 	/// value has been set.
-	pub fn child_storage(&self, child_info: &ChildInfo, key: &[u8]) -> Option<Option<&[u8]>> {
-		let map = self.children.get(child_info.storage_key())?;
+	pub fn child_storage(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<Option<&[u8]>> {
+		let map = self.children.get_mut(child_info.storage_key())?;
 		let value = map.0.get(key)?.value();
 		let size_read = value.map(|x| x.len() as u64).unwrap_or(0);
 		self.stats.tally_read_modified(size_read);
@@ -342,7 +323,21 @@ impl<H: Hasher> OverlayedChanges<H> {
 
 		let size_write = val.as_ref().map(|x| x.len() as u64).unwrap_or(0);
 		self.stats.tally_write_overlay(size_write);
-		self.top.set(key, val, self.extrinsic_index());
+		let extrinsic_index = self.extrinsic_index();
+		self.top.set(key, val, extrinsic_index);
+	}
+
+	/// Append a element to storage, init with existing value if first write.
+	pub fn append_storage(
+		&mut self,
+		key: StorageKey,
+		element: StorageValue,
+		init: impl Fn() -> StorageValue,
+	) {
+		let extrinsic_index = self.extrinsic_index();
+		let size_write = element.len() as u64;
+		self.stats.tally_write_overlay(size_write);
+		self.top.append_storage(key, element, init, extrinsic_index);
 	}
 
 	/// Set a new value for the specified key and child.
@@ -396,7 +391,8 @@ impl<H: Hasher> OverlayedChanges<H> {
 	pub fn clear_prefix(&mut self, prefix: &[u8]) -> u32 {
 		self.mark_dirty();
 
-		self.top.clear_where(|key, _| key.starts_with(prefix), self.extrinsic_index())
+		let extrinsic_index = self.extrinsic_index();
+		self.top.clear_where(|key, _| key.starts_with(prefix), extrinsic_index)
 	}
 
 	/// Removes all key-value pairs which keys share the given prefix.
@@ -457,7 +453,7 @@ impl<H: Hasher> OverlayedChanges<H> {
 		});
 		self.offchain
 			.overlay_mut()
-			.rollback_transaction()
+			.rollback_transaction_offchain()
 			.expect("Top and offchain changesets are started in lockstep; qed");
 		Ok(())
 	}
@@ -475,7 +471,7 @@ impl<H: Hasher> OverlayedChanges<H> {
 		}
 		self.offchain
 			.overlay_mut()
-			.commit_transaction()
+			.commit_transaction_offchain()
 			.expect("Top and offchain changesets are started in lockstep; qed");
 		Ok(())
 	}
@@ -511,7 +507,7 @@ impl<H: Hasher> OverlayedChanges<H> {
 		}
 		self.offchain
 			.overlay_mut()
-			.exit_runtime()
+			.exit_runtime_offchain()
 			.expect("Top and offchain changesets are started in lockstep; qed");
 		Ok(())
 	}
@@ -535,11 +531,24 @@ impl<H: Hasher> OverlayedChanges<H> {
 		self.children.values().map(|v| (v.0.changes(), &v.1))
 	}
 
+	/// Get an iterator over all child changes as seen by the current transaction.
+	pub fn children_mut(
+		&mut self,
+	) -> impl Iterator<Item = (impl Iterator<Item = (&StorageKey, &mut OverlayedValue)>, &ChildInfo)>
+	{
+		self.children.values_mut().map(|v| (v.0.changes_mut(), &v.1))
+	}
+
 	/// Get an iterator over all top changes as been by the current transaction.
 	pub fn changes(&self) -> impl Iterator<Item = (&StorageKey, &OverlayedValue)> {
 		self.top.changes()
 	}
 
+	/// Get an iterator over all top changes as been by the current transaction.
+	pub fn changes_mut(&mut self) -> impl Iterator<Item = (&StorageKey, &mut OverlayedValue)> {
+		self.top.changes_mut()
+	}
+
 	/// Get an optional iterator over all child changes stored under the supplied key.
 	pub fn child_changes(
 		&self,
@@ -548,6 +557,16 @@ impl<H: Hasher> OverlayedChanges<H> {
 		self.children.get(key).map(|(overlay, info)| (overlay.changes(), info))
 	}
 
+	/// Get an optional iterator over all child changes stored under the supplied key.
+	pub fn child_changes_mut(
+		&mut self,
+		key: &[u8],
+	) -> Option<(impl Iterator<Item = (&StorageKey, &mut OverlayedValue)>, &ChildInfo)> {
+		self.children
+			.get_mut(key)
+			.map(|(overlay, info)| (overlay.changes_mut(), &*info))
+	}
+
 	/// Get an list of all index operations.
 	pub fn transaction_index_ops(&self) -> &[IndexOperation] {
 		&self.transaction_index_ops
@@ -575,11 +594,12 @@ impl<H: Hasher> OverlayedChanges<H> {
 		};
 
 		use core::mem::take;
-		let main_storage_changes = take(&mut self.top).drain_committed();
-		let child_storage_changes = take(&mut self.children)
-			.into_iter()
-			.map(|(key, (val, info))| (key, (val.drain_committed(), info)));
-
+		let main_storage_changes =
+			take(&mut self.top).drain_committed().map(|(k, v)| (k, v.to_option()));
+		let child_storage_changes =
+			take(&mut self.children).into_iter().map(|(key, (val, info))| {
+				(key, (val.drain_committed().map(|(k, v)| (k, v.to_option())), info))
+			});
 		let offchain_storage_changes = self.offchain_drain_committed().collect();
 
 		#[cfg(feature = "std")]
@@ -610,7 +630,7 @@ impl<H: Hasher> OverlayedChanges<H> {
 	/// set this index before first and unset after last extrinsic is executed.
 	/// Changes that are made outside of extrinsics, are marked with
 	/// `NO_EXTRINSIC_INDEX` index.
-	fn extrinsic_index(&self) -> Option<u32> {
+	fn extrinsic_index(&mut self) -> Option<u32> {
 		self.collect_extrinsics.then(|| {
 			self.storage(EXTRINSIC_INDEX)
 				.and_then(|idx| idx.and_then(|idx| Decode::decode(&mut &*idx).ok()))
@@ -634,10 +654,12 @@ impl<H: Hasher> OverlayedChanges<H> {
 			return (cache.transaction_storage_root, true)
 		}
 
-		let delta = self.changes().map(|(k, v)| (&k[..], v.value().map(|v| &v[..])));
-		let child_delta = self.children().map(|(changes, info)| {
-			(info, changes.map(|(k, v)| (&k[..], v.value().map(|v| &v[..]))))
-		});
+		let delta = self.top.changes_mut().map(|(k, v)| (&k[..], v.value().map(|v| &v[..])));
+
+		let child_delta = self
+			.children
+			.values_mut()
+			.map(|v| (&v.1, v.0.changes_mut().map(|(k, v)| (&k[..], v.value().map(|v| &v[..])))));
 
 		let (root, transaction) = backend.full_storage_root(delta, child_delta, state_version);
 
@@ -677,7 +699,7 @@ impl<H: Hasher> OverlayedChanges<H> {
 			return Ok((root, true))
 		}
 
-		let root = if let Some((changes, info)) = self.child_changes(storage_key) {
+		let root = if let Some((changes, info)) = self.child_changes_mut(storage_key) {
 			let delta = changes.map(|(k, v)| (k.as_ref(), v.value().map(AsRef::as_ref)));
 			Some(backend.child_storage_root(info, delta, state_version))
 		} else {
@@ -711,19 +733,19 @@ impl<H: Hasher> OverlayedChanges<H> {
 
 	/// Returns an iterator over the keys (in lexicographic order) following `key` (excluding `key`)
 	/// alongside its value.
-	pub fn iter_after(&self, key: &[u8]) -> impl Iterator<Item = (&[u8], &OverlayedValue)> {
+	pub fn iter_after(&mut self, key: &[u8]) -> impl Iterator<Item = (&[u8], &mut OverlayedValue)> {
 		self.top.changes_after(key)
 	}
 
 	/// Returns an iterator over the keys (in lexicographic order) following `key` (excluding `key`)
 	/// alongside its value for the given `storage_key` child.
 	pub fn child_iter_after(
-		&self,
+		&mut self,
 		storage_key: &[u8],
 		key: &[u8],
-	) -> impl Iterator<Item = (&[u8], &OverlayedValue)> {
+	) -> impl Iterator<Item = (&[u8], &mut OverlayedValue)> {
 		self.children
-			.get(storage_key)
+			.get_mut(storage_key)
 			.map(|(overlay, _)| overlay.changes_after(key))
 			.into_iter()
 			.flatten()
@@ -858,7 +880,11 @@ mod tests {
 	use sp_core::{traits::Externalities, Blake2Hasher};
 	use std::collections::BTreeMap;
 
-	fn assert_extrinsics(overlay: &OverlayedChangeSet, key: impl AsRef<[u8]>, expected: Vec<u32>) {
+	fn assert_extrinsics(
+		overlay: &mut OverlayedChangeSet,
+		key: impl AsRef<[u8]>,
+		expected: Vec<u32>,
+	) {
 		assert_eq!(
 			overlay.get(key.as_ref()).unwrap().extrinsics().into_iter().collect::<Vec<_>>(),
 			expected
@@ -1049,9 +1075,9 @@ mod tests {
 		overlay.set_extrinsic_index(2);
 		overlay.set_storage(vec![1], Some(vec![6]));
 
-		assert_extrinsics(&overlay.top, vec![1], vec![0, 2]);
-		assert_extrinsics(&overlay.top, vec![3], vec![1]);
-		assert_extrinsics(&overlay.top, vec![100], vec![NO_EXTRINSIC_INDEX]);
+		assert_extrinsics(&mut overlay.top, vec![1], vec![0, 2]);
+		assert_extrinsics(&mut overlay.top, vec![3], vec![1]);
+		assert_extrinsics(&mut overlay.top, vec![100], vec![NO_EXTRINSIC_INDEX]);
 
 		overlay.start_transaction();
 
@@ -1061,15 +1087,15 @@ mod tests {
 		overlay.set_extrinsic_index(4);
 		overlay.set_storage(vec![1], Some(vec![8]));
 
-		assert_extrinsics(&overlay.top, vec![1], vec![0, 2, 4]);
-		assert_extrinsics(&overlay.top, vec![3], vec![1, 3]);
-		assert_extrinsics(&overlay.top, vec![100], vec![NO_EXTRINSIC_INDEX]);
+		assert_extrinsics(&mut overlay.top, vec![1], vec![0, 2, 4]);
+		assert_extrinsics(&mut overlay.top, vec![3], vec![1, 3]);
+		assert_extrinsics(&mut overlay.top, vec![100], vec![NO_EXTRINSIC_INDEX]);
 
 		overlay.rollback_transaction().unwrap();
 
-		assert_extrinsics(&overlay.top, vec![1], vec![0, 2]);
-		assert_extrinsics(&overlay.top, vec![3], vec![1]);
-		assert_extrinsics(&overlay.top, vec![100], vec![NO_EXTRINSIC_INDEX]);
+		assert_extrinsics(&mut overlay.top, vec![1], vec![0, 2]);
+		assert_extrinsics(&mut overlay.top, vec![3], vec![1]);
+		assert_extrinsics(&mut overlay.top, vec![100], vec![NO_EXTRINSIC_INDEX]);
 	}
 
 	#[test]
diff --git a/substrate/primitives/state-machine/src/overlayed_changes/offchain.rs b/substrate/primitives/state-machine/src/overlayed_changes/offchain.rs
index 1e6965e8747..517a51b0269 100644
--- a/substrate/primitives/state-machine/src/overlayed_changes/offchain.rs
+++ b/substrate/primitives/state-machine/src/overlayed_changes/offchain.rs
@@ -42,7 +42,7 @@ impl OffchainOverlayedChanges {
 	}
 
 	/// Iterate over all key value pairs by reference.
-	pub fn iter(&self) -> impl Iterator<Item = OffchainOverlayedChangesItem> {
+	pub fn iter(&mut self) -> impl Iterator<Item = OffchainOverlayedChangesItem> {
 		self.0.changes().map(|kv| (kv.0, kv.1.value_ref()))
 	}
 
@@ -53,14 +53,16 @@ impl OffchainOverlayedChanges {
 
 	/// Remove a key and its associated value from the offchain database.
 	pub fn remove(&mut self, prefix: &[u8], key: &[u8]) {
-		let _ = self
-			.0
-			.set((prefix.to_vec(), key.to_vec()), OffchainOverlayedChange::Remove, None);
+		let _ = self.0.set_offchain(
+			(prefix.to_vec(), key.to_vec()),
+			OffchainOverlayedChange::Remove,
+			None,
+		);
 	}
 
 	/// Set the value associated with a key under a prefix to the value provided.
 	pub fn set(&mut self, prefix: &[u8], key: &[u8], value: &[u8]) {
-		let _ = self.0.set(
+		let _ = self.0.set_offchain(
 			(prefix.to_vec(), key.to_vec()),
 			OffchainOverlayedChange::SetValue(value.to_vec()),
 			None,
@@ -68,7 +70,7 @@ impl OffchainOverlayedChanges {
 	}
 
 	/// Obtain a associated value to the given key in storage with prefix.
-	pub fn get(&self, prefix: &[u8], key: &[u8]) -> Option<OffchainOverlayedChange> {
+	pub fn get(&mut self, prefix: &[u8], key: &[u8]) -> Option<OffchainOverlayedChange> {
 		let key = (prefix.to_vec(), key.to_vec());
 		self.0.get(&key).map(|entry| entry.value_ref()).cloned()
 	}
diff --git a/substrate/primitives/state-machine/src/read_only.rs b/substrate/primitives/state-machine/src/read_only.rs
index 2056bf98663..b78d17138b0 100644
--- a/substrate/primitives/state-machine/src/read_only.rs
+++ b/substrate/primitives/state-machine/src/read_only.rs
@@ -88,39 +88,39 @@ where
 		panic!("Should not be used in read-only externalities!")
 	}
 
-	fn storage(&self, key: &[u8]) -> Option<StorageValue> {
+	fn storage(&mut self, key: &[u8]) -> Option<StorageValue> {
 		self.backend
 			.storage(key)
 			.expect("Backed failed for storage in ReadOnlyExternalities")
 	}
 
-	fn storage_hash(&self, key: &[u8]) -> Option<Vec<u8>> {
+	fn storage_hash(&mut self, key: &[u8]) -> Option<Vec<u8>> {
 		self.backend
 			.storage_hash(key)
 			.expect("Backed failed for storage_hash in ReadOnlyExternalities")
 			.map(|h| h.encode())
 	}
 
-	fn child_storage(&self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageValue> {
+	fn child_storage(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageValue> {
 		self.backend
 			.child_storage(child_info, key)
 			.expect("Backed failed for child_storage in ReadOnlyExternalities")
 	}
 
-	fn child_storage_hash(&self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>> {
+	fn child_storage_hash(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>> {
 		self.backend
 			.child_storage_hash(child_info, key)
 			.expect("Backed failed for child_storage_hash in ReadOnlyExternalities")
 			.map(|h| h.encode())
 	}
 
-	fn next_storage_key(&self, key: &[u8]) -> Option<StorageKey> {
+	fn next_storage_key(&mut self, key: &[u8]) -> Option<StorageKey> {
 		self.backend
 			.next_storage_key(key)
 			.expect("Backed failed for next_storage_key in ReadOnlyExternalities")
 	}
 
-	fn next_child_storage_key(&self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageKey> {
+	fn next_child_storage_key(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageKey> {
 		self.backend
 			.next_child_storage_key(child_info, key)
 			.expect("Backed failed for next_child_storage_key in ReadOnlyExternalities")
diff --git a/substrate/primitives/state-machine/src/testing.rs b/substrate/primitives/state-machine/src/testing.rs
index e19ba95755c..e9d64a891e8 100644
--- a/substrate/primitives/state-machine/src/testing.rs
+++ b/substrate/primitives/state-machine/src/testing.rs
@@ -209,12 +209,15 @@ where
 	///
 	/// In contrast to [`commit_all`](Self::commit_all) this will not panic if there are open
 	/// transactions.
-	pub fn as_backend(&self) -> InMemoryBackend<H> {
-		let top: Vec<_> =
-			self.overlay.changes().map(|(k, v)| (k.clone(), v.value().cloned())).collect();
+	pub fn as_backend(&mut self) -> InMemoryBackend<H> {
+		let top: Vec<_> = self
+			.overlay
+			.changes_mut()
+			.map(|(k, v)| (k.clone(), v.value().cloned()))
+			.collect();
 		let mut transaction = vec![(None, top)];
 
-		for (child_changes, child_info) in self.overlay.children() {
+		for (child_changes, child_info) in self.overlay.children_mut() {
 			transaction.push((
 				Some(child_info.clone()),
 				child_changes.map(|(k, v)| (k.clone(), v.value().cloned())).collect(),
@@ -293,13 +296,14 @@ where
 	}
 }
 
-impl<H: Hasher> PartialEq for TestExternalities<H>
+impl<H> TestExternalities<H>
 where
+	H: Hasher,
 	H::Out: Ord + 'static + codec::Codec,
 {
 	/// This doesn't test if they are in the same state, only if they contains the
 	/// same data at this state
-	fn eq(&self, other: &TestExternalities<H>) -> bool {
+	pub fn eq(&mut self, other: &mut TestExternalities<H>) -> bool {
 		self.as_backend().eq(&other.as_backend())
 	}
 }
diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs
index 0ecb98f3134..44e5f467d89 100644
--- a/substrate/utils/frame/remote-externalities/src/lib.rs
+++ b/substrate/utils/frame/remote-externalities/src/lib.rs
@@ -1383,7 +1383,7 @@ mod remote_tests {
 		init_logger();
 
 		// create an ext with children keys
-		let child_ext = Builder::<Block>::new()
+		let mut child_ext = Builder::<Block>::new()
 			.mode(Mode::Online(OnlineConfig {
 				transport: endpoint().clone().into(),
 				pallets: vec!["Proxy".to_owned()],
@@ -1396,7 +1396,7 @@ mod remote_tests {
 			.unwrap();
 
 		// create an ext without children keys
-		let ext = Builder::<Block>::new()
+		let mut ext = Builder::<Block>::new()
 			.mode(Mode::Online(OnlineConfig {
 				transport: endpoint().clone().into(),
 				pallets: vec!["Proxy".to_owned()],
-- 
GitLab