diff --git a/Cargo.lock b/Cargo.lock
index d2b7a47f84c93c461e78770d1bda0f2a80a12099..fba768c653c6676f3b350bef14c680e7f6adec01 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -294,6 +294,9 @@ name = "arbitrary"
 version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7d5a26814d8dcb93b0e5a0ff3c6d80a8843bafb21b39e8e18a6f05471870e110"
+dependencies = [
+ "derive_arbitrary",
+]
 
 [[package]]
 name = "ark-bls12-377"
@@ -4729,6 +4732,17 @@ dependencies = [
  "syn 2.0.61",
 ]
 
+[[package]]
+name = "derive_arbitrary"
+version = "1.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611"
+dependencies = [
+ "proc-macro2 1.0.82",
+ "quote 1.0.35",
+ "syn 2.0.61",
+]
+
 [[package]]
 name = "derive_more"
 version = "0.99.17"
@@ -20296,6 +20310,7 @@ dependencies = [
 name = "sp-state-machine"
 version = "0.35.0"
 dependencies = [
+ "arbitrary",
  "array-bytes",
  "assert_matches",
  "hash-db",
diff --git a/polkadot/node/core/pvf/common/src/executor_interface.rs b/polkadot/node/core/pvf/common/src/executor_interface.rs
index 87491e70c5f2a0a06b22c8e2c902561d0745dbfd..47f9ed1604e781c045ea643d251da8e324c98287 100644
--- a/polkadot/node/core/pvf/common/src/executor_interface.rs
+++ b/polkadot/node/core/pvf/common/src/executor_interface.rs
@@ -215,19 +215,19 @@ type HostFunctions = (
 struct ValidationExternalities(sp_externalities::Extensions);
 
 impl sp_externalities::Externalities for ValidationExternalities {
-	fn storage(&self, _: &[u8]) -> Option<Vec<u8>> {
+	fn storage(&mut self, _: &[u8]) -> Option<Vec<u8>> {
 		panic!("storage: unsupported feature for parachain validation")
 	}
 
-	fn storage_hash(&self, _: &[u8]) -> Option<Vec<u8>> {
+	fn storage_hash(&mut self, _: &[u8]) -> Option<Vec<u8>> {
 		panic!("storage_hash: unsupported feature for parachain validation")
 	}
 
-	fn child_storage_hash(&self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
+	fn child_storage_hash(&mut self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
 		panic!("child_storage_hash: unsupported feature for parachain validation")
 	}
 
-	fn child_storage(&self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
+	fn child_storage(&mut self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
 		panic!("child_storage: unsupported feature for parachain validation")
 	}
 
@@ -275,11 +275,11 @@ impl sp_externalities::Externalities for ValidationExternalities {
 		panic!("child_storage_root: unsupported feature for parachain validation")
 	}
 
-	fn next_child_storage_key(&self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
+	fn next_child_storage_key(&mut self, _: &ChildInfo, _: &[u8]) -> Option<Vec<u8>> {
 		panic!("next_child_storage_key: unsupported feature for parachain validation")
 	}
 
-	fn next_storage_key(&self, _: &[u8]) -> Option<Vec<u8>> {
+	fn next_storage_key(&mut self, _: &[u8]) -> Option<Vec<u8>> {
 		panic!("next_storage_key: unsupported feature for parachain validation")
 	}
 
diff --git a/prdoc/pr_1223.prdoc b/prdoc/pr_1223.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..08b18557b70c6b649c49bb1dd62acc9528a79a86
--- /dev/null
+++ b/prdoc/pr_1223.prdoc
@@ -0,0 +1,13 @@
+title: Optimize storage append operation
+
+doc:
+  - audience: [Node Dev, Runtime Dev]
+    description: |
+      This pull request optimizes the storage append operation in the `OverlayedChanges`.
+      Before the internal buffer was cloned every time a new transaction was created. Cloning
+      the internal buffer is now only done when there is no other possibility. This should
+      improve the performance in situations like when depositing events from batched calls.
+
+crates:
+  - name: sp-state-machine
+    bump: major
diff --git a/substrate/client/executor/src/integration_tests/mod.rs b/substrate/client/executor/src/integration_tests/mod.rs
index 7f91b3ffe7644e37ebee0bc035faafbf38e21148..5d94ec6dcd38628be7858b8477e7e4892a45b210 100644
--- a/substrate/client/executor/src/integration_tests/mod.rs
+++ b/substrate/client/executor/src/integration_tests/mod.rs
@@ -178,7 +178,7 @@ fn storage_should_work(wasm_method: WasmExecutionMethod) {
 		assert_eq!(output, b"all ok!".to_vec().encode());
 	}
 
-	let expected = TestExternalities::new(sp_core::storage::Storage {
+	let mut expected = TestExternalities::new(sp_core::storage::Storage {
 		top: map![
 			b"input".to_vec() => value,
 			b"foo".to_vec() => b"bar".to_vec(),
@@ -186,7 +186,7 @@ fn storage_should_work(wasm_method: WasmExecutionMethod) {
 		],
 		children_default: map![],
 	});
-	assert_eq!(ext, expected);
+	assert!(ext.eq(&mut expected));
 }
 
 test_wasm_execution!(clear_prefix_should_work);
@@ -208,7 +208,7 @@ fn clear_prefix_should_work(wasm_method: WasmExecutionMethod) {
 		assert_eq!(output, b"all ok!".to_vec().encode());
 	}
 
-	let expected = TestExternalities::new(sp_core::storage::Storage {
+	let mut expected = TestExternalities::new(sp_core::storage::Storage {
 		top: map![
 			b"aaa".to_vec() => b"1".to_vec(),
 			b"aab".to_vec() => b"2".to_vec(),
@@ -216,7 +216,7 @@ fn clear_prefix_should_work(wasm_method: WasmExecutionMethod) {
 		],
 		children_default: map![],
 	});
-	assert_eq!(expected, ext);
+	assert!(expected.eq(&mut ext));
 }
 
 test_wasm_execution!(blake2_256_should_work);
diff --git a/substrate/primitives/externalities/src/lib.rs b/substrate/primitives/externalities/src/lib.rs
index 142200f614a69d8a5e195db8d434fba4bb5a83b7..bcc46ee4f1b2993fd34a5d632d2ce4a2757872e4 100644
--- a/substrate/primitives/externalities/src/lib.rs
+++ b/substrate/primitives/externalities/src/lib.rs
@@ -83,24 +83,24 @@ pub trait Externalities: ExtensionStore {
 	fn set_offchain_storage(&mut self, key: &[u8], value: Option<&[u8]>);
 
 	/// Read runtime storage.
-	fn storage(&self, key: &[u8]) -> Option<Vec<u8>>;
+	fn storage(&mut self, key: &[u8]) -> Option<Vec<u8>>;
 
 	/// Get storage value hash.
 	///
 	/// This may be optimized for large values.
-	fn storage_hash(&self, key: &[u8]) -> Option<Vec<u8>>;
+	fn storage_hash(&mut self, key: &[u8]) -> Option<Vec<u8>>;
 
 	/// Get child storage value hash.
 	///
 	/// This may be optimized for large values.
 	///
 	/// Returns an `Option` that holds the SCALE encoded hash.
-	fn child_storage_hash(&self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>>;
+	fn child_storage_hash(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>>;
 
 	/// Read child runtime storage.
 	///
 	/// Returns an `Option` that holds the SCALE encoded hash.
-	fn child_storage(&self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>>;
+	fn child_storage(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>>;
 
 	/// Set storage entry `key` of current contract being called (effective immediately).
 	fn set_storage(&mut self, key: Vec<u8>, value: Vec<u8>) {
@@ -124,20 +124,20 @@ pub trait Externalities: ExtensionStore {
 	}
 
 	/// Whether a storage entry exists.
-	fn exists_storage(&self, key: &[u8]) -> bool {
+	fn exists_storage(&mut self, key: &[u8]) -> bool {
 		self.storage(key).is_some()
 	}
 
 	/// Whether a child storage entry exists.
-	fn exists_child_storage(&self, child_info: &ChildInfo, key: &[u8]) -> bool {
+	fn exists_child_storage(&mut self, child_info: &ChildInfo, key: &[u8]) -> bool {
 		self.child_storage(child_info, key).is_some()
 	}
 
 	/// Returns the key immediately following the given key, if it exists.
-	fn next_storage_key(&self, key: &[u8]) -> Option<Vec<u8>>;
+	fn next_storage_key(&mut self, key: &[u8]) -> Option<Vec<u8>>;
 
 	/// Returns the key immediately following the given key, if it exists, in child storage.
-	fn next_child_storage_key(&self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>>;
+	fn next_child_storage_key(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>>;
 
 	/// Clear an entire child storage.
 	///
diff --git a/substrate/primitives/io/src/lib.rs b/substrate/primitives/io/src/lib.rs
index c8675a9a90bd2ee16b9deca27457fafa88f8fec0..8ef1f41ce0197763d1edcd4afcb5f60f48a5de2e 100644
--- a/substrate/primitives/io/src/lib.rs
+++ b/substrate/primitives/io/src/lib.rs
@@ -181,7 +181,7 @@ impl From<MultiRemovalResults> for KillStorageResult {
 #[runtime_interface]
 pub trait Storage {
 	/// Returns the data for `key` in the storage or `None` if the key can not be found.
-	fn get(&self, key: &[u8]) -> Option<bytes::Bytes> {
+	fn get(&mut self, key: &[u8]) -> Option<bytes::Bytes> {
 		self.storage(key).map(bytes::Bytes::from)
 	}
 
@@ -190,7 +190,7 @@ pub trait Storage {
 	/// doesn't exist at all.
 	/// If `value_out` length is smaller than the returned length, only `value_out` length bytes
 	/// are copied into `value_out`.
-	fn read(&self, key: &[u8], value_out: &mut [u8], value_offset: u32) -> Option<u32> {
+	fn read(&mut self, key: &[u8], value_out: &mut [u8], value_offset: u32) -> Option<u32> {
 		self.storage(key).map(|value| {
 			let value_offset = value_offset as usize;
 			let data = &value[value_offset.min(value.len())..];
@@ -211,7 +211,7 @@ pub trait Storage {
 	}
 
 	/// Check whether the given `key` exists in storage.
-	fn exists(&self, key: &[u8]) -> bool {
+	fn exists(&mut self, key: &[u8]) -> bool {
 		self.exists_storage(key)
 	}
 
@@ -387,7 +387,7 @@ pub trait DefaultChildStorage {
 	///
 	/// Parameter `storage_key` is the unprefixed location of the root of the child trie in the
 	/// parent trie. Result is `None` if the value for `key` in the child storage can not be found.
-	fn get(&self, storage_key: &[u8], key: &[u8]) -> Option<Vec<u8>> {
+	fn get(&mut self, storage_key: &[u8], key: &[u8]) -> Option<Vec<u8>> {
 		let child_info = ChildInfo::new_default(storage_key);
 		self.child_storage(&child_info, key).map(|s| s.to_vec())
 	}
@@ -400,7 +400,7 @@ pub trait DefaultChildStorage {
 	/// If `value_out` length is smaller than the returned length, only `value_out` length bytes
 	/// are copied into `value_out`.
 	fn read(
-		&self,
+		&mut self,
 		storage_key: &[u8],
 		key: &[u8],
 		value_out: &mut [u8],
@@ -478,7 +478,7 @@ pub trait DefaultChildStorage {
 	/// Check a child storage key.
 	///
 	/// Check whether the given `key` exists in default child defined at `storage_key`.
-	fn exists(&self, storage_key: &[u8], key: &[u8]) -> bool {
+	fn exists(&mut self, storage_key: &[u8], key: &[u8]) -> bool {
 		let child_info = ChildInfo::new_default(storage_key);
 		self.exists_child_storage(&child_info, key)
 	}
diff --git a/substrate/primitives/state-machine/Cargo.toml b/substrate/primitives/state-machine/Cargo.toml
index c383a17cb006e3c4433172aedde957465fbb569f..f6402eccf0df0c7f9161bd8daf9456b267702876 100644
--- a/substrate/primitives/state-machine/Cargo.toml
+++ b/substrate/primitives/state-machine/Cargo.toml
@@ -30,6 +30,7 @@ sp-externalities = { path = "../externalities", default-features = false }
 sp-panic-handler = { path = "../panic-handler", optional = true }
 sp-trie = { path = "../trie", default-features = false }
 trie-db = { version = "0.29.0", default-features = false }
+arbitrary = { version = "1", features = ["derive"], optional = true }
 
 [dev-dependencies]
 array-bytes = "6.2.2"
@@ -37,9 +38,11 @@ pretty_assertions = "1.2.1"
 rand = "0.8.5"
 sp-runtime = { path = "../runtime" }
 assert_matches = "1.5"
+arbitrary = { version = "1", features = ["derive"] }
 
 [features]
 default = ["std"]
+fuzzing = ["arbitrary"]
 std = [
 	"codec/std",
 	"hash-db/std",
diff --git a/substrate/primitives/state-machine/fuzz/Cargo.toml b/substrate/primitives/state-machine/fuzz/Cargo.toml
new file mode 100644
index 0000000000000000000000000000000000000000..416c00c34fda4e8e1853f2b8d414e40f7d1ff808
--- /dev/null
+++ b/substrate/primitives/state-machine/fuzz/Cargo.toml
@@ -0,0 +1,30 @@
+[package]
+name = "sp-state-machine-fuzz"
+version = "0.0.0"
+publish = false
+license = "Apache-2.0"
+edition = "2021"
+
+[package.metadata]
+cargo-fuzz = true
+
+[dependencies]
+libfuzzer-sys = "0.4"
+sp-runtime = { path = "../../runtime" }
+
+[dependencies.sp-state-machine]
+path = ".."
+features = ["fuzzing"]
+
+# Prevent this from interfering with workspaces
+[workspace]
+members = ["."]
+
+[profile.release]
+debug = 1
+
+[[bin]]
+name = "fuzz_append"
+path = "fuzz_targets/fuzz_append.rs"
+test = false
+doc = false
diff --git a/substrate/primitives/state-machine/fuzz/fuzz_targets/fuzz_append.rs b/substrate/primitives/state-machine/fuzz/fuzz_targets/fuzz_append.rs
new file mode 100644
index 0000000000000000000000000000000000000000..44847f535655f61a7b40bf2d8d08484abbdccddb
--- /dev/null
+++ b/substrate/primitives/state-machine/fuzz/fuzz_targets/fuzz_append.rs
@@ -0,0 +1,26 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#![no_main]
+
+use libfuzzer_sys::fuzz_target;
+use sp_state_machine::fuzzing::{fuzz_append, FuzzAppendPayload};
+use sp_runtime::traits::BlakeTwo256;
+
+fuzz_target!(|data: FuzzAppendPayload| {
+	fuzz_append::<BlakeTwo256>(data);
+});
diff --git a/substrate/primitives/state-machine/src/basic.rs b/substrate/primitives/state-machine/src/basic.rs
index 8b6f746eaba0af9438557dc7dbe118bfbc4cdf34..6201d60ababd27d1e212791a03e62b7915be2c98 100644
--- a/substrate/primitives/state-machine/src/basic.rs
+++ b/substrate/primitives/state-machine/src/basic.rs
@@ -59,16 +59,17 @@ impl BasicExternalities {
 	}
 
 	/// Consume self and returns inner storages
-	pub fn into_storages(self) -> Storage {
+	#[cfg(feature = "std")]
+	pub fn into_storages(mut self) -> Storage {
 		Storage {
 			top: self
 				.overlay
-				.changes()
+				.changes_mut()
 				.filter_map(|(k, v)| v.value().map(|v| (k.to_vec(), v.to_vec())))
 				.collect(),
 			children_default: self
 				.overlay
-				.children()
+				.children_mut()
 				.map(|(iter, i)| {
 					(
 						i.storage_key().to_vec(),
@@ -87,6 +88,7 @@ impl BasicExternalities {
 	/// Execute the given closure `f` with the externalities set and initialized with `storage`.
 	///
 	/// Returns the result of the closure and updates `storage` with all changes.
+	#[cfg(feature = "std")]
 	pub fn execute_with_storage<R>(
 		storage: &mut sp_core::storage::Storage,
 		f: impl FnOnce() -> R,
@@ -118,19 +120,37 @@ impl BasicExternalities {
 	}
 }
 
+#[cfg(test)]
 impl PartialEq for BasicExternalities {
-	fn eq(&self, other: &BasicExternalities) -> bool {
-		self.overlay.changes().map(|(k, v)| (k, v.value())).collect::<BTreeMap<_, _>>() ==
-			other.overlay.changes().map(|(k, v)| (k, v.value())).collect::<BTreeMap<_, _>>() &&
+	fn eq(&self, other: &Self) -> bool {
+		self.overlay
+			.changes()
+			.map(|(k, v)| (k, v.value_ref().materialize()))
+			.collect::<BTreeMap<_, _>>() ==
+			other
+				.overlay
+				.changes()
+				.map(|(k, v)| (k, v.value_ref().materialize()))
+				.collect::<BTreeMap<_, _>>() &&
 			self.overlay
 				.children()
-				.map(|(iter, i)| (i, iter.map(|(k, v)| (k, v.value())).collect::<BTreeMap<_, _>>()))
+				.map(|(iter, i)| {
+					(
+						i,
+						iter.map(|(k, v)| (k, v.value_ref().materialize()))
+							.collect::<BTreeMap<_, _>>(),
+					)
+				})
 				.collect::<BTreeMap<_, _>>() ==
 				other
 					.overlay
 					.children()
 					.map(|(iter, i)| {
-						(i, iter.map(|(k, v)| (k, v.value())).collect::<BTreeMap<_, _>>())
+						(
+							i,
+							iter.map(|(k, v)| (k, v.value_ref().materialize()))
+								.collect::<BTreeMap<_, _>>(),
+						)
 					})
 					.collect::<BTreeMap<_, _>>()
 	}
@@ -159,27 +179,27 @@ impl From<BTreeMap<StorageKey, StorageValue>> for BasicExternalities {
 impl Externalities for BasicExternalities {
 	fn set_offchain_storage(&mut self, _key: &[u8], _value: Option<&[u8]>) {}
 
-	fn storage(&self, key: &[u8]) -> Option<StorageValue> {
+	fn storage(&mut self, key: &[u8]) -> Option<StorageValue> {
 		self.overlay.storage(key).and_then(|v| v.map(|v| v.to_vec()))
 	}
 
-	fn storage_hash(&self, key: &[u8]) -> Option<Vec<u8>> {
+	fn storage_hash(&mut self, key: &[u8]) -> Option<Vec<u8>> {
 		self.storage(key).map(|v| Blake2Hasher::hash(&v).encode())
 	}
 
-	fn child_storage(&self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageValue> {
+	fn child_storage(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageValue> {
 		self.overlay.child_storage(child_info, key).and_then(|v| v.map(|v| v.to_vec()))
 	}
 
-	fn child_storage_hash(&self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>> {
+	fn child_storage_hash(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>> {
 		self.child_storage(child_info, key).map(|v| Blake2Hasher::hash(&v).encode())
 	}
 
-	fn next_storage_key(&self, key: &[u8]) -> Option<StorageKey> {
+	fn next_storage_key(&mut self, key: &[u8]) -> Option<StorageKey> {
 		self.overlay.iter_after(key).find_map(|(k, v)| v.value().map(|_| k.to_vec()))
 	}
 
-	fn next_child_storage_key(&self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageKey> {
+	fn next_child_storage_key(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageKey> {
 		self.overlay
 			.child_iter_after(child_info.storage_key(), key)
 			.find_map(|(k, v)| v.value().map(|_| k.to_vec()))
@@ -243,15 +263,14 @@ impl Externalities for BasicExternalities {
 		MultiRemovalResults { maybe_cursor: None, backend: count, unique: count, loops: count }
 	}
 
-	fn storage_append(&mut self, key: Vec<u8>, value: Vec<u8>) {
-		let current_value = self.overlay.value_mut_or_insert_with(&key, || Default::default());
-		crate::ext::StorageAppend::new(current_value).append(value);
+	fn storage_append(&mut self, key: Vec<u8>, element: Vec<u8>) {
+		self.overlay.append_storage(key, element, Default::default);
 	}
 
 	fn storage_root(&mut self, state_version: StateVersion) -> Vec<u8> {
 		let mut top = self
 			.overlay
-			.changes()
+			.changes_mut()
 			.filter_map(|(k, v)| v.value().map(|v| (k.clone(), v.clone())))
 			.collect::<BTreeMap<_, _>>();
 		// Single child trie implementation currently allows using the same child
@@ -278,7 +297,7 @@ impl Externalities for BasicExternalities {
 		child_info: &ChildInfo,
 		state_version: StateVersion,
 	) -> Vec<u8> {
-		if let Some((data, child_info)) = self.overlay.child_changes(child_info.storage_key()) {
+		if let Some((data, child_info)) = self.overlay.child_changes_mut(child_info.storage_key()) {
 			let delta =
 				data.into_iter().map(|(k, v)| (k.as_ref(), v.value().map(|v| v.as_slice())));
 			crate::in_memory_backend::new_in_mem::<Blake2Hasher>()
diff --git a/substrate/primitives/state-machine/src/ext.rs b/substrate/primitives/state-machine/src/ext.rs
index 9aa32bc866cfab9e2db44959c93bc4fe0b11e120..7a79c4e8a1f1bbd51fbdb34f5b91d2427bdd2995 100644
--- a/substrate/primitives/state-machine/src/ext.rs
+++ b/substrate/primitives/state-machine/src/ext.rs
@@ -22,7 +22,7 @@ use crate::overlayed_changes::OverlayedExtensions;
 use crate::{
 	backend::Backend, IndexOperation, IterArgs, OverlayedChanges, StorageKey, StorageValue,
 };
-use codec::{Encode, EncodeAppend};
+use codec::{Compact, CompactLen, Decode, Encode};
 use hash_db::Hasher;
 #[cfg(feature = "std")]
 use sp_core::hexdisplay::HexDisplay;
@@ -31,8 +31,8 @@ use sp_core::storage::{
 };
 use sp_externalities::{Extension, ExtensionStore, Externalities, MultiRemovalResults};
 
-use crate::{log_error, trace, warn};
-use alloc::{boxed::Box, vec, vec::Vec};
+use crate::{trace, warn};
+use alloc::{boxed::Box, vec::Vec};
 use core::{
 	any::{Any, TypeId},
 	cmp::Ordering,
@@ -139,7 +139,7 @@ where
 	H::Out: Ord + 'static,
 	B: 'a + Backend<H>,
 {
-	pub fn storage_pairs(&self) -> Vec<(StorageKey, StorageValue)> {
+	pub fn storage_pairs(&mut self) -> Vec<(StorageKey, StorageValue)> {
 		use std::collections::HashMap;
 
 		self.backend
@@ -147,7 +147,7 @@ where
 			.expect("never fails in tests; qed.")
 			.map(|key_value| key_value.expect("never fails in tests; qed."))
 			.map(|(k, v)| (k, Some(v)))
-			.chain(self.overlay.changes().map(|(k, v)| (k.clone(), v.value().cloned())))
+			.chain(self.overlay.changes_mut().map(|(k, v)| (k.clone(), v.value().cloned())))
 			.collect::<HashMap<_, _>>()
 			.into_iter()
 			.filter_map(|(k, maybe_val)| maybe_val.map(|val| (k, val)))
@@ -165,7 +165,7 @@ where
 		self.overlay.set_offchain_storage(key, value)
 	}
 
-	fn storage(&self, key: &[u8]) -> Option<StorageValue> {
+	fn storage(&mut self, key: &[u8]) -> Option<StorageValue> {
 		let _guard = guard();
 		let result = self
 			.overlay
@@ -191,7 +191,7 @@ where
 		result
 	}
 
-	fn storage_hash(&self, key: &[u8]) -> Option<Vec<u8>> {
+	fn storage_hash(&mut self, key: &[u8]) -> Option<Vec<u8>> {
 		let _guard = guard();
 		let result = self
 			.overlay
@@ -209,7 +209,7 @@ where
 		result.map(|r| r.encode())
 	}
 
-	fn child_storage(&self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageValue> {
+	fn child_storage(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageValue> {
 		let _guard = guard();
 		let result = self
 			.overlay
@@ -231,7 +231,7 @@ where
 		result
 	}
 
-	fn child_storage_hash(&self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>> {
+	fn child_storage_hash(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>> {
 		let _guard = guard();
 		let result = self
 			.overlay
@@ -253,7 +253,7 @@ where
 		result.map(|r| r.encode())
 	}
 
-	fn exists_storage(&self, key: &[u8]) -> bool {
+	fn exists_storage(&mut self, key: &[u8]) -> bool {
 		let _guard = guard();
 		let result = match self.overlay.storage(key) {
 			Some(x) => x.is_some(),
@@ -271,7 +271,7 @@ where
 		result
 	}
 
-	fn exists_child_storage(&self, child_info: &ChildInfo, key: &[u8]) -> bool {
+	fn exists_child_storage(&mut self, child_info: &ChildInfo, key: &[u8]) -> bool {
 		let _guard = guard();
 
 		let result = match self.overlay.child_storage(child_info, key) {
@@ -293,7 +293,7 @@ where
 		result
 	}
 
-	fn next_storage_key(&self, key: &[u8]) -> Option<StorageKey> {
+	fn next_storage_key(&mut self, key: &[u8]) -> Option<StorageKey> {
 		let mut next_backend_key =
 			self.backend.next_storage_key(key).expect(EXT_NOT_ALLOWED_TO_FAIL);
 		let mut overlay_changes = self.overlay.iter_after(key).peekable();
@@ -331,7 +331,7 @@ where
 		}
 	}
 
-	fn next_child_storage_key(&self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageKey> {
+	fn next_child_storage_key(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageKey> {
 		let mut next_backend_key = self
 			.backend
 			.next_child_storage_key(child_info, key)
@@ -501,10 +501,9 @@ where
 		let _guard = guard();
 
 		let backend = &mut self.backend;
-		let current_value = self.overlay.value_mut_or_insert_with(&key, || {
+		self.overlay.append_storage(key.clone(), value, || {
 			backend.storage(&key).expect(EXT_NOT_ALLOWED_TO_FAIL).unwrap_or_default()
 		});
-		StorageAppend::new(current_value).append(value);
 	}
 
 	fn storage_root(&mut self, state_version: StateVersion) -> Vec<u8> {
@@ -731,10 +730,27 @@ impl<'a> StorageAppend<'a> {
 		Self(storage)
 	}
 
+	/// Extract the length of the list like data structure.
+	pub fn extract_length(&self) -> Option<u32> {
+		Compact::<u32>::decode(&mut &self.0[..]).map(|c| c.0).ok()
+	}
+
+	/// Replace the length in the encoded data.
+	///
+	/// If `old_length` is `None`, the previous length will be assumed to be `0`.
+	pub fn replace_length(&mut self, old_length: Option<u32>, new_length: u32) {
+		let old_len_encoded_len = old_length.map(|l| Compact::<u32>::compact_len(&l)).unwrap_or(0);
+		let new_len_encoded = Compact::<u32>(new_length).encode();
+		self.0.splice(0..old_len_encoded_len, new_len_encoded);
+	}
+
 	/// Append the given `value` to the storage item.
 	///
-	/// If appending fails, `[value]` is stored in the storage item.
-	pub fn append(&mut self, value: Vec<u8>) {
+	/// If appending fails, `[value]` is stored in the storage item and we return false.
+	#[cfg(any(test, feature = "fuzzing"))]
+	pub fn append(&mut self, value: Vec<u8>) -> bool {
+		use codec::EncodeAppend;
+		let mut result = true;
 		let value = vec![EncodeOpaqueValue(value)];
 
 		let item = core::mem::take(self.0);
@@ -742,13 +758,20 @@ impl<'a> StorageAppend<'a> {
 		*self.0 = match Vec::<EncodeOpaqueValue>::append_or_new(item, &value) {
 			Ok(item) => item,
 			Err(_) => {
-				log_error!(
+				result = false;
+				crate::log_error!(
 					target: "runtime",
 					"Failed to append value, resetting storage item to `[value]`.",
 				);
 				value.encode()
 			},
 		};
+		result
+	}
+
+	/// Append to current buffer, do not touch the prefixed length.
+	pub fn append_raw(&mut self, mut value: Vec<u8>) {
+		self.0.append(&mut value)
 	}
 }
 
@@ -849,7 +872,7 @@ mod tests {
 		)
 			.into();
 
-		let ext = TestExt::new(&mut overlay, &backend, None);
+		let mut ext = TestExt::new(&mut overlay, &backend, None);
 
 		// next_backend < next_overlay
 		assert_eq!(ext.next_storage_key(&[5]), Some(vec![10]));
@@ -865,7 +888,7 @@ mod tests {
 
 		drop(ext);
 		overlay.set_storage(vec![50], Some(vec![50]));
-		let ext = TestExt::new(&mut overlay, &backend, None);
+		let mut ext = TestExt::new(&mut overlay, &backend, None);
 
 		// next_overlay exist but next_backend doesn't exist
 		assert_eq!(ext.next_storage_key(&[40]), Some(vec![50]));
@@ -895,7 +918,7 @@ mod tests {
 		)
 			.into();
 
-		let ext = TestExt::new(&mut overlay, &backend, None);
+		let mut ext = TestExt::new(&mut overlay, &backend, None);
 
 		assert_eq!(ext.next_storage_key(&[5]), Some(vec![30]));
 
@@ -928,7 +951,7 @@ mod tests {
 		)
 			.into();
 
-		let ext = TestExt::new(&mut overlay, &backend, None);
+		let mut ext = TestExt::new(&mut overlay, &backend, None);
 
 		// next_backend < next_overlay
 		assert_eq!(ext.next_child_storage_key(child_info, &[5]), Some(vec![10]));
@@ -944,7 +967,7 @@ mod tests {
 
 		drop(ext);
 		overlay.set_child_storage(child_info, vec![50], Some(vec![50]));
-		let ext = TestExt::new(&mut overlay, &backend, None);
+		let mut ext = TestExt::new(&mut overlay, &backend, None);
 
 		// next_overlay exist but next_backend doesn't exist
 		assert_eq!(ext.next_child_storage_key(child_info, &[40]), Some(vec![50]));
@@ -975,7 +998,7 @@ mod tests {
 		)
 			.into();
 
-		let ext = TestExt::new(&mut overlay, &backend, None);
+		let mut ext = TestExt::new(&mut overlay, &backend, None);
 
 		assert_eq!(ext.child_storage(child_info, &[10]), Some(vec![10]));
 		assert_eq!(
diff --git a/substrate/primitives/state-machine/src/fuzzing.rs b/substrate/primitives/state-machine/src/fuzzing.rs
new file mode 100644
index 0000000000000000000000000000000000000000..e147e6e88003cf720d4f6062c392630602210388
--- /dev/null
+++ b/substrate/primitives/state-machine/src/fuzzing.rs
@@ -0,0 +1,319 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// 	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! State machine fuzzing implementation, behind `fuzzing` feature.
+
+use super::{ext::Ext, *};
+use crate::ext::StorageAppend;
+use arbitrary::Arbitrary;
+#[cfg(test)]
+use codec::Encode;
+use hash_db::Hasher;
+use sp_core::{storage::StateVersion, traits::Externalities};
+#[cfg(test)]
+use sp_runtime::traits::BlakeTwo256;
+use sp_trie::PrefixedMemoryDB;
+use std::collections::BTreeMap;
+
+#[derive(Arbitrary, Debug, Clone)]
+enum DataLength {
+	Zero = 0,
+	Small = 1,
+	Medium = 3,
+	Big = 300, // 2 byte scale encode length
+}
+
+#[derive(Arbitrary, Debug, Clone)]
+#[repr(u8)]
+enum DataValue {
+	A = b'a',
+	B = b'b',
+	C = b'c',
+	D = b'd',       // This can be read as a multiple byte compact length.
+	EasyBug = 20u8, // value compact len.
+}
+
+/// Action to fuzz
+#[derive(Arbitrary, Debug, Clone)]
+enum FuzzAppendItem {
+	Append(DataValue, DataLength),
+	Insert(DataValue, DataLength),
+	StartTransaction,
+	RollbackTransaction,
+	CommitTransaction,
+	Read,
+	Remove,
+	// To go over 256 items easily (different compact size then).
+	Append50(DataValue, DataLength),
+}
+
+/// Arbitrary payload for fuzzing append.
+#[derive(Arbitrary, Debug, Clone)]
+pub struct FuzzAppendPayload(Vec<FuzzAppendItem>, Option<(DataValue, DataLength)>);
+
+struct SimpleOverlay {
+	data: Vec<BTreeMap<Vec<u8>, Option<Vec<u8>>>>,
+}
+
+impl Default for SimpleOverlay {
+	fn default() -> Self {
+		Self { data: vec![BTreeMap::new()] }
+	}
+}
+
+impl SimpleOverlay {
+	fn insert(&mut self, key: Vec<u8>, value: Option<Vec<u8>>) {
+		self.data.last_mut().expect("always at least one item").insert(key, value);
+	}
+
+	fn append<H>(
+		&mut self,
+		key: Vec<u8>,
+		value: Vec<u8>,
+		backend: &mut TrieBackend<PrefixedMemoryDB<H>, H>,
+	) where
+		H: Hasher,
+		H::Out: codec::Decode + codec::Encode + 'static,
+	{
+		let current_value = self
+			.data
+			.last_mut()
+			.expect("always at least one item")
+			.entry(key.clone())
+			.or_insert_with(|| {
+				Some(backend.storage(&key).expect("Ext not allowed to fail").unwrap_or_default())
+			});
+		if current_value.is_none() {
+			*current_value = Some(vec![]);
+		}
+		StorageAppend::new(current_value.as_mut().expect("init above")).append(value);
+	}
+
+	fn get(&mut self, key: &[u8]) -> Option<&Vec<u8>> {
+		self.data
+			.last_mut()
+			.expect("always at least one item")
+			.get(key)
+			.and_then(|o| o.as_ref())
+	}
+
+	fn commit_transaction(&mut self) {
+		if let Some(to_commit) = self.data.pop() {
+			let dest = self.data.last_mut().expect("always at least one item");
+			for (k, v) in to_commit.into_iter() {
+				dest.insert(k, v);
+			}
+		}
+	}
+
+	fn rollback_transaction(&mut self) {
+		let _ = self.data.pop();
+	}
+
+	fn start_transaction(&mut self) {
+		let cloned = self.data.last().expect("always at least one item").clone();
+		self.data.push(cloned);
+	}
+}
+
+struct FuzzAppendState<H: Hasher> {
+	key: Vec<u8>,
+
+	// reference simple implementation
+	reference: SimpleOverlay,
+
+	// trie backend
+	backend: TrieBackend<PrefixedMemoryDB<H>, H>,
+	// Standard Overlay
+	overlay: OverlayedChanges<H>,
+
+	// block dropping/commiting too many transaction
+	transaction_depth: usize,
+}
+
+impl<H> FuzzAppendState<H>
+where
+	H: Hasher,
+	H::Out: codec::Decode + codec::Encode + 'static,
+{
+	fn process_item(&mut self, item: FuzzAppendItem) {
+		let mut ext = Ext::new(&mut self.overlay, &mut self.backend, None);
+		match item {
+			FuzzAppendItem::Append(value, length) => {
+				let value = vec![value as u8; length as usize];
+				ext.storage_append(self.key.clone(), value.clone());
+				self.reference.append(self.key.clone(), value, &mut self.backend);
+			},
+			FuzzAppendItem::Append50(value, length) => {
+				let value = vec![value as u8; length as usize];
+				for _ in 0..50 {
+					let mut ext = Ext::new(&mut self.overlay, &mut self.backend, None);
+					ext.storage_append(self.key.clone(), value.clone());
+					self.reference.append(self.key.clone(), value.clone(), &mut self.backend);
+				}
+			},
+			FuzzAppendItem::Insert(value, length) => {
+				let value = vec![value as u8; length as usize];
+				ext.set_storage(self.key.clone(), value.clone());
+				self.reference.insert(self.key.clone(), Some(value));
+			},
+			FuzzAppendItem::Remove => {
+				ext.clear_storage(&self.key);
+				self.reference.insert(self.key.clone(), None);
+			},
+			FuzzAppendItem::Read => {
+				let left = ext.storage(self.key.as_slice());
+				let right = self.reference.get(self.key.as_slice());
+				assert_eq!(left.as_ref(), right);
+			},
+			FuzzAppendItem::StartTransaction => {
+				self.transaction_depth += 1;
+				self.reference.start_transaction();
+				ext.storage_start_transaction();
+			},
+			FuzzAppendItem::RollbackTransaction => {
+				if self.transaction_depth == 0 {
+					return
+				}
+				self.transaction_depth -= 1;
+				self.reference.rollback_transaction();
+				ext.storage_rollback_transaction().unwrap();
+			},
+			FuzzAppendItem::CommitTransaction => {
+				if self.transaction_depth == 0 {
+					return
+				}
+				self.transaction_depth -= 1;
+				self.reference.commit_transaction();
+				ext.storage_commit_transaction().unwrap();
+			},
+		}
+	}
+
+	fn check_final_state(&mut self) {
+		let mut ext = Ext::new(&mut self.overlay, &mut self.backend, None);
+		let left = ext.storage(self.key.as_slice());
+		let right = self.reference.get(self.key.as_slice());
+		assert_eq!(left.as_ref(), right);
+	}
+}
+
+#[test]
+fn fuzz_scenarii() {
+	assert_eq!(codec::Compact(5u16).encode()[0], DataValue::EasyBug as u8);
+	let scenarii = vec![
+		(
+			vec![
+				FuzzAppendItem::Append(DataValue::A, DataLength::Small),
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Append50(DataValue::D, DataLength::Small),
+				FuzzAppendItem::Read,
+				FuzzAppendItem::RollbackTransaction,
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Append(DataValue::D, DataLength::Small),
+				FuzzAppendItem::Read,
+				FuzzAppendItem::RollbackTransaction,
+			],
+			Some((DataValue::D, DataLength::Small)),
+		),
+		(
+			vec![
+				FuzzAppendItem::Append(DataValue::B, DataLength::Small),
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Append(DataValue::A, DataLength::Small),
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Remove,
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Append(DataValue::A, DataLength::Zero),
+				FuzzAppendItem::CommitTransaction,
+				FuzzAppendItem::CommitTransaction,
+				FuzzAppendItem::Remove,
+			],
+			Some((DataValue::EasyBug, DataLength::Small)),
+		),
+		(
+			vec![
+				FuzzAppendItem::Append(DataValue::A, DataLength::Small),
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Append(DataValue::A, DataLength::Medium),
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Remove,
+				FuzzAppendItem::CommitTransaction,
+				FuzzAppendItem::RollbackTransaction,
+			],
+			Some((DataValue::B, DataLength::Big)),
+		),
+		(
+			vec![
+				FuzzAppendItem::Append(DataValue::A, DataLength::Big),
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Append(DataValue::A, DataLength::Medium),
+				FuzzAppendItem::Remove,
+				FuzzAppendItem::RollbackTransaction,
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::Append(DataValue::A, DataLength::Zero),
+			],
+			None,
+		),
+		(
+			vec![
+				FuzzAppendItem::StartTransaction,
+				FuzzAppendItem::RollbackTransaction,
+				FuzzAppendItem::RollbackTransaction,
+				FuzzAppendItem::Append(DataValue::A, DataLength::Zero),
+			],
+			None,
+		),
+		(vec![FuzzAppendItem::StartTransaction], Some((DataValue::EasyBug, DataLength::Zero))),
+	];
+
+	for (scenario, init) in scenarii.into_iter() {
+		fuzz_append::<BlakeTwo256>(FuzzAppendPayload(scenario, init));
+	}
+}
+
+/// Test append operation for a given fuzzing payload.
+pub fn fuzz_append<H>(payload: FuzzAppendPayload)
+where
+	H: Hasher,
+	H::Out: codec::Decode + codec::Encode + 'static,
+{
+	let FuzzAppendPayload(to_fuzz, initial) = payload;
+	let key = b"k".to_vec();
+	let mut reference = SimpleOverlay::default();
+	let initial: BTreeMap<_, _> = initial
+		.into_iter()
+		.map(|(v, l)| (key.clone(), vec![v as u8; l as usize]))
+		.collect();
+	for (k, v) in initial.iter() {
+		reference.data[0].insert(k.clone(), Some(v.clone()));
+	}
+	reference.start_transaction(); // level 0 is backend, keep it untouched.
+	let overlay = OverlayedChanges::default();
+
+	let mut state = FuzzAppendState::<H> {
+		key,
+		reference,
+		overlay,
+		backend: (initial, StateVersion::default()).into(),
+		transaction_depth: 0,
+	};
+	for item in to_fuzz {
+		state.process_item(item);
+	}
+	state.check_final_state();
+}
diff --git a/substrate/primitives/state-machine/src/in_memory_backend.rs b/substrate/primitives/state-machine/src/in_memory_backend.rs
index 06fe6d4162a7f4c920c8f09c2e4e8b05a3c72c03..7ba7457a6bf18bfb17d3faa7c6d3aaea6cd36788 100644
--- a/substrate/primitives/state-machine/src/in_memory_backend.rs
+++ b/substrate/primitives/state-machine/src/in_memory_backend.rs
@@ -132,6 +132,7 @@ where
 	}
 }
 
+#[cfg(feature = "std")]
 impl<H: Hasher> From<(Storage, StateVersion)> for TrieBackend<PrefixedMemoryDB<H>, H>
 where
 	H::Out: Codec + Ord,
diff --git a/substrate/primitives/state-machine/src/lib.rs b/substrate/primitives/state-machine/src/lib.rs
index 13087431d387b9e986519bbdcabfe9eb3c4fe5c5..289b08755f680605a28a16adabac3cb3698a83fc 100644
--- a/substrate/primitives/state-machine/src/lib.rs
+++ b/substrate/primitives/state-machine/src/lib.rs
@@ -27,6 +27,8 @@ pub mod backend;
 mod basic;
 mod error;
 mod ext;
+#[cfg(feature = "fuzzing")]
+pub mod fuzzing;
 #[cfg(feature = "std")]
 mod in_memory_backend;
 pub(crate) mod overlayed_changes;
@@ -1273,7 +1275,7 @@ mod tests {
 
 		assert_eq!(
 			overlay
-				.changes()
+				.changes_mut()
 				.map(|(k, v)| (k.clone(), v.value().cloned()))
 				.collect::<HashMap<_, _>>(),
 			map![
@@ -1299,7 +1301,7 @@ mod tests {
 
 		assert_eq!(
 			overlay
-				.changes()
+				.changes_mut()
 				.map(|(k, v)| (k.clone(), v.value().cloned()))
 				.collect::<HashMap<_, _>>(),
 			map![
@@ -1340,7 +1342,7 @@ mod tests {
 
 		assert_eq!(
 			overlay
-				.children()
+				.children_mut()
 				.flat_map(|(iter, _child_info)| iter)
 				.map(|(k, v)| (k.clone(), v.value()))
 				.collect::<BTreeMap<_, _>>(),
@@ -1440,11 +1442,78 @@ mod tests {
 		}
 		overlay.rollback_transaction().unwrap();
 		{
-			let ext = Ext::new(&mut overlay, backend, None);
+			let mut ext = Ext::new(&mut overlay, backend, None);
 			assert_eq!(ext.storage(key.as_slice()), Some(vec![reference_data[0].clone()].encode()));
 		}
 	}
 
+	// Test that we can append twice to a key, then perform a remove operation.
+	// The test checks specifically that the append is merged with its parent transaction
+	// on commit.
+	#[test]
+	fn commit_merges_append_with_parent() {
+		#[derive(codec::Encode, codec::Decode)]
+		enum Item {
+			Item1,
+			Item2,
+		}
+
+		let key = b"events".to_vec();
+		let state = new_in_mem::<BlakeTwo256>();
+		let backend = state.as_trie_backend();
+		let mut overlay = OverlayedChanges::default();
+
+		// Append first item
+		overlay.start_transaction();
+		{
+			let mut ext = Ext::new(&mut overlay, backend, None);
+			ext.clear_storage(key.as_slice());
+			ext.storage_append(key.clone(), Item::Item1.encode());
+		}
+
+		// Append second item
+		overlay.start_transaction();
+		{
+			let mut ext = Ext::new(&mut overlay, backend, None);
+
+			assert_eq!(ext.storage(key.as_slice()), Some(vec![Item::Item1].encode()));
+
+			ext.storage_append(key.clone(), Item::Item2.encode());
+
+			assert_eq!(ext.storage(key.as_slice()), Some(vec![Item::Item1, Item::Item2].encode()),);
+		}
+
+		// Remove item
+		overlay.start_transaction();
+		{
+			let mut ext = Ext::new(&mut overlay, backend, None);
+
+			ext.place_storage(key.clone(), None);
+
+			assert_eq!(ext.storage(key.as_slice()), None);
+		}
+
+		// Remove gets commited and merged into previous transaction
+		overlay.commit_transaction().unwrap();
+		{
+			let mut ext = Ext::new(&mut overlay, backend, None);
+			assert_eq!(ext.storage(key.as_slice()), None,);
+		}
+
+		// Remove gets rolled back, we should see the initial append again.
+		overlay.rollback_transaction().unwrap();
+		{
+			let mut ext = Ext::new(&mut overlay, backend, None);
+			assert_eq!(ext.storage(key.as_slice()), Some(vec![Item::Item1].encode()));
+		}
+
+		overlay.commit_transaction().unwrap();
+		{
+			let mut ext = Ext::new(&mut overlay, backend, None);
+			assert_eq!(ext.storage(key.as_slice()), Some(vec![Item::Item1].encode()));
+		}
+	}
+
 	#[test]
 	fn remove_with_append_then_rollback_appended_then_append_again() {
 		#[derive(codec::Encode, codec::Decode)]
@@ -1499,7 +1568,7 @@ mod tests {
 
 		// Then only initialization item and second (committed) item should persist.
 		{
-			let ext = Ext::new(&mut overlay, backend, None);
+			let mut ext = Ext::new(&mut overlay, backend, None);
 			assert_eq!(
 				ext.storage(key.as_slice()),
 				Some(vec![Item::InitializationItem, Item::CommittedItem].encode()),
diff --git a/substrate/primitives/state-machine/src/overlayed_changes/changeset.rs b/substrate/primitives/state-machine/src/overlayed_changes/changeset.rs
index 601bc2e29198561d501e1f20b11c2323c0064989..c478983e979af440a409199934c641eb01492675 100644
--- a/substrate/primitives/state-machine/src/overlayed_changes/changeset.rs
+++ b/substrate/primitives/state-machine/src/overlayed_changes/changeset.rs
@@ -21,11 +21,15 @@ use super::{Extrinsics, StorageKey, StorageValue};
 
 #[cfg(not(feature = "std"))]
 use alloc::collections::btree_set::BTreeSet as Set;
+use codec::{Compact, CompactLen};
 #[cfg(feature = "std")]
 use std::collections::HashSet as Set;
 
-use crate::warn;
-use alloc::collections::{btree_map::BTreeMap, btree_set::BTreeSet};
+use crate::{ext::StorageAppend, warn};
+use alloc::{
+	collections::{btree_map::BTreeMap, btree_set::BTreeSet},
+	vec::Vec,
+};
 use core::hash::Hash;
 use smallvec::SmallVec;
 
@@ -86,10 +90,97 @@ impl<V> Default for OverlayedEntry<V> {
 }
 
 /// History of value, with removal support.
-pub type OverlayedValue = OverlayedEntry<Option<StorageValue>>;
+pub type OverlayedValue = OverlayedEntry<StorageEntry>;
+
+/// Content in an overlay for a given transactional depth.
+#[derive(Debug, Clone, Default)]
+#[cfg_attr(test, derive(PartialEq))]
+pub enum StorageEntry {
+	/// The storage entry should be set to the stored value.
+	Set(StorageValue),
+	/// The storage entry should be removed.
+	#[default]
+	Remove,
+	/// The storage entry was appended to.
+	///
+	/// This assumes that the storage entry is encoded as a SCALE list. This means that it is
+	/// prefixed with a `Compact<u32>` that reprensents the length, followed by all the encoded
+	/// elements.
+	Append {
+		/// The value of the storage entry.
+		///
+		/// This may or may not be prefixed by the length, depending on the materialized length.
+		data: StorageValue,
+		/// Current number of elements stored in data.
+		current_length: u32,
+		/// The number of elements as stored in the prefixed length in `data`.
+		///
+		/// If `None`, than `data` is not yet prefixed with the length.
+		materialized_length: Option<u32>,
+		/// The size of `data` in the parent transactional layer.
+		///
+		/// Only set when the parent layer is in  `Append` state.
+		parent_size: Option<usize>,
+	},
+}
+
+impl StorageEntry {
+	/// Convert to an [`Option<StorageValue>`].
+	pub(super) fn to_option(mut self) -> Option<StorageValue> {
+		self.materialize_in_place();
+		match self {
+			StorageEntry::Append { data, .. } | StorageEntry::Set(data) => Some(data),
+			StorageEntry::Remove => None,
+		}
+	}
+
+	/// Return as an [`Option<StorageValue>`].
+	fn as_option(&mut self) -> Option<&StorageValue> {
+		self.materialize_in_place();
+		match self {
+			StorageEntry::Append { data, .. } | StorageEntry::Set(data) => Some(data),
+			StorageEntry::Remove => None,
+		}
+	}
+
+	/// Materialize the internal state and cache the resulting materialized value.
+	fn materialize_in_place(&mut self) {
+		if let StorageEntry::Append { data, materialized_length, current_length, .. } = self {
+			let current_length = *current_length;
+			if materialized_length.map_or(false, |m| m == current_length) {
+				return
+			}
+			StorageAppend::new(data).replace_length(*materialized_length, current_length);
+			*materialized_length = Some(current_length);
+		}
+	}
+
+	/// Materialize the internal state.
+	#[cfg(test)]
+	pub(crate) fn materialize(&self) -> Option<alloc::borrow::Cow<[u8]>> {
+		use alloc::borrow::Cow;
+
+		match self {
+			StorageEntry::Append { data, materialized_length, current_length, .. } => {
+				let current_length = *current_length;
+				if materialized_length.map_or(false, |m| m == current_length) {
+					Some(Cow::Borrowed(data.as_ref()))
+				} else {
+					let mut data = data.clone();
+					StorageAppend::new(&mut data)
+						.replace_length(*materialized_length, current_length);
+
+					Some(data.into())
+				}
+			},
+			StorageEntry::Remove => None,
+			StorageEntry::Set(e) => Some(Cow::Borrowed(e.as_ref())),
+		}
+	}
+}
 
 /// Change set for basic key value with extrinsics index recording and removal support.
-pub type OverlayedChangeSet = OverlayedMap<StorageKey, Option<StorageValue>>;
+pub type OverlayedChangeSet = OverlayedMap<StorageKey, StorageEntry>;
 
 /// Holds a set of changes with the ability modify them using nested transactions.
 #[derive(Debug, Clone)]
@@ -120,7 +211,7 @@ impl<K, V> Default for OverlayedMap<K, V> {
 }
 
 #[cfg(feature = "std")]
-impl From<sp_core::storage::StorageMap> for OverlayedMap<StorageKey, Option<StorageValue>> {
+impl From<sp_core::storage::StorageMap> for OverlayedMap<StorageKey, StorageEntry> {
 	fn from(storage: sp_core::storage::StorageMap) -> Self {
 		Self {
 			changes: storage
@@ -130,7 +221,7 @@ impl From<sp_core::storage::StorageMap> for OverlayedMap<StorageKey, Option<Stor
 						k,
 						OverlayedEntry {
 							transactions: SmallVec::from_iter([InnerValue {
-								value: Some(v),
+								value: StorageEntry::Set(v),
 								extrinsics: Default::default(),
 							}]),
 						},
@@ -189,7 +280,7 @@ impl<V> OverlayedEntry<V> {
 	///
 	/// This makes sure that the old version is not overwritten and can be properly
 	/// rolled back when required.
-	fn set(&mut self, value: V, first_write_in_tx: bool, at_extrinsic: Option<u32>) {
+	fn set_offchain(&mut self, value: V, first_write_in_tx: bool, at_extrinsic: Option<u32>) {
 		if first_write_in_tx || self.transactions.is_empty() {
 			self.transactions.push(InnerValue { value, extrinsics: Default::default() });
 		} else {
@@ -202,10 +293,223 @@ impl<V> OverlayedEntry<V> {
 	}
 }
 
-impl OverlayedEntry<Option<StorageValue>> {
+/// Restore the `current_data` from an [`StorageEntry::Append`] back to the parent.
+///
+/// When creating a new transaction layer from an appended entry, the `data` will be moved to
+/// prevent extra allocations. So, we need to move back the `data` to the parent layer when there is
+/// a roll back or the entry is set to some different value. This functions puts back the data to
+/// the `parent` and truncates any extra elements that got added in the current layer.
+///
+/// The current and the `parent` layer need to be [`StorageEntry::Append`] or otherwise the function
+/// is a no-op.
+fn restore_append_to_parent(
+	parent: &mut StorageEntry,
+	mut current_data: Vec<u8>,
+	current_materialized: Option<u32>,
+	mut target_parent_size: usize,
+) {
+	match parent {
+		StorageEntry::Append {
+			data: parent_data,
+			materialized_length: parent_materialized,
+			..
+		} => {
+			// Forward the materialized length to the parent with the data. Next time when
+			// materializing the value, the length will be corrected. This prevents doing a
+			// potential allocation here.
+
+			let prev = parent_materialized.map(|l| Compact::<u32>::compact_len(&l)).unwrap_or(0);
+			let new = current_materialized.map(|l| Compact::<u32>::compact_len(&l)).unwrap_or(0);
+			let delta = new.abs_diff(prev);
+			if prev >= new {
+				target_parent_size -= delta;
+			} else {
+				target_parent_size += delta;
+			}
+			*parent_materialized = current_materialized;
+
+			// Truncate the data to remove any extra elements
+			current_data.truncate(target_parent_size);
+			*parent_data = current_data;
+		},
+		_ => {
+			// No value or a simple value, no need to restore
+		},
+	}
+}
+
+impl OverlayedEntry<StorageEntry> {
+	/// Writes a new version of a value.
+	///
+	/// This makes sure that the old version is not overwritten and can be properly
+	/// rolled back when required.
+	fn set(
+		&mut self,
+		value: Option<StorageValue>,
+		first_write_in_tx: bool,
+		at_extrinsic: Option<u32>,
+	) {
+		let value = value.map_or_else(|| StorageEntry::Remove, StorageEntry::Set);
+
+		if first_write_in_tx || self.transactions.is_empty() {
+			self.transactions.push(InnerValue { value, extrinsics: Default::default() });
+		} else {
+			let mut old_value = self.value_mut();
+
+			let set_prev = if let StorageEntry::Append {
+				data,
+				current_length: _,
+				materialized_length,
+				parent_size,
+			} = &mut old_value
+			{
+				parent_size
+					.map(|parent_size| (core::mem::take(data), *materialized_length, parent_size))
+			} else {
+				None
+			};
+
+			*old_value = value;
+
+			if let Some((data, current_materialized, parent_size)) = set_prev {
+				let transactions = self.transactions.len();
+
+				debug_assert!(transactions >= 2);
+				let parent = self
+					.transactions
+					.get_mut(transactions - 2)
+					.expect("`set_prev` is only `Some(_)`, if the value came from parent; qed");
+				restore_append_to_parent(
+					&mut parent.value,
+					data,
+					current_materialized,
+					parent_size,
+				);
+			}
+		}
+
+		if let Some(extrinsic) = at_extrinsic {
+			self.transaction_extrinsics_mut().insert(extrinsic);
+		}
+	}
+
+	/// Append content to a value, updating a prefixed compact encoded length.
+	///
+	/// This makes sure that the old version is not overwritten and can be properly
+	/// rolled back when required.
+	/// This avoid copying value from previous transaction.
+	fn append(
+		&mut self,
+		element: StorageValue,
+		first_write_in_tx: bool,
+		init: impl Fn() -> StorageValue,
+		at_extrinsic: Option<u32>,
+	) {
+		if self.transactions.is_empty() {
+			let mut init_value = init();
+
+			let mut append = StorageAppend::new(&mut init_value);
+
+			// Either the init value is a SCALE list like value to that the `element` gets appended
+			// or the value is reset to `[element]`.
+			let (data, current_length, materialized_length) =
+				if let Some(len) = append.extract_length() {
+					append.append_raw(element);
+
+					(init_value, len + 1, Some(len))
+				} else {
+					(element, 1, None)
+				};
+
+			self.transactions.push(InnerValue {
+				value: StorageEntry::Append {
+					data,
+					current_length,
+					materialized_length,
+					parent_size: None,
+				},
+				extrinsics: Default::default(),
+			});
+		} else if first_write_in_tx {
+			let parent = self.value_mut();
+			let (data, current_length, materialized_length, parent_size) = match parent {
+				StorageEntry::Remove => (element, 1, None, None),
+				StorageEntry::Append { data, current_length, materialized_length, .. } => {
+					let parent_len = data.len();
+					let mut data_buf = core::mem::take(data);
+					StorageAppend::new(&mut data_buf).append_raw(element);
+					(data_buf, *current_length + 1, *materialized_length, Some(parent_len))
+				},
+				StorageEntry::Set(prev) => {
+					// For compatibility: append if there is a encoded length, overwrite
+					// with value otherwhise.
+					if let Some(current_length) = StorageAppend::new(prev).extract_length() {
+						// The `prev` is cloned here, but it could be optimized to not do the clone
+						// here as it is done for `Append` above.
+						let mut data = prev.clone();
+						StorageAppend::new(&mut data).append_raw(element);
+						(data, current_length + 1, Some(current_length), None)
+					} else {
+						// overwrite, same as empty case.
+						(element, 1, None, None)
+					}
+				},
+			};
+
+			self.transactions.push(InnerValue {
+				value: StorageEntry::Append {
+					data,
+					current_length,
+					materialized_length,
+					parent_size,
+				},
+				extrinsics: Default::default(),
+			});
+		} else {
+			// not first transaction write
+			let old_value = self.value_mut();
+			let replace = match old_value {
+				StorageEntry::Remove => Some((element, 1, None)),
+				StorageEntry::Set(data) => {
+					// Note that when the data here is not initialized with append,
+					// and still starts with a valid compact u32 we can have totally broken
+					// encoding.
+					let mut append = StorageAppend::new(data);
+
+					// For compatibility: append if there is a encoded length, overwrite
+					// with value otherwhise.
+					if let Some(current_length) = append.extract_length() {
+						append.append_raw(element);
+						Some((core::mem::take(data), current_length + 1, Some(current_length)))
+					} else {
+						Some((element, 1, None))
+					}
+				},
+				StorageEntry::Append { data, current_length, .. } => {
+					StorageAppend::new(data).append_raw(element);
+					*current_length += 1;
+					None
+				},
+			};
+
+			if let Some((data, current_length, materialized_length)) = replace {
+				*old_value = StorageEntry::Append {
+					data,
+					current_length,
+					materialized_length,
+					parent_size: None,
+				};
+			}
+		}
+
+		if let Some(extrinsic) = at_extrinsic {
+			self.transaction_extrinsics_mut().insert(extrinsic);
+		}
+	}
+
 	/// The value as seen by the current transaction.
-	pub fn value(&self) -> Option<&StorageValue> {
-		self.value_ref().as_ref()
+	pub fn value(&mut self) -> Option<&StorageValue> {
+		self.value_mut().as_option()
 	}
 }
 
@@ -238,20 +542,20 @@ impl<K: Ord + Hash + Clone, V> OverlayedMap<K, V> {
 	}
 
 	/// Get an optional reference to the value stored for the specified key.
-	pub fn get<Q>(&self, key: &Q) -> Option<&OverlayedEntry<V>>
+	pub fn get<Q>(&mut self, key: &Q) -> Option<&mut OverlayedEntry<V>>
 	where
 		K: core::borrow::Borrow<Q>,
 		Q: Ord + ?Sized,
 	{
-		self.changes.get(key)
+		self.changes.get_mut(key)
 	}
 
 	/// Set a new value for the specified key.
 	///
 	/// Can be rolled back or committed when called inside a transaction.
-	pub fn set(&mut self, key: K, value: V, at_extrinsic: Option<u32>) {
+	pub fn set_offchain(&mut self, key: K, value: V, at_extrinsic: Option<u32>) {
 		let overlayed = self.changes.entry(key.clone()).or_default();
-		overlayed.set(value, insert_dirty(&mut self.dirty_keys, key), at_extrinsic);
+		overlayed.set_offchain(value, insert_dirty(&mut self.dirty_keys, key), at_extrinsic);
 	}
 
 	/// Get a list of all changes as seen by current transaction.
@@ -259,6 +563,11 @@ impl<K: Ord + Hash + Clone, V> OverlayedMap<K, V> {
 		self.changes.iter()
 	}
 
+	/// Get a list of all changes as seen by current transaction.
+	pub fn changes_mut(&mut self) -> impl Iterator<Item = (&K, &mut OverlayedEntry<V>)> {
+		self.changes.iter_mut()
+	}
+
 	/// Get a list of all changes as seen by current transaction, consumes
 	/// the overlay.
 	pub fn into_changes(self) -> impl Iterator<Item = (K, OverlayedEntry<V>)> {
@@ -298,7 +607,7 @@ impl<K: Ord + Hash + Clone, V> OverlayedMap<K, V> {
 	///
 	/// This rollbacks all dangling transaction left open by the runtime.
 	/// Calling this while already outside the runtime will return an error.
-	pub fn exit_runtime(&mut self) -> Result<(), NotInRuntime> {
+	pub fn exit_runtime_offchain(&mut self) -> Result<(), NotInRuntime> {
 		if let ExecutionMode::Client = self.execution_mode {
 			return Err(NotInRuntime)
 		}
@@ -310,7 +619,7 @@ impl<K: Ord + Hash + Clone, V> OverlayedMap<K, V> {
 			);
 		}
 		while self.has_open_runtime_transactions() {
-			self.rollback_transaction()
+			self.rollback_transaction_offchain()
 				.expect("The loop condition checks that the transaction depth is > 0; qed");
 		}
 		Ok(())
@@ -331,24 +640,24 @@ impl<K: Ord + Hash + Clone, V> OverlayedMap<K, V> {
 	///
 	/// Any changes made during that transaction are discarded. Returns an error if
 	/// there is no open transaction that can be rolled back.
-	pub fn rollback_transaction(&mut self) -> Result<(), NoOpenTransaction> {
-		self.close_transaction(true)
+	pub fn rollback_transaction_offchain(&mut self) -> Result<(), NoOpenTransaction> {
+		self.close_transaction_offchain(true)
 	}
 
 	/// Commit the last transaction started by `start_transaction`.
 	///
 	/// Any changes made during that transaction are committed. Returns an error if
 	/// there is no open transaction that can be committed.
-	pub fn commit_transaction(&mut self) -> Result<(), NoOpenTransaction> {
-		self.close_transaction(false)
+	pub fn commit_transaction_offchain(&mut self) -> Result<(), NoOpenTransaction> {
+		self.close_transaction_offchain(false)
 	}
 
-	fn close_transaction(&mut self, rollback: bool) -> Result<(), NoOpenTransaction> {
+	fn close_transaction_offchain(&mut self, rollback: bool) -> Result<(), NoOpenTransaction> {
 		// runtime is not allowed to close transactions started by the client
-		if let ExecutionMode::Runtime = self.execution_mode {
-			if !self.has_open_runtime_transactions() {
-				return Err(NoOpenTransaction)
-			}
+		if matches!(self.execution_mode, ExecutionMode::Runtime) &&
+			!self.has_open_runtime_transactions()
+		{
+			return Err(NoOpenTransaction)
 		}
 
 		for key in self.dirty_keys.pop().ok_or(NoOpenTransaction)? {
@@ -398,32 +707,176 @@ impl<K: Ord + Hash + Clone, V> OverlayedMap<K, V> {
 }
 
 impl OverlayedChangeSet {
-	/// Get a mutable reference for a value.
+	/// Rollback the last transaction started by `start_transaction`.
+	///
+	/// Any changes made during that transaction are discarded. Returns an error if
+	/// there is no open transaction that can be rolled back.
+	pub fn rollback_transaction(&mut self) -> Result<(), NoOpenTransaction> {
+		self.close_transaction(true)
+	}
+
+	/// Commit the last transaction started by `start_transaction`.
+	///
+	/// Any changes made during that transaction are committed. Returns an error if
+	/// there is no open transaction that can be committed.
+	pub fn commit_transaction(&mut self) -> Result<(), NoOpenTransaction> {
+		self.close_transaction(false)
+	}
+
+	fn close_transaction(&mut self, rollback: bool) -> Result<(), NoOpenTransaction> {
+		// runtime is not allowed to close transactions started by the client
+		if matches!(self.execution_mode, ExecutionMode::Runtime) &&
+			!self.has_open_runtime_transactions()
+		{
+			return Err(NoOpenTransaction)
+		}
+
+		for key in self.dirty_keys.pop().ok_or(NoOpenTransaction)? {
+			let overlayed = self.changes.get_mut(&key).expect(
+				"\
+				A write to an OverlayedValue is recorded in the dirty key set. Before an
+				OverlayedValue is removed, its containing dirty set is removed. This
+				function is only called for keys that are in the dirty set. qed\
+			",
+			);
+
+			if rollback {
+				match overlayed.pop_transaction().value {
+					StorageEntry::Append {
+						data,
+						materialized_length,
+						parent_size: Some(parent_size),
+						..
+					} => {
+						debug_assert!(!overlayed.transactions.is_empty());
+						restore_append_to_parent(
+							overlayed.value_mut(),
+							data,
+							materialized_length,
+							parent_size,
+						);
+					},
+					_ => (),
+				}
+
+				// We need to remove the key as an `OverlayValue` with no transactions
+				// violates its invariant of always having at least one transaction.
+				if overlayed.transactions.is_empty() {
+					self.changes.remove(&key);
+				}
+			} else {
+				let has_predecessor = if let Some(dirty_keys) = self.dirty_keys.last_mut() {
+					// Not the last tx: Did the previous tx write to this key?
+					!dirty_keys.insert(key)
+				} else {
+					// Last tx: Is there already a value in the committed set?
+					// Check against one rather than empty because the current tx is still
+					// in the list as it is popped later in this function.
+					overlayed.transactions.len() > 1
+				};
+
+				// We only need to merge if there is an pre-existing value. It may be a value from
+				// the previous transaction or a value committed without any open transaction.
+				if has_predecessor {
+					let mut committed_tx = overlayed.pop_transaction();
+					let mut merge_appends = false;
+
+					// consecutive appends need to keep past `parent_size` value.
+					if let StorageEntry::Append { parent_size, .. } = &mut committed_tx.value {
+						if parent_size.is_some() {
+							let parent = overlayed.value_mut();
+							if let StorageEntry::Append { parent_size: keep_me, .. } = parent {
+								merge_appends = true;
+								*parent_size = *keep_me;
+							}
+						}
+					}
+
+					if merge_appends {
+						*overlayed.value_mut() = committed_tx.value;
+					} else {
+						let removed = core::mem::replace(overlayed.value_mut(), committed_tx.value);
+						// The transaction being commited is not an append operation. However, the
+						// value being overwritten in the previous transaction might be an append
+						// that needs to be merged with its parent. We only need to handle `Append`
+						// here because `Set` and `Remove` can directly overwrite previous
+						// operations.
+						if let StorageEntry::Append {
+							parent_size, data, materialized_length, ..
+						} = removed
+						{
+							if let Some(parent_size) = parent_size {
+								let transactions = overlayed.transactions.len();
+
+								// info from replaced head so len is at least one
+								// and parent_size implies a parent transaction
+								// so length is at least two.
+								debug_assert!(transactions >= 2);
+								if let Some(parent) =
+									overlayed.transactions.get_mut(transactions - 2)
+								{
+									restore_append_to_parent(
+										&mut parent.value,
+										data,
+										materialized_length,
+										parent_size,
+									)
+								}
+							}
+						}
+					}
+
+					overlayed.transaction_extrinsics_mut().extend(committed_tx.extrinsics);
+				}
+			}
+		}
+
+		Ok(())
+	}
+
+	/// Call this when control returns from the runtime.
+	///
+	/// This commits all dangling transaction left open by the runtime.
+	/// Calling this while already outside the runtime will return an error.
+	pub fn exit_runtime(&mut self) -> Result<(), NotInRuntime> {
+		if matches!(self.execution_mode, ExecutionMode::Client) {
+			return Err(NotInRuntime)
+		}
+
+		self.execution_mode = ExecutionMode::Client;
+		if self.has_open_runtime_transactions() {
+			warn!(
+				"{} storage transactions are left open by the runtime. Those will be rolled back.",
+				self.transaction_depth() - self.num_client_transactions,
+			);
+		}
+		while self.has_open_runtime_transactions() {
+			self.rollback_transaction()
+				.expect("The loop condition checks that the transaction depth is > 0; qed");
+		}
+
+		Ok(())
+	}
+
+	/// Set a new value for the specified key.
 	///
 	/// Can be rolled back or committed when called inside a transaction.
-	#[must_use = "A change was registered, so this value MUST be modified."]
-	pub fn modify(
+	pub fn set(&mut self, key: StorageKey, value: Option<StorageValue>, at_extrinsic: Option<u32>) {
+		let overlayed = self.changes.entry(key.clone()).or_default();
+		overlayed.set(value, insert_dirty(&mut self.dirty_keys, key), at_extrinsic);
+	}
+
+	/// Append bytes to an existing content.
+	pub fn append_storage(
 		&mut self,
 		key: StorageKey,
+		value: StorageValue,
 		init: impl Fn() -> StorageValue,
 		at_extrinsic: Option<u32>,
-	) -> &mut Option<StorageValue> {
+	) {
 		let overlayed = self.changes.entry(key.clone()).or_default();
 		let first_write_in_tx = insert_dirty(&mut self.dirty_keys, key);
-		let clone_into_new_tx = if let Some(tx) = overlayed.transactions.last() {
-			if first_write_in_tx {
-				Some(tx.value.clone())
-			} else {
-				None
-			}
-		} else {
-			Some(Some(init()))
-		};
-
-		if let Some(cloned) = clone_into_new_tx {
-			overlayed.set(cloned, first_write_in_tx, at_extrinsic);
-		}
-		overlayed.value_mut()
+		overlayed.append(value, first_write_in_tx, init, at_extrinsic);
 	}
 
 	/// Set all values to deleted which are matched by the predicate.
@@ -436,7 +889,7 @@ impl OverlayedChangeSet {
 	) -> u32 {
 		let mut count = 0;
 		for (key, val) in self.changes.iter_mut().filter(|(k, v)| predicate(k, v)) {
-			if val.value_ref().is_some() {
+			if matches!(val.value_ref(), StorageEntry::Set(..) | StorageEntry::Append { .. }) {
 				count += 1;
 			}
 			val.set(None, insert_dirty(&mut self.dirty_keys, key.clone()), at_extrinsic);
@@ -445,10 +898,13 @@ impl OverlayedChangeSet {
 	}
 
 	/// Get the iterator over all changes that follow the supplied `key`.
-	pub fn changes_after(&self, key: &[u8]) -> impl Iterator<Item = (&[u8], &OverlayedValue)> {
+	pub fn changes_after(
+		&mut self,
+		key: &[u8],
+	) -> impl Iterator<Item = (&[u8], &mut OverlayedValue)> {
 		use core::ops::Bound;
 		let range = (Bound::Excluded(key), Bound::Unbounded);
-		self.changes.range::<[u8], _>(range).map(|(k, v)| (k.as_slice(), v))
+		self.changes.range_mut::<[u8], _>(range).map(|(k, v)| (k.as_slice(), v))
 	}
 }
 
@@ -460,18 +916,19 @@ mod test {
 	type Changes<'a> = Vec<(&'a [u8], (Option<&'a [u8]>, Vec<u32>))>;
 	type Drained<'a> = Vec<(&'a [u8], Option<&'a [u8]>)>;
 
-	fn assert_changes(is: &OverlayedChangeSet, expected: &Changes) {
+	fn assert_changes(is: &mut OverlayedChangeSet, expected: &Changes) {
 		let is: Changes = is
-			.changes()
+			.changes_mut()
 			.map(|(k, v)| {
-				(k.as_ref(), (v.value().map(AsRef::as_ref), v.extrinsics().into_iter().collect()))
+				let extrinsics = v.extrinsics().into_iter().collect();
+				(k.as_ref(), (v.value().map(AsRef::as_ref), extrinsics))
 			})
 			.collect();
 		assert_eq!(&is, expected);
 	}
 
 	fn assert_drained_changes(is: OverlayedChangeSet, expected: Changes) {
-		let is = is.drain_committed().collect::<Vec<_>>();
+		let is = is.drain_committed().map(|(k, v)| (k, v.to_option())).collect::<Vec<_>>();
 		let expected = expected
 			.iter()
 			.map(|(k, v)| (k.to_vec(), v.0.map(From::from)))
@@ -480,7 +937,7 @@ mod test {
 	}
 
 	fn assert_drained(is: OverlayedChangeSet, expected: Drained) {
-		let is = is.drain_committed().collect::<Vec<_>>();
+		let is = is.drain_committed().map(|(k, v)| (k, v.to_option())).collect::<Vec<_>>();
 		let expected = expected
 			.iter()
 			.map(|(k, v)| (k.to_vec(), v.map(From::from)))
@@ -535,7 +992,7 @@ mod test {
 			(b"key7", (Some(b"val7-rolled"), vec![77])),
 			(b"key99", (Some(b"val99"), vec![99])),
 		];
-		assert_changes(&changeset, &all_changes);
+		assert_changes(&mut changeset, &all_changes);
 
 		// this should be no-op
 		changeset.start_transaction();
@@ -546,7 +1003,7 @@ mod test {
 		assert_eq!(changeset.transaction_depth(), 3);
 		changeset.commit_transaction().unwrap();
 		assert_eq!(changeset.transaction_depth(), 2);
-		assert_changes(&changeset, &all_changes);
+		assert_changes(&mut changeset, &all_changes);
 
 		// roll back our first transactions that actually contains something
 		changeset.rollback_transaction().unwrap();
@@ -558,11 +1015,11 @@ mod test {
 			(b"key42", (Some(b"val42"), vec![42])),
 			(b"key99", (Some(b"val99"), vec![99])),
 		];
-		assert_changes(&changeset, &rolled_back);
+		assert_changes(&mut changeset, &rolled_back);
 
 		changeset.commit_transaction().unwrap();
 		assert_eq!(changeset.transaction_depth(), 0);
-		assert_changes(&changeset, &rolled_back);
+		assert_changes(&mut changeset, &rolled_back);
 
 		assert_drained_changes(changeset, rolled_back);
 	}
@@ -598,7 +1055,7 @@ mod test {
 			(b"key7", (Some(b"val7-rolled"), vec![77])),
 			(b"key99", (Some(b"val99"), vec![99])),
 		];
-		assert_changes(&changeset, &all_changes);
+		assert_changes(&mut changeset, &all_changes);
 
 		// this should be no-op
 		changeset.start_transaction();
@@ -609,35 +1066,46 @@ mod test {
 		assert_eq!(changeset.transaction_depth(), 3);
 		changeset.commit_transaction().unwrap();
 		assert_eq!(changeset.transaction_depth(), 2);
-		assert_changes(&changeset, &all_changes);
+		assert_changes(&mut changeset, &all_changes);
 
 		changeset.commit_transaction().unwrap();
 		assert_eq!(changeset.transaction_depth(), 1);
 
-		assert_changes(&changeset, &all_changes);
+		assert_changes(&mut changeset, &all_changes);
 
 		changeset.rollback_transaction().unwrap();
 		assert_eq!(changeset.transaction_depth(), 0);
 
 		let rolled_back: Changes =
 			vec![(b"key0", (Some(b"val0-1"), vec![1, 10])), (b"key1", (Some(b"val1"), vec![1]))];
-		assert_changes(&changeset, &rolled_back);
+		assert_changes(&mut changeset, &rolled_back);
 
 		assert_drained_changes(changeset, rolled_back);
 	}
 
 	#[test]
-	fn modify_works() {
+	fn append_works() {
+		use codec::Encode;
 		let mut changeset = OverlayedChangeSet::default();
 		assert_eq!(changeset.transaction_depth(), 0);
-		let init = || b"valinit".to_vec();
+		let init = || vec![b"valinit".to_vec()].encode();
 
 		// committed set
-		changeset.set(b"key0".to_vec(), Some(b"val0".to_vec()), Some(0));
+		let val0 = vec![b"val0".to_vec()].encode();
+		changeset.set(b"key0".to_vec(), Some(val0.clone()), Some(0));
 		changeset.set(b"key1".to_vec(), None, Some(1));
-		let val = changeset.modify(b"key3".to_vec(), init, Some(3));
-		assert_eq!(val, &Some(b"valinit".to_vec()));
-		val.as_mut().unwrap().extend_from_slice(b"-modified");
+		let all_changes: Changes =
+			vec![(b"key0", (Some(val0.as_slice()), vec![0])), (b"key1", (None, vec![1]))];
+
+		assert_changes(&mut changeset, &all_changes);
+		changeset.append_storage(b"key3".to_vec(), b"-modified".to_vec().encode(), init, Some(3));
+		let val3 = vec![b"valinit".to_vec(), b"-modified".to_vec()].encode();
+		let all_changes: Changes = vec![
+			(b"key0", (Some(val0.as_slice()), vec![0])),
+			(b"key1", (None, vec![1])),
+			(b"key3", (Some(val3.as_slice()), vec![3])),
+		];
+		assert_changes(&mut changeset, &all_changes);
 
 		changeset.start_transaction();
 		assert_eq!(changeset.transaction_depth(), 1);
@@ -645,39 +1113,75 @@ mod test {
 		assert_eq!(changeset.transaction_depth(), 2);
 
 		// non existing value -> init value should be returned
-		let val = changeset.modify(b"key2".to_vec(), init, Some(2));
-		assert_eq!(val, &Some(b"valinit".to_vec()));
-		val.as_mut().unwrap().extend_from_slice(b"-modified");
+		changeset.append_storage(b"key3".to_vec(), b"-twice".to_vec().encode(), init, Some(15));
 
-		// existing value should be returned by modify
-		let val = changeset.modify(b"key0".to_vec(), init, Some(10));
-		assert_eq!(val, &Some(b"val0".to_vec()));
-		val.as_mut().unwrap().extend_from_slice(b"-modified");
+		// non existing value -> init value should be returned
+		changeset.append_storage(b"key2".to_vec(), b"-modified".to_vec().encode(), init, Some(2));
+		// existing value should be reuse on append
+		changeset.append_storage(b"key0".to_vec(), b"-modified".to_vec().encode(), init, Some(10));
 
 		// should work for deleted keys
-		let val = changeset.modify(b"key1".to_vec(), init, Some(20));
-		assert_eq!(val, &None);
-		*val = Some(b"deleted-modified".to_vec());
+		changeset.append_storage(
+			b"key1".to_vec(),
+			b"deleted-modified".to_vec().encode(),
+			init,
+			Some(20),
+		);
+		let val0_2 = vec![b"val0".to_vec(), b"-modified".to_vec()].encode();
+		let val3_2 = vec![b"valinit".to_vec(), b"-modified".to_vec(), b"-twice".to_vec()].encode();
+		let val1 = vec![b"deleted-modified".to_vec()].encode();
+		let all_changes: Changes = vec![
+			(b"key0", (Some(val0_2.as_slice()), vec![0, 10])),
+			(b"key1", (Some(val1.as_slice()), vec![1, 20])),
+			(b"key2", (Some(val3.as_slice()), vec![2])),
+			(b"key3", (Some(val3_2.as_slice()), vec![3, 15])),
+		];
+		assert_changes(&mut changeset, &all_changes);
+
+		changeset.start_transaction();
+		let val3_3 =
+			vec![b"valinit".to_vec(), b"-modified".to_vec(), b"-twice".to_vec(), b"-2".to_vec()]
+				.encode();
+		changeset.append_storage(b"key3".to_vec(), b"-2".to_vec().encode(), init, Some(21));
+		let all_changes2: Changes = vec![
+			(b"key0", (Some(val0_2.as_slice()), vec![0, 10])),
+			(b"key1", (Some(val1.as_slice()), vec![1, 20])),
+			(b"key2", (Some(val3.as_slice()), vec![2])),
+			(b"key3", (Some(val3_3.as_slice()), vec![3, 15, 21])),
+		];
+		assert_changes(&mut changeset, &all_changes2);
+		changeset.rollback_transaction().unwrap();
 
+		assert_changes(&mut changeset, &all_changes);
+		changeset.start_transaction();
+		let val3_4 = vec![
+			b"valinit".to_vec(),
+			b"-modified".to_vec(),
+			b"-twice".to_vec(),
+			b"-thrice".to_vec(),
+		]
+		.encode();
+		changeset.append_storage(b"key3".to_vec(), b"-thrice".to_vec().encode(), init, Some(25));
 		let all_changes: Changes = vec![
-			(b"key0", (Some(b"val0-modified"), vec![0, 10])),
-			(b"key1", (Some(b"deleted-modified"), vec![1, 20])),
-			(b"key2", (Some(b"valinit-modified"), vec![2])),
-			(b"key3", (Some(b"valinit-modified"), vec![3])),
+			(b"key0", (Some(val0_2.as_slice()), vec![0, 10])),
+			(b"key1", (Some(val1.as_slice()), vec![1, 20])),
+			(b"key2", (Some(val3.as_slice()), vec![2])),
+			(b"key3", (Some(val3_4.as_slice()), vec![3, 15, 25])),
 		];
-		assert_changes(&changeset, &all_changes);
+		assert_changes(&mut changeset, &all_changes);
+		changeset.commit_transaction().unwrap();
 		changeset.commit_transaction().unwrap();
 		assert_eq!(changeset.transaction_depth(), 1);
-		assert_changes(&changeset, &all_changes);
+		assert_changes(&mut changeset, &all_changes);
 
 		changeset.rollback_transaction().unwrap();
 		assert_eq!(changeset.transaction_depth(), 0);
 		let rolled_back: Changes = vec![
-			(b"key0", (Some(b"val0"), vec![0])),
+			(b"key0", (Some(val0.as_slice()), vec![0])),
 			(b"key1", (None, vec![1])),
-			(b"key3", (Some(b"valinit-modified"), vec![3])),
+			(b"key3", (Some(val3.as_slice()), vec![3])),
 		];
-		assert_changes(&changeset, &rolled_back);
+		assert_changes(&mut changeset, &rolled_back);
 		assert_drained_changes(changeset, rolled_back);
 	}
 
@@ -695,7 +1199,7 @@ mod test {
 		changeset.clear_where(|k, _| k.starts_with(b"del"), Some(5));
 
 		assert_changes(
-			&changeset,
+			&mut changeset,
 			&vec![
 				(b"del1", (None, vec![3, 5])),
 				(b"del2", (None, vec![4, 5])),
@@ -707,7 +1211,7 @@ mod test {
 		changeset.rollback_transaction().unwrap();
 
 		assert_changes(
-			&changeset,
+			&mut changeset,
 			&vec![
 				(b"del1", (Some(b"delval1"), vec![3])),
 				(b"del2", (Some(b"delval2"), vec![4])),
@@ -850,4 +1354,72 @@ mod test {
 		assert_eq!(changeset.exit_runtime(), Ok(()));
 		assert_eq!(changeset.exit_runtime(), Err(NotInRuntime));
 	}
+
+	#[test]
+	fn restore_append_to_parent() {
+		use codec::{Compact, Encode};
+		let mut changeset = OverlayedChangeSet::default();
+		let key: Vec<u8> = b"akey".into();
+
+		let from = 50; // 1 byte len
+		let to = 100; // 2 byte len
+		for i in 0..from {
+			changeset.append_storage(key.clone(), vec![i], Default::default, None);
+		}
+
+		// materialized
+		let encoded = changeset.get(&key).unwrap().value().unwrap();
+		let encoded_from_len = Compact(from as u32).encode();
+		assert_eq!(encoded_from_len.len(), 1);
+		assert!(encoded.starts_with(&encoded_from_len[..]));
+		let encoded_from = encoded.clone();
+
+		changeset.start_transaction();
+
+		for i in from..to {
+			changeset.append_storage(key.clone(), vec![i], Default::default, None);
+		}
+
+		// materialized
+		let encoded = changeset.get(&key).unwrap().value().unwrap();
+		let encoded_to_len = Compact(to as u32).encode();
+		assert_eq!(encoded_to_len.len(), 2);
+		assert!(encoded.starts_with(&encoded_to_len[..]));
+
+		changeset.rollback_transaction().unwrap();
+
+		let encoded = changeset.get(&key).unwrap().value().unwrap();
+		assert_eq!(&encoded_from, encoded);
+	}
+
+	/// First we have some `Set` operation with a valid SCALE list. Then we append data and rollback
+	/// afterwards.
+	#[test]
+	fn restore_initial_set_after_append_to_parent() {
+		use codec::{Compact, Encode};
+		let mut changeset = OverlayedChangeSet::default();
+		let key: Vec<u8> = b"akey".into();
+
+		let initial_data = vec![1u8; 50].encode();
+
+		changeset.set(key.clone(), Some(initial_data.clone()), None);
+
+		changeset.start_transaction();
+
+		// Append until we require 2 bytes for the length prefix.
+		for i in 0..50 {
+			changeset.append_storage(key.clone(), vec![i], Default::default, None);
+		}
+
+		// Materialize the value.
+		let encoded = changeset.get(&key).unwrap().value().unwrap();
+		let encoded_to_len = Compact(100u32).encode();
+		assert_eq!(encoded_to_len.len(), 2);
+		assert!(encoded.starts_with(&encoded_to_len[..]));
+
+		changeset.rollback_transaction().unwrap();
+
+		let encoded = changeset.get(&key).unwrap().value().unwrap();
+		assert_eq!(&initial_data, encoded);
+	}
 }
diff --git a/substrate/primitives/state-machine/src/overlayed_changes/mod.rs b/substrate/primitives/state-machine/src/overlayed_changes/mod.rs
index d6fc404e84fb5256656c5849d8f52894f37bfa02..c2dc637bc71a70dbef3a55abf658dbd1dd889db5 100644
--- a/substrate/primitives/state-machine/src/overlayed_changes/mod.rs
+++ b/substrate/primitives/state-machine/src/overlayed_changes/mod.rs
@@ -289,7 +289,7 @@ impl<H: Hasher> OverlayedChanges<H> {
 	/// Returns a double-Option: None if the key is unknown (i.e. and the query should be referred
 	/// to the backend); Some(None) if the key has been deleted. Some(Some(...)) for a key whose
 	/// value has been set.
-	pub fn storage(&self, key: &[u8]) -> Option<Option<&[u8]>> {
+	pub fn storage(&mut self, key: &[u8]) -> Option<Option<&[u8]>> {
 		self.top.get(key).map(|x| {
 			let value = x.value();
 			let size_read = value.map(|x| x.len() as u64).unwrap_or(0);
@@ -304,30 +304,11 @@ impl<H: Hasher> OverlayedChanges<H> {
 		self.storage_transaction_cache = None;
 	}
 
-	/// Returns mutable reference to current value.
-	/// If there is no value in the overlay, the given callback is used to initiate the value.
-	/// Warning this function registers a change, so the mutable reference MUST be modified.
-	///
-	/// Can be rolled back or committed when called inside a transaction.
-	#[must_use = "A change was registered, so this value MUST be modified."]
-	pub fn value_mut_or_insert_with(
-		&mut self,
-		key: &[u8],
-		init: impl Fn() -> StorageValue,
-	) -> &mut StorageValue {
-		self.mark_dirty();
-
-		let value = self.top.modify(key.to_vec(), init, self.extrinsic_index());
-
-		// if the value was deleted initialise it back with an empty vec
-		value.get_or_insert_with(StorageValue::default)
-	}
-
 	/// Returns a double-Option: None if the key is unknown (i.e. and the query should be referred
 	/// to the backend); Some(None) if the key has been deleted. Some(Some(...)) for a key whose
 	/// value has been set.
-	pub fn child_storage(&self, child_info: &ChildInfo, key: &[u8]) -> Option<Option<&[u8]>> {
-		let map = self.children.get(child_info.storage_key())?;
+	pub fn child_storage(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<Option<&[u8]>> {
+		let map = self.children.get_mut(child_info.storage_key())?;
 		let value = map.0.get(key)?.value();
 		let size_read = value.map(|x| x.len() as u64).unwrap_or(0);
 		self.stats.tally_read_modified(size_read);
@@ -342,7 +323,21 @@ impl<H: Hasher> OverlayedChanges<H> {
 
 		let size_write = val.as_ref().map(|x| x.len() as u64).unwrap_or(0);
 		self.stats.tally_write_overlay(size_write);
-		self.top.set(key, val, self.extrinsic_index());
+		let extrinsic_index = self.extrinsic_index();
+		self.top.set(key, val, extrinsic_index);
+	}
+
+	/// Append a element to storage, init with existing value if first write.
+	pub fn append_storage(
+		&mut self,
+		key: StorageKey,
+		element: StorageValue,
+		init: impl Fn() -> StorageValue,
+	) {
+		let extrinsic_index = self.extrinsic_index();
+		let size_write = element.len() as u64;
+		self.stats.tally_write_overlay(size_write);
+		self.top.append_storage(key, element, init, extrinsic_index);
 	}
 
 	/// Set a new value for the specified key and child.
@@ -396,7 +391,8 @@ impl<H: Hasher> OverlayedChanges<H> {
 	pub fn clear_prefix(&mut self, prefix: &[u8]) -> u32 {
 		self.mark_dirty();
 
-		self.top.clear_where(|key, _| key.starts_with(prefix), self.extrinsic_index())
+		let extrinsic_index = self.extrinsic_index();
+		self.top.clear_where(|key, _| key.starts_with(prefix), extrinsic_index)
 	}
 
 	/// Removes all key-value pairs which keys share the given prefix.
@@ -457,7 +453,7 @@ impl<H: Hasher> OverlayedChanges<H> {
 		});
 		self.offchain
 			.overlay_mut()
-			.rollback_transaction()
+			.rollback_transaction_offchain()
 			.expect("Top and offchain changesets are started in lockstep; qed");
 		Ok(())
 	}
@@ -475,7 +471,7 @@ impl<H: Hasher> OverlayedChanges<H> {
 		}
 		self.offchain
 			.overlay_mut()
-			.commit_transaction()
+			.commit_transaction_offchain()
 			.expect("Top and offchain changesets are started in lockstep; qed");
 		Ok(())
 	}
@@ -511,7 +507,7 @@ impl<H: Hasher> OverlayedChanges<H> {
 		}
 		self.offchain
 			.overlay_mut()
-			.exit_runtime()
+			.exit_runtime_offchain()
 			.expect("Top and offchain changesets are started in lockstep; qed");
 		Ok(())
 	}
@@ -535,11 +531,24 @@ impl<H: Hasher> OverlayedChanges<H> {
 		self.children.values().map(|v| (v.0.changes(), &v.1))
 	}
 
+	/// Get an iterator over all child changes as seen by the current transaction.
+	pub fn children_mut(
+		&mut self,
+	) -> impl Iterator<Item = (impl Iterator<Item = (&StorageKey, &mut OverlayedValue)>, &ChildInfo)>
+	{
+		self.children.values_mut().map(|v| (v.0.changes_mut(), &v.1))
+	}
+
 	/// Get an iterator over all top changes as been by the current transaction.
 	pub fn changes(&self) -> impl Iterator<Item = (&StorageKey, &OverlayedValue)> {
 		self.top.changes()
 	}
 
+	/// Get an iterator over all top changes as been by the current transaction.
+	pub fn changes_mut(&mut self) -> impl Iterator<Item = (&StorageKey, &mut OverlayedValue)> {
+		self.top.changes_mut()
+	}
+
 	/// Get an optional iterator over all child changes stored under the supplied key.
 	pub fn child_changes(
 		&self,
@@ -548,6 +557,16 @@ impl<H: Hasher> OverlayedChanges<H> {
 		self.children.get(key).map(|(overlay, info)| (overlay.changes(), info))
 	}
 
+	/// Get an optional iterator over all child changes stored under the supplied key.
+	pub fn child_changes_mut(
+		&mut self,
+		key: &[u8],
+	) -> Option<(impl Iterator<Item = (&StorageKey, &mut OverlayedValue)>, &ChildInfo)> {
+		self.children
+			.get_mut(key)
+			.map(|(overlay, info)| (overlay.changes_mut(), &*info))
+	}
+
 	/// Get an list of all index operations.
 	pub fn transaction_index_ops(&self) -> &[IndexOperation] {
 		&self.transaction_index_ops
@@ -575,11 +594,12 @@ impl<H: Hasher> OverlayedChanges<H> {
 		};
 
 		use core::mem::take;
-		let main_storage_changes = take(&mut self.top).drain_committed();
-		let child_storage_changes = take(&mut self.children)
-			.into_iter()
-			.map(|(key, (val, info))| (key, (val.drain_committed(), info)));
-
+		let main_storage_changes =
+			take(&mut self.top).drain_committed().map(|(k, v)| (k, v.to_option()));
+		let child_storage_changes =
+			take(&mut self.children).into_iter().map(|(key, (val, info))| {
+				(key, (val.drain_committed().map(|(k, v)| (k, v.to_option())), info))
+			});
 		let offchain_storage_changes = self.offchain_drain_committed().collect();
 
 		#[cfg(feature = "std")]
@@ -610,7 +630,7 @@ impl<H: Hasher> OverlayedChanges<H> {
 	/// set this index before first and unset after last extrinsic is executed.
 	/// Changes that are made outside of extrinsics, are marked with
 	/// `NO_EXTRINSIC_INDEX` index.
-	fn extrinsic_index(&self) -> Option<u32> {
+	fn extrinsic_index(&mut self) -> Option<u32> {
 		self.collect_extrinsics.then(|| {
 			self.storage(EXTRINSIC_INDEX)
 				.and_then(|idx| idx.and_then(|idx| Decode::decode(&mut &*idx).ok()))
@@ -634,10 +654,12 @@ impl<H: Hasher> OverlayedChanges<H> {
 			return (cache.transaction_storage_root, true)
 		}
 
-		let delta = self.changes().map(|(k, v)| (&k[..], v.value().map(|v| &v[..])));
-		let child_delta = self.children().map(|(changes, info)| {
-			(info, changes.map(|(k, v)| (&k[..], v.value().map(|v| &v[..]))))
-		});
+		let delta = self.top.changes_mut().map(|(k, v)| (&k[..], v.value().map(|v| &v[..])));
+
+		let child_delta = self
+			.children
+			.values_mut()
+			.map(|v| (&v.1, v.0.changes_mut().map(|(k, v)| (&k[..], v.value().map(|v| &v[..])))));
 
 		let (root, transaction) = backend.full_storage_root(delta, child_delta, state_version);
 
@@ -677,7 +699,7 @@ impl<H: Hasher> OverlayedChanges<H> {
 			return Ok((root, true))
 		}
 
-		let root = if let Some((changes, info)) = self.child_changes(storage_key) {
+		let root = if let Some((changes, info)) = self.child_changes_mut(storage_key) {
 			let delta = changes.map(|(k, v)| (k.as_ref(), v.value().map(AsRef::as_ref)));
 			Some(backend.child_storage_root(info, delta, state_version))
 		} else {
@@ -711,19 +733,19 @@ impl<H: Hasher> OverlayedChanges<H> {
 
 	/// Returns an iterator over the keys (in lexicographic order) following `key` (excluding `key`)
 	/// alongside its value.
-	pub fn iter_after(&self, key: &[u8]) -> impl Iterator<Item = (&[u8], &OverlayedValue)> {
+	pub fn iter_after(&mut self, key: &[u8]) -> impl Iterator<Item = (&[u8], &mut OverlayedValue)> {
 		self.top.changes_after(key)
 	}
 
 	/// Returns an iterator over the keys (in lexicographic order) following `key` (excluding `key`)
 	/// alongside its value for the given `storage_key` child.
 	pub fn child_iter_after(
-		&self,
+		&mut self,
 		storage_key: &[u8],
 		key: &[u8],
-	) -> impl Iterator<Item = (&[u8], &OverlayedValue)> {
+	) -> impl Iterator<Item = (&[u8], &mut OverlayedValue)> {
 		self.children
-			.get(storage_key)
+			.get_mut(storage_key)
 			.map(|(overlay, _)| overlay.changes_after(key))
 			.into_iter()
 			.flatten()
@@ -858,7 +880,11 @@ mod tests {
 	use sp_core::{traits::Externalities, Blake2Hasher};
 	use std::collections::BTreeMap;
 
-	fn assert_extrinsics(overlay: &OverlayedChangeSet, key: impl AsRef<[u8]>, expected: Vec<u32>) {
+	fn assert_extrinsics(
+		overlay: &mut OverlayedChangeSet,
+		key: impl AsRef<[u8]>,
+		expected: Vec<u32>,
+	) {
 		assert_eq!(
 			overlay.get(key.as_ref()).unwrap().extrinsics().into_iter().collect::<Vec<_>>(),
 			expected
@@ -1049,9 +1075,9 @@ mod tests {
 		overlay.set_extrinsic_index(2);
 		overlay.set_storage(vec![1], Some(vec![6]));
 
-		assert_extrinsics(&overlay.top, vec![1], vec![0, 2]);
-		assert_extrinsics(&overlay.top, vec![3], vec![1]);
-		assert_extrinsics(&overlay.top, vec![100], vec![NO_EXTRINSIC_INDEX]);
+		assert_extrinsics(&mut overlay.top, vec![1], vec![0, 2]);
+		assert_extrinsics(&mut overlay.top, vec![3], vec![1]);
+		assert_extrinsics(&mut overlay.top, vec![100], vec![NO_EXTRINSIC_INDEX]);
 
 		overlay.start_transaction();
 
@@ -1061,15 +1087,15 @@ mod tests {
 		overlay.set_extrinsic_index(4);
 		overlay.set_storage(vec![1], Some(vec![8]));
 
-		assert_extrinsics(&overlay.top, vec![1], vec![0, 2, 4]);
-		assert_extrinsics(&overlay.top, vec![3], vec![1, 3]);
-		assert_extrinsics(&overlay.top, vec![100], vec![NO_EXTRINSIC_INDEX]);
+		assert_extrinsics(&mut overlay.top, vec![1], vec![0, 2, 4]);
+		assert_extrinsics(&mut overlay.top, vec![3], vec![1, 3]);
+		assert_extrinsics(&mut overlay.top, vec![100], vec![NO_EXTRINSIC_INDEX]);
 
 		overlay.rollback_transaction().unwrap();
 
-		assert_extrinsics(&overlay.top, vec![1], vec![0, 2]);
-		assert_extrinsics(&overlay.top, vec![3], vec![1]);
-		assert_extrinsics(&overlay.top, vec![100], vec![NO_EXTRINSIC_INDEX]);
+		assert_extrinsics(&mut overlay.top, vec![1], vec![0, 2]);
+		assert_extrinsics(&mut overlay.top, vec![3], vec![1]);
+		assert_extrinsics(&mut overlay.top, vec![100], vec![NO_EXTRINSIC_INDEX]);
 	}
 
 	#[test]
diff --git a/substrate/primitives/state-machine/src/overlayed_changes/offchain.rs b/substrate/primitives/state-machine/src/overlayed_changes/offchain.rs
index 1e6965e874759e30b415783a560192af8412027d..517a51b02693c0528cf9475972a259ab72fd5f59 100644
--- a/substrate/primitives/state-machine/src/overlayed_changes/offchain.rs
+++ b/substrate/primitives/state-machine/src/overlayed_changes/offchain.rs
@@ -42,7 +42,7 @@ impl OffchainOverlayedChanges {
 	}
 
 	/// Iterate over all key value pairs by reference.
-	pub fn iter(&self) -> impl Iterator<Item = OffchainOverlayedChangesItem> {
+	pub fn iter(&mut self) -> impl Iterator<Item = OffchainOverlayedChangesItem> {
 		self.0.changes().map(|kv| (kv.0, kv.1.value_ref()))
 	}
 
@@ -53,14 +53,16 @@ impl OffchainOverlayedChanges {
 
 	/// Remove a key and its associated value from the offchain database.
 	pub fn remove(&mut self, prefix: &[u8], key: &[u8]) {
-		let _ = self
-			.0
-			.set((prefix.to_vec(), key.to_vec()), OffchainOverlayedChange::Remove, None);
+		let _ = self.0.set_offchain(
+			(prefix.to_vec(), key.to_vec()),
+			OffchainOverlayedChange::Remove,
+			None,
+		);
 	}
 
 	/// Set the value associated with a key under a prefix to the value provided.
 	pub fn set(&mut self, prefix: &[u8], key: &[u8], value: &[u8]) {
-		let _ = self.0.set(
+		let _ = self.0.set_offchain(
 			(prefix.to_vec(), key.to_vec()),
 			OffchainOverlayedChange::SetValue(value.to_vec()),
 			None,
@@ -68,7 +70,7 @@ impl OffchainOverlayedChanges {
 	}
 
 	/// Obtain a associated value to the given key in storage with prefix.
-	pub fn get(&self, prefix: &[u8], key: &[u8]) -> Option<OffchainOverlayedChange> {
+	pub fn get(&mut self, prefix: &[u8], key: &[u8]) -> Option<OffchainOverlayedChange> {
 		let key = (prefix.to_vec(), key.to_vec());
 		self.0.get(&key).map(|entry| entry.value_ref()).cloned()
 	}
diff --git a/substrate/primitives/state-machine/src/read_only.rs b/substrate/primitives/state-machine/src/read_only.rs
index 2056bf9866358d4d5086fbdf67a7385e7532092f..b78d17138b0ff5c968a858ab7515b5d82a9673c3 100644
--- a/substrate/primitives/state-machine/src/read_only.rs
+++ b/substrate/primitives/state-machine/src/read_only.rs
@@ -88,39 +88,39 @@ where
 		panic!("Should not be used in read-only externalities!")
 	}
 
-	fn storage(&self, key: &[u8]) -> Option<StorageValue> {
+	fn storage(&mut self, key: &[u8]) -> Option<StorageValue> {
 		self.backend
 			.storage(key)
 			.expect("Backed failed for storage in ReadOnlyExternalities")
 	}
 
-	fn storage_hash(&self, key: &[u8]) -> Option<Vec<u8>> {
+	fn storage_hash(&mut self, key: &[u8]) -> Option<Vec<u8>> {
 		self.backend
 			.storage_hash(key)
 			.expect("Backed failed for storage_hash in ReadOnlyExternalities")
 			.map(|h| h.encode())
 	}
 
-	fn child_storage(&self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageValue> {
+	fn child_storage(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageValue> {
 		self.backend
 			.child_storage(child_info, key)
 			.expect("Backed failed for child_storage in ReadOnlyExternalities")
 	}
 
-	fn child_storage_hash(&self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>> {
+	fn child_storage_hash(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<Vec<u8>> {
 		self.backend
 			.child_storage_hash(child_info, key)
 			.expect("Backed failed for child_storage_hash in ReadOnlyExternalities")
 			.map(|h| h.encode())
 	}
 
-	fn next_storage_key(&self, key: &[u8]) -> Option<StorageKey> {
+	fn next_storage_key(&mut self, key: &[u8]) -> Option<StorageKey> {
 		self.backend
 			.next_storage_key(key)
 			.expect("Backed failed for next_storage_key in ReadOnlyExternalities")
 	}
 
-	fn next_child_storage_key(&self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageKey> {
+	fn next_child_storage_key(&mut self, child_info: &ChildInfo, key: &[u8]) -> Option<StorageKey> {
 		self.backend
 			.next_child_storage_key(child_info, key)
 			.expect("Backed failed for next_child_storage_key in ReadOnlyExternalities")
diff --git a/substrate/primitives/state-machine/src/testing.rs b/substrate/primitives/state-machine/src/testing.rs
index e19ba95755c1b16f5cc6a2d59a8ede48997707f0..e9d64a891e819094d8a2c4d49265d01d6b7fc965 100644
--- a/substrate/primitives/state-machine/src/testing.rs
+++ b/substrate/primitives/state-machine/src/testing.rs
@@ -209,12 +209,15 @@ where
 	///
 	/// In contrast to [`commit_all`](Self::commit_all) this will not panic if there are open
 	/// transactions.
-	pub fn as_backend(&self) -> InMemoryBackend<H> {
-		let top: Vec<_> =
-			self.overlay.changes().map(|(k, v)| (k.clone(), v.value().cloned())).collect();
+	pub fn as_backend(&mut self) -> InMemoryBackend<H> {
+		let top: Vec<_> = self
+			.overlay
+			.changes_mut()
+			.map(|(k, v)| (k.clone(), v.value().cloned()))
+			.collect();
 		let mut transaction = vec![(None, top)];
 
-		for (child_changes, child_info) in self.overlay.children() {
+		for (child_changes, child_info) in self.overlay.children_mut() {
 			transaction.push((
 				Some(child_info.clone()),
 				child_changes.map(|(k, v)| (k.clone(), v.value().cloned())).collect(),
@@ -293,13 +296,14 @@ where
 	}
 }
 
-impl<H: Hasher> PartialEq for TestExternalities<H>
+impl<H> TestExternalities<H>
 where
+	H: Hasher,
 	H::Out: Ord + 'static + codec::Codec,
 {
 	/// This doesn't test if they are in the same state, only if they contains the
 	/// same data at this state
-	fn eq(&self, other: &TestExternalities<H>) -> bool {
+	pub fn eq(&mut self, other: &mut TestExternalities<H>) -> bool {
 		self.as_backend().eq(&other.as_backend())
 	}
 }
diff --git a/substrate/utils/frame/remote-externalities/src/lib.rs b/substrate/utils/frame/remote-externalities/src/lib.rs
index 0ecb98f31343aa050d9daa8f33445e858631813d..44e5f467d895e34eca95d2b6ef86186b03d2f6b2 100644
--- a/substrate/utils/frame/remote-externalities/src/lib.rs
+++ b/substrate/utils/frame/remote-externalities/src/lib.rs
@@ -1383,7 +1383,7 @@ mod remote_tests {
 		init_logger();
 
 		// create an ext with children keys
-		let child_ext = Builder::<Block>::new()
+		let mut child_ext = Builder::<Block>::new()
 			.mode(Mode::Online(OnlineConfig {
 				transport: endpoint().clone().into(),
 				pallets: vec!["Proxy".to_owned()],
@@ -1396,7 +1396,7 @@ mod remote_tests {
 			.unwrap();
 
 		// create an ext without children keys
-		let ext = Builder::<Block>::new()
+		let mut ext = Builder::<Block>::new()
 			.mode(Mode::Online(OnlineConfig {
 				transport: endpoint().clone().into(),
 				pallets: vec!["Proxy".to_owned()],