From d11e60510ef48da14643ccc50790f0ca0695eb51 Mon Sep 17 00:00:00 2001
From: Arkadiy Paronyan <arkady.paronyan@gmail.com>
Date: Wed, 5 May 2021 19:16:34 +0300
Subject: [PATCH] Added client function to delete a recent block (#8533)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Implemented recent block removal

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
---
 substrate/client/api/src/backend.rs           |   6 +
 substrate/client/api/src/in_mem.rs            |   7 +
 substrate/client/api/src/leaves.rs            |   4 +-
 substrate/client/db/src/lib.rs                |  86 +++++++
 substrate/client/light/src/backend.rs         |   7 +
 substrate/client/state-db/src/lib.rs          |  17 ++
 substrate/client/state-db/src/noncanonical.rs | 215 ++++++++++++++----
 7 files changed, 294 insertions(+), 48 deletions(-)

diff --git a/substrate/client/api/src/backend.rs b/substrate/client/api/src/backend.rs
index 14841d8d3e9..09e9e0cb2e1 100644
--- a/substrate/client/api/src/backend.rs
+++ b/substrate/client/api/src/backend.rs
@@ -475,6 +475,12 @@ pub trait Backend<Block: BlockT>: AuxStore + Send + Sync {
 		revert_finalized: bool,
 	) -> sp_blockchain::Result<(NumberFor<Block>, HashSet<Block::Hash>)>;
 
+	/// Discard non-best, unfinalized leaf block.
+	fn remove_leaf_block(
+		&self,
+		hash: &Block::Hash,
+	) -> sp_blockchain::Result<()>;
+
 	/// Insert auxiliary data into key-value store.
 	fn insert_aux<
 		'a,
diff --git a/substrate/client/api/src/in_mem.rs b/substrate/client/api/src/in_mem.rs
index 409b5f52b5d..d756e1cc0bb 100644
--- a/substrate/client/api/src/in_mem.rs
+++ b/substrate/client/api/src/in_mem.rs
@@ -770,6 +770,13 @@ impl<Block: BlockT> backend::Backend<Block> for Backend<Block> where Block::Hash
 		Ok((Zero::zero(), HashSet::new()))
 	}
 
+	fn remove_leaf_block(
+		&self,
+		_hash: &Block::Hash,
+	) -> sp_blockchain::Result<()> {
+		Ok(())
+	}
+
 	fn get_import_lock(&self) -> &RwLock<()> {
 		&self.import_lock
 	}
diff --git a/substrate/client/api/src/leaves.rs b/substrate/client/api/src/leaves.rs
index 47cac8b186f..0474d5bb8fe 100644
--- a/substrate/client/api/src/leaves.rs
+++ b/substrate/client/api/src/leaves.rs
@@ -216,8 +216,8 @@ impl<H, N> LeafSet<H, N> where
 		self.pending_removed.clear();
 	}
 
-	#[cfg(test)]
-	fn contains(&self, number: N, hash: H) -> bool {
+	/// Check if given block is a leaf.
+	pub fn contains(&self, number: N, hash: H) -> bool {
 		self.storage.get(&Reverse(number)).map_or(false, |hashes| hashes.contains(&hash))
 	}
 
diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs
index c7bac13e719..94535cf28ae 100644
--- a/substrate/client/db/src/lib.rs
+++ b/substrate/client/db/src/lib.rs
@@ -623,6 +623,7 @@ impl<Block: BlockT> HeaderMetadata<Block> for BlockchainDb<Block> {
 	}
 
 	fn remove_header_metadata(&self, hash: Block::Hash) {
+		self.header_cache.lock().remove(&hash);
 		self.header_metadata_cache.remove_header_metadata(hash);
 	}
 }
@@ -1972,6 +1973,59 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
 		Ok((reverted, reverted_finalized))
 	}
 
+	fn remove_leaf_block(
+		&self,
+		hash: &Block::Hash,
+	) -> ClientResult<()> {
+		let best_hash = self.blockchain.info().best_hash;
+
+		if best_hash == *hash {
+			return Err(
+				sp_blockchain::Error::Backend(
+					format!("Can't remove best block {:?}", hash)
+				)
+			)
+		}
+
+		let hdr = self.blockchain.header_metadata(hash.clone())?;
+		if !self.have_state_at(&hash, hdr.number) {
+			return Err(
+				sp_blockchain::Error::UnknownBlock(
+					format!("State already discarded for {:?}", hash)
+				)
+			)
+		}
+
+		let mut leaves = self.blockchain.leaves.write();
+		if !leaves.contains(hdr.number, *hash) {
+			return Err(
+				sp_blockchain::Error::Backend(
+					format!("Can't remove non-leaf block {:?}", hash)
+				)
+			)
+		}
+
+		let mut transaction = Transaction::new();
+		if let Some(commit) = self.storage.state_db.remove(hash) {
+			apply_state_commit(&mut transaction, commit);
+		}
+		transaction.remove(columns::KEY_LOOKUP, hash.as_ref());
+		let changes_trie_cache_ops = self.changes_tries_storage.revert(
+			&mut transaction,
+			&cache::ComplexBlockId::new(
+				*hash,
+				hdr.number,
+			),
+		)?;
+
+		self.changes_tries_storage.post_commit(Some(changes_trie_cache_ops));
+		leaves.revert(hash.clone(), hdr.number);
+		leaves.prepare_transaction(&mut transaction, columns::META, meta_keys::LEAF_PREFIX);
+		self.storage.db.commit(transaction)?;
+		self.blockchain().remove_header_metadata(*hash);
+		Ok(())
+	}
+
 	fn blockchain(&self) -> &BlockchainDb<Block> {
 		&self.blockchain
 	}
@@ -3008,4 +3062,36 @@ pub(crate) mod tests {
 			}
 		}
 	}
+
+	#[test]
+	fn remove_leaf_block_works() {
+		let backend = Backend::<Block>::new_test_with_tx_storage(
+			2,
+			10,
+			TransactionStorageMode::StorageChain
+		);
+		let mut blocks = Vec::new();
+		let mut prev_hash = Default::default();
+		for i in 0 .. 2 {
+			let hash = insert_block(&backend, i, prev_hash, None, Default::default(), vec![i.into()], None);
+			blocks.push(hash);
+			prev_hash = hash;
+		}
+
+		// insert a fork at block 2, which becomes best block
+		let best_hash = insert_block(
+			&backend,
+			1,
+			blocks[0],
+			None,
+			sp_core::H256::random(),
+			vec![42.into()],
+			None
+		);
+		assert!(backend.remove_leaf_block(&best_hash).is_err());
+		assert!(backend.have_state_at(&prev_hash, 1));
+		backend.remove_leaf_block(&prev_hash).unwrap();
+		assert_eq!(None, backend.blockchain().header(BlockId::hash(prev_hash.clone())).unwrap());
+		assert!(!backend.have_state_at(&prev_hash, 1));
+	}
 }
diff --git a/substrate/client/light/src/backend.rs b/substrate/client/light/src/backend.rs
index 621ada13ff6..d6f86209afe 100644
--- a/substrate/client/light/src/backend.rs
+++ b/substrate/client/light/src/backend.rs
@@ -246,6 +246,13 @@ impl<S, Block> ClientBackend<Block> for Backend<S, HashFor<Block>>
 		Err(ClientError::NotAvailableOnLightClient)
 	}
 
+	fn remove_leaf_block(
+		&self,
+		_hash: &Block::Hash,
+	) -> ClientResult<()> {
+		Err(ClientError::NotAvailableOnLightClient)
+	}
+
 	fn get_import_lock(&self) -> &RwLock<()> {
 		&self.import_lock
 	}
diff --git a/substrate/client/state-db/src/lib.rs b/substrate/client/state-db/src/lib.rs
index 8961f2549b2..1340442061a 100644
--- a/substrate/client/state-db/src/lib.rs
+++ b/substrate/client/state-db/src/lib.rs
@@ -364,6 +364,17 @@ impl<BlockHash: Hash + MallocSizeOf, Key: Hash + MallocSizeOf> StateDbSync<Block
 		}
 	}
 
+	fn remove(&mut self, hash: &BlockHash) -> Option<CommitSet<Key>> {
+		match self.mode {
+			PruningMode::ArchiveAll => {
+				Some(CommitSet::default())
+			},
+			PruningMode::ArchiveCanonical | PruningMode::Constrained(_) => {
+				self.non_canonical.remove(hash)
+			},
+		}
+	}
+
 	fn pin(&mut self, hash: &BlockHash) -> Result<(), PinError> {
 		match self.mode {
 			PruningMode::ArchiveAll => Ok(()),
@@ -509,6 +520,12 @@ impl<BlockHash: Hash + MallocSizeOf, Key: Hash + MallocSizeOf> StateDb<BlockHash
 		self.db.write().revert_one()
 	}
 
+	/// Remove specified non-canonical block.
+	/// Returns a database commit or `None` if not possible.
+	pub fn remove(&self, hash: &BlockHash) -> Option<CommitSet<Key>> {
+		self.db.write().remove(hash)
+	}
+
 	/// Returns last finalized block number.
 	pub fn best_canonical(&self) -> Option<u64> {
 		return self.db.read().best_canonical()
diff --git a/substrate/client/state-db/src/noncanonical.rs b/substrate/client/state-db/src/noncanonical.rs
index 3f0c7d132f7..1a680b16ffb 100644
--- a/substrate/client/state-db/src/noncanonical.rs
+++ b/substrate/client/state-db/src/noncanonical.rs
@@ -36,7 +36,7 @@ const MAX_BLOCKS_PER_LEVEL: u64 = 32;
 #[derive(parity_util_mem_derive::MallocSizeOf)]
 pub struct NonCanonicalOverlay<BlockHash: Hash, Key: Hash> {
 	last_canonicalized: Option<(BlockHash, u64)>,
-	levels: VecDeque<Vec<BlockOverlay<BlockHash, Key>>>,
+	levels: VecDeque<OverlayLevel<BlockHash, Key>>,
 	parents: HashMap<BlockHash, BlockHash>,
 	pending_canonicalizations: Vec<BlockHash>,
 	pending_insertions: Vec<BlockHash>,
@@ -46,6 +46,36 @@ pub struct NonCanonicalOverlay<BlockHash: Hash, Key: Hash> {
 	pinned_insertions: HashMap<BlockHash, (Vec<Key>, u32)>,
 }
 
+#[derive(parity_util_mem_derive::MallocSizeOf)]
+#[cfg_attr(test, derive(PartialEq, Debug))]
+struct OverlayLevel<BlockHash: Hash, Key: Hash> {
+	blocks: Vec<BlockOverlay<BlockHash, Key>>,
+	used_indicies: u64, // Bitmask of used journal indices.
+}
+
+impl<BlockHash: Hash, Key: Hash> OverlayLevel<BlockHash, Key> {
+	fn push(&mut self, overlay: BlockOverlay<BlockHash, Key>) {
+		self.used_indicies |= 1 << overlay.journal_index;
+		self.blocks.push(overlay)
+	}
+
+	fn available_index(&self) -> u64 {
+		self.used_indicies.trailing_ones() as u64
+	}
+
+	fn remove(&mut self, index: usize) -> BlockOverlay<BlockHash, Key> {
+		self.used_indicies &= !(1 << self.blocks[index].journal_index);
+		self.blocks.remove(index)
+	}
+
+	fn new() -> OverlayLevel<BlockHash, Key> {
+		OverlayLevel {
+			blocks: Vec::new(),
+			used_indicies: 0,
+		}
+	}
+}
+
 #[derive(Encode, Decode)]
 struct JournalRecord<BlockHash: Hash, Key: Hash> {
 	hash: BlockHash,
@@ -62,6 +92,7 @@ fn to_journal_key(block: u64, index: u64) -> Vec<u8> {
 #[derive(parity_util_mem_derive::MallocSizeOf)]
 struct BlockOverlay<BlockHash: Hash, Key: Hash> {
 	hash: BlockHash,
+	journal_index: u64,
 	journal_key: Vec<u8>,
 	inserted: Vec<Key>,
 	deleted: Vec<Key>,
@@ -93,7 +124,7 @@ fn discard_values<Key: Hash>(values: &mut HashMap<Key, (u32, DBValue)>, inserted
 }
 
 fn discard_descendants<BlockHash: Hash, Key: Hash>(
-	levels: &mut (&mut [Vec<BlockOverlay<BlockHash, Key>>], &mut [Vec<BlockOverlay<BlockHash, Key>>]),
+	levels: &mut (&mut [OverlayLevel<BlockHash, Key>], &mut [OverlayLevel<BlockHash, Key>]),
 	mut values: &mut HashMap<Key, (u32, DBValue)>,
 	parents: &mut HashMap<BlockHash, BlockHash>,
 	pinned: &HashMap<BlockHash, u32>,
@@ -111,36 +142,32 @@ fn discard_descendants<BlockHash: Hash, Key: Hash>(
 	};
 	let mut pinned_children = 0;
 	if let Some(level) = first {
-		*level = level.drain(..).filter_map(|overlay| {
-			let parent = parents.get(&overlay.hash)
-				.expect("there is a parent entry for each entry in levels; qed");
-
-			if parent == hash {
-				let mut num_pinned = discard_descendants(
-					&mut remainder,
-					values,
-					parents,
-					pinned,
-					pinned_insertions,
-					&overlay.hash
-				);
-				if pinned.contains_key(&overlay.hash) {
-					num_pinned += 1;
-				}
-				if num_pinned != 0 {
-					// save to be discarded later.
-					pinned_insertions.insert(overlay.hash.clone(), (overlay.inserted, num_pinned));
-					pinned_children += num_pinned;
-				} else {
-					// discard immediately.
-					parents.remove(&overlay.hash);
-					discard_values(&mut values, overlay.inserted);
-				}
-				None
+		while let Some(i) = level.blocks.iter().position(|overlay| parents.get(&overlay.hash)
+			.expect("there is a parent entry for each entry in levels; qed")
+			== hash)
+		{
+			let overlay = level.remove(i);
+			let mut num_pinned = discard_descendants(
+				&mut remainder,
+				values,
+				parents,
+				pinned,
+				pinned_insertions,
+				&overlay.hash
+			);
+			if pinned.contains_key(&overlay.hash) {
+				num_pinned += 1;
+			}
+			if num_pinned != 0 {
+				// save to be discarded later.
+				pinned_insertions.insert(overlay.hash.clone(), (overlay.inserted, num_pinned));
+				pinned_children += num_pinned;
 			} else {
-				Some(overlay)
+				// discard immediately.
+				parents.remove(&overlay.hash);
+				discard_values(&mut values, overlay.inserted);
 			}
-		}).collect();
+		}
 	}
 	pinned_children
 }
@@ -161,7 +188,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
 			let mut total: u64 = 0;
 			block += 1;
 			loop {
-				let mut level = Vec::new();
+				let mut level = OverlayLevel::new();
 				for index in 0 .. MAX_BLOCKS_PER_LEVEL {
 					let journal_key = to_journal_key(block, index);
 					if let Some(record) = db.get_meta(&journal_key).map_err(|e| Error::Db(e))? {
@@ -169,6 +196,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
 						let inserted = record.inserted.iter().map(|(k, _)| k.clone()).collect();
 						let overlay = BlockOverlay {
 							hash: record.hash.clone(),
+							journal_index: index,
 							journal_key,
 							inserted: inserted,
 							deleted: record.deleted,
@@ -187,7 +215,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
 						total += 1;
 					}
 				}
-				if level.is_empty() {
+				if level.blocks.is_empty() {
 					break;
 				}
 				levels.push_back(level);
@@ -235,23 +263,24 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
 			}
 		}
 		let level = if self.levels.is_empty() || number == front_block_number + self.levels.len() as u64 {
-			self.levels.push_back(Vec::new());
+			self.levels.push_back(OverlayLevel::new());
 			self.levels.back_mut().expect("can't be empty after insertion; qed")
 		} else {
 			self.levels.get_mut((number - front_block_number) as usize)
 				.expect("number is [front_block_number .. front_block_number + levels.len()) is asserted in precondition; qed")
 		};
 
-		if level.len() >= MAX_BLOCKS_PER_LEVEL as usize {
+		if level.blocks.len() >= MAX_BLOCKS_PER_LEVEL as usize {
 			return Err(Error::TooManySiblingBlocks);
 		}
 
-		let index = level.len() as u64;
+		let index = level.available_index();
 		let journal_key = to_journal_key(number, index);
 
 		let inserted = changeset.inserted.iter().map(|(k, _)| k.clone()).collect();
 		let overlay = BlockOverlay {
 			hash: hash.clone(),
+			journal_index: index,
 			journal_key: journal_key.clone(),
 			inserted: inserted,
 			deleted: changeset.deleted.clone(),
@@ -279,7 +308,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
 		hash: &BlockHash
 	) {
 		if let Some(level) = self.levels.get(level_index) {
-			level.iter().for_each(|overlay| {
+			level.blocks.iter().for_each(|overlay| {
 				let parent = self.parents.get(&overlay.hash).expect("there is a parent entry for each entry in levels; qed").clone();
 				if parent == *hash {
 					discarded_journals.push(overlay.journal_key.clone());
@@ -310,7 +339,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
 		let start = self.last_canonicalized_block_number().unwrap_or(0);
 		self.levels
 			.get(self.pending_canonicalizations.len())
-			.map(|level| level.iter().map(|r| (r.hash.clone(), start)).collect())
+			.map(|level| level.blocks.iter().map(|r| (r.hash.clone(), start)).collect())
 			.unwrap_or_default()
 	}
 
@@ -323,14 +352,14 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
 	) -> Result<(), Error<E>> {
 		trace!(target: "state-db", "Canonicalizing {:?}", hash);
 		let level = self.levels.get(self.pending_canonicalizations.len()).ok_or_else(|| Error::InvalidBlock)?;
-		let index = level
+		let index = level.blocks
 			.iter()
 			.position(|overlay| overlay.hash == *hash)
 			.ok_or_else(|| Error::InvalidBlock)?;
 
 		let mut discarded_journals = Vec::new();
 		let mut discarded_blocks = Vec::new();
-		for (i, overlay) in level.iter().enumerate() {
+		for (i, overlay) in level.blocks.iter().enumerate() {
 			if i != index {
 				self.discard_journals(
 					self.pending_canonicalizations.len() + 1,
@@ -344,7 +373,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
 		}
 
 		// get the one we need to canonicalize
-		let overlay = &level[index];
+		let overlay = &level.blocks[index];
 		commit.data.inserted.extend(overlay.inserted.iter()
 			.map(|k| (k.clone(), self.values.get(k).expect("For each key in overlays there's a value in values").1.clone())));
 		commit.data.deleted.extend(overlay.deleted.clone());
@@ -363,13 +392,13 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
 		for hash in self.pending_canonicalizations.drain(..) {
 			trace!(target: "state-db", "Post canonicalizing {:?}", hash);
 			let level = self.levels.pop_front().expect("Hash validity is checked in `canonicalize`");
-			let index = level
+			let index = level.blocks
 				.iter()
 				.position(|overlay| overlay.hash == hash)
 				.expect("Hash validity is checked in `canonicalize`");
 
 			// discard unfinalized overlays and values
-			for (i, overlay) in level.into_iter().enumerate() {
+			for (i, overlay) in level.blocks.into_iter().enumerate() {
 				let mut pinned_children = if i != index {
 					discard_descendants(
 						&mut self.levels.as_mut_slices(),
@@ -421,7 +450,7 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
 	pub fn revert_one(&mut self) -> Option<CommitSet<Key>> {
 		self.levels.pop_back().map(|level| {
 			let mut commit = CommitSet::default();
-			for overlay in level.into_iter() {
+			for overlay in level.blocks.into_iter() {
 				commit.meta.deleted.push(overlay.journal_key);
 				self.parents.remove(&overlay.hash);
 				discard_values(&mut self.values, overlay.inserted);
@@ -430,6 +459,36 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
 		})
 	}
 
+	/// Remove a single non-canonical block. Returns a commit set that deletes its journal record, or `None` if removal is not possible.
+	pub fn remove(&mut self, hash: &BlockHash) -> Option<CommitSet<Key>> {
+		let mut commit = CommitSet::default();
+		let level_count = self.levels.len();
+		for (level_index, level) in self.levels.iter_mut().enumerate().rev() {
+			let index = match level.blocks.iter().position(|overlay| &overlay.hash == hash) {
+				Some(index) => index,
+				None => continue,
+			};
+			// Check that it does not have any children
+			if (level_index != level_count - 1) && self.parents.values().any(|h| h == hash) {
+				log::debug!(target: "state-db", "Trying to remove block {:?} with children", hash);
+				return None;
+			}
+			let overlay = level.remove(index);
+			commit.meta.deleted.push(overlay.journal_key);
+			self.parents.remove(&overlay.hash);
+			discard_values(&mut self.values, overlay.inserted);
+			break;
+		}
+		if self.levels.back().map_or(false, |l| l.blocks.is_empty()) {
+			self.levels.pop_back();
+		}
+		if !commit.meta.deleted.is_empty() {
+			Some(commit)
+		} else {
+			None
+		}
+	}
+
 	fn revert_insertions(&mut self) {
 		self.pending_insertions.reverse();
 		for hash in self.pending_insertions.drain(..) {
@@ -437,12 +496,13 @@ impl<BlockHash: Hash, Key: Hash> NonCanonicalOverlay<BlockHash, Key> {
 			// find a level. When iterating insertions backwards the hash is always last in the level.
 			let level_index =
 				self.levels.iter().position(|level|
-					level.last().expect("Hash is added in `insert` in reverse order").hash == hash)
+					level.blocks.last().expect("Hash is added in `insert` in reverse order").hash == hash)
 				.expect("Hash is added in insert");
 
-			let	overlay = self.levels[level_index].pop().expect("Empty levels are not allowed in self.levels");
+			let overlay_index = self.levels[level_index].blocks.len() - 1;
+			let overlay = self.levels[level_index].remove(overlay_index);
 			discard_values(&mut self.values, overlay.inserted);
-			if self.levels[level_index].is_empty() {
+			if self.levels[level_index].blocks.is_empty() {
 				debug_assert_eq!(level_index, self.levels.len() - 1);
 				self.levels.pop_back();
 			}
@@ -1000,4 +1060,67 @@ mod tests {
 		overlay.apply_pending();
 		assert!(!contains(&overlay, 21));
 	}
+
+	#[test]
+	fn index_reuse() {
+		// This test discards a branch that is journaled under a non-zero index on level 1,
+		// making sure all journals are loaded for each level even if some of them are missing.
+		let root = H256::random();
+		let h1 = H256::random();
+		let h2 = H256::random();
+		let h11 = H256::random();
+		let h21 = H256::random();
+		let mut db = make_db(&[]);
+		let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
+		db.commit(&overlay.insert::<io::Error>(&root, 10, &H256::default(), make_changeset(&[], &[])).unwrap());
+		db.commit(&overlay.insert::<io::Error>(&h1, 11, &root, make_changeset(&[1], &[])).unwrap());
+		db.commit(&overlay.insert::<io::Error>(&h2, 11, &root, make_changeset(&[2], &[])).unwrap());
+		db.commit(&overlay.insert::<io::Error>(&h11, 12, &h1, make_changeset(&[11], &[])).unwrap());
+		db.commit(&overlay.insert::<io::Error>(&h21, 12, &h2, make_changeset(&[21], &[])).unwrap());
+		let mut commit = CommitSet::default();
+		overlay.canonicalize::<io::Error>(&root, &mut commit).unwrap();
+		overlay.canonicalize::<io::Error>(&h2, &mut commit).unwrap();  // h21 should stay in the DB
+		db.commit(&commit);
+		overlay.apply_pending();
+
+		// add another block at top level. It should reuse journal index 0 of previously discarded block
+		let h22 = H256::random();
+		db.commit(&overlay.insert::<io::Error>(&h22, 12, &h2, make_changeset(&[22], &[])).unwrap());
+		assert_eq!(overlay.levels[0].blocks[0].journal_index, 1);
+		assert_eq!(overlay.levels[0].blocks[1].journal_index, 0);
+
+		// Restore into a new overlay and check that journaled value exists.
+		let overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
+		assert_eq!(overlay.parents.len(), 2);
+		assert!(contains(&overlay, 21));
+		assert!(contains(&overlay, 22));
+	}
+
+	#[test]
+	fn remove_works() {
+		let root = H256::random();
+		let h1 = H256::random();
+		let h2 = H256::random();
+		let h11 = H256::random();
+		let h21 = H256::random();
+		let mut db = make_db(&[]);
+		let mut overlay = NonCanonicalOverlay::<H256, H256>::new(&db).unwrap();
+		db.commit(&overlay.insert::<io::Error>(&root, 10, &H256::default(), make_changeset(&[], &[])).unwrap());
+		db.commit(&overlay.insert::<io::Error>(&h1, 11, &root, make_changeset(&[1], &[])).unwrap());
+		db.commit(&overlay.insert::<io::Error>(&h2, 11, &root, make_changeset(&[2], &[])).unwrap());
+		db.commit(&overlay.insert::<io::Error>(&h11, 12, &h1, make_changeset(&[11], &[])).unwrap());
+		db.commit(&overlay.insert::<io::Error>(&h21, 12, &h2, make_changeset(&[21], &[])).unwrap());
+		assert!(overlay.remove(&h1).is_none());
+		assert!(overlay.remove(&h2).is_none());
+		assert_eq!(overlay.levels.len(), 3);
+
+		db.commit(&overlay.remove(&h11).unwrap());
+		assert!(!contains(&overlay, 11));
+
+		db.commit(&overlay.remove(&h21).unwrap());
+		assert_eq!(overlay.levels.len(), 2);
+
+		db.commit(&overlay.remove(&h2).unwrap());
+		assert!(!contains(&overlay, 2));
+	}
 }
-- 
GitLab