From f69c1751199e361c4dbfbabe63f5bdc3e7618611 Mon Sep 17 00:00:00 2001
From: Robert Habermeier <rphmeier@gmail.com>
Date: Tue, 6 Jul 2021 16:00:52 -0500
Subject: [PATCH] Chain Selection: Follow-ups (#3328)

* DB skeleton

* key formats

* lexicographic test

* custom types for DB

* implement backend for db-v1

* remove VoidBackend and integrate with real DbBackend

* detect stagnant blocks on an interval

* fix tests

* add tests for stagnant

* send ChainSelectionMessage::Approved

* tests for DB backend

* unused import

* upgrade kvdb-memorydb

Co-authored-by: Andronik Ordian <write@reusable.software>
---
 polkadot/Cargo.lock                           |   2 +
 .../approval-voting/src/approval_db/v1/mod.rs |  19 +-
 .../src/approval_db/v1/tests.rs               |   6 +-
 .../node/core/approval-voting/src/import.rs   |  19 +-
 polkadot/node/core/approval-voting/src/lib.rs |   7 +-
 .../node/core/approval-voting/src/tests.rs    |  21 +-
 polkadot/node/core/chain-selection/Cargo.toml |   2 +
 .../chain-selection/src/db_backend/mod.rs     |  19 +
 .../core/chain-selection/src/db_backend/v1.rs | 668 ++++++++++++++++++
 polkadot/node/core/chain-selection/src/lib.rs | 290 +++++---
 .../node/core/chain-selection/src/tests.rs    | 445 +++++++++++-
 .../node/core/chain-selection/src/tree.rs     |   7 +-
 12 files changed, 1357 insertions(+), 148 deletions(-)
 create mode 100644 polkadot/node/core/chain-selection/src/db_backend/mod.rs
 create mode 100644 polkadot/node/core/chain-selection/src/db_backend/v1.rs

diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index 3496afc6e7e..ac5849410db 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -6176,7 +6176,9 @@ version = "0.1.0"
 dependencies = [
  "assert_matches",
  "futures 0.3.15",
+ "futures-timer 3.0.2",
  "kvdb",
+ "kvdb-memorydb",
  "parity-scale-codec",
  "parking_lot 0.11.1",
  "polkadot-node-primitives",
diff --git a/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs b/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs
index 27960eb2921..f6307a8b5a1 100644
--- a/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs
+++ b/polkadot/node/core/approval-voting/src/approval_db/v1/mod.rs
@@ -423,17 +423,21 @@ pub(crate) fn add_block_entry(
 
 /// Forcibly approve all candidates included at up to the given relay-chain height in the indicated
 /// chain.
+///
+/// Returns a list of block hashes that were previously unapproved and are now approved.
 pub fn force_approve(
 	store: &dyn KeyValueDB,
 	db_config: Config,
 	chain_head: Hash,
 	up_to: BlockNumber,
-) -> Result<()> {
+) -> Result<Vec<Hash>> {
 	enum State {
 		WalkTo,
 		Approving,
 	}
 
+	let mut approved_hashes = Vec::new();
+
 	let mut cur_hash = chain_head;
 	let mut state = State::WalkTo;
 
@@ -452,13 +456,20 @@ pub fn force_approve(
 		match state {
 			State::WalkTo => {},
 			State::Approving => {
-				entry.approved_bitfield.iter_mut().for_each(|mut b| *b = true);
-				tx.put_block_entry(entry);
+				let is_approved = entry.approved_bitfield.count_ones()
+					== entry.approved_bitfield.len();
+
+				if !is_approved {
+					entry.approved_bitfield.iter_mut().for_each(|mut b| *b = true);
+					approved_hashes.push(entry.block_hash);
+					tx.put_block_entry(entry);
+				}
 			}
 		}
 	}
 
-	tx.write(store)
+	tx.write(store)?;
+	Ok(approved_hashes)
 }
 
 /// Return all blocks which have entries in the DB, ascending, by height.
diff --git a/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs b/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs
index 71c4d3c47e2..d0056cd98d9 100644
--- a/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs
+++ b/polkadot/node/core/approval-voting/src/approval_db/v1/tests.rs
@@ -534,7 +534,7 @@ fn force_approve_works() {
 		).unwrap();
 	}
 
-	force_approve(&store, TEST_CONFIG,  block_hash_d, 2).unwrap();
+	let approved_hashes = force_approve(&store, TEST_CONFIG,  block_hash_d, 2).unwrap();
 
 	assert!(load_block_entry(
 		&store,
@@ -556,6 +556,10 @@ fn force_approve_works() {
 		&TEST_CONFIG,
 		&block_hash_d,
 	).unwrap().unwrap().approved_bitfield.not_any());
+	assert_eq!(
+		approved_hashes,
+		vec![block_hash_b, block_hash_a],
+	);
 }
 
 #[test]
diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs
index bcb59f6ba7b..6cdbfe8550a 100644
--- a/polkadot/node/core/approval-voting/src/import.rs
+++ b/polkadot/node/core/approval-voting/src/import.rs
@@ -31,6 +31,7 @@
 use polkadot_node_subsystem::{
 	messages::{
 		RuntimeApiMessage, RuntimeApiRequest, ChainApiMessage, ApprovalDistributionMessage,
+		ChainSelectionMessage,
 	},
 	SubsystemContext, SubsystemError, SubsystemResult,
 };
@@ -462,10 +463,16 @@ pub(crate) async fn handle_new_head(
 						result.len(),
 					);
 				}
+
 				result
 			}
 		};
 
+		// If all bits are already set, then send an approve message.
+		if approved_bitfield.count_ones() == approved_bitfield.len() {
+			ctx.send_message(ChainSelectionMessage::Approved(block_hash).into()).await;
+		}
+
 		let block_entry = approval_db::v1::BlockEntry {
 			block_hash,
 			parent_hash: block_header.parent_hash,
@@ -487,8 +494,18 @@ pub(crate) async fn handle_new_head(
 				"Enacting force-approve",
 			);
 
-			approval_db::v1::force_approve(db_writer, db_config, block_hash, up_to)
+			let approved_hashes = approval_db::v1::force_approve(
+				db_writer,
+				db_config,
+				block_hash,
+				up_to,
+			)
 				.map_err(|e| SubsystemError::with_origin("approval-voting", e))?;
+
+			// Notify chain-selection of all approved hashes.
+			for hash in approved_hashes {
+				ctx.send_message(ChainSelectionMessage::Approved(hash).into()).await;
+			}
 		}
 
 		tracing::trace!(
diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs
index 54943c40b66..28cc1ca6a70 100644
--- a/polkadot/node/core/approval-voting/src/lib.rs
+++ b/polkadot/node/core/approval-voting/src/lib.rs
@@ -26,7 +26,7 @@ use polkadot_node_subsystem::{
 		AssignmentCheckError, AssignmentCheckResult, ApprovalCheckError, ApprovalCheckResult,
 		ApprovalVotingMessage, RuntimeApiMessage, RuntimeApiRequest, ChainApiMessage,
 		ApprovalDistributionMessage, CandidateValidationMessage,
-		AvailabilityRecoveryMessage,
+		AvailabilityRecoveryMessage, ChainSelectionMessage,
 	},
 	errors::RecoveryError,
 	Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SpawnedSubsystem,
@@ -717,6 +717,7 @@ enum Action {
 		candidate: CandidateReceipt,
 		backing_group: GroupIndex,
 	},
+	NoteApprovedInChainSelection(Hash),
 	IssueApproval(CandidateHash, ApprovalVoteRequest),
 	BecomeActive,
 	Conclude,
@@ -962,6 +963,9 @@ async fn handle_actions(
 					Some(_) => {},
 				}
 			}
+			Action::NoteApprovedInChainSelection(block_hash) => {
+				ctx.send_message(ChainSelectionMessage::Approved(block_hash).into()).await;
+			}
 			Action::BecomeActive => {
 				*mode = Mode::Active;
 
@@ -1805,6 +1809,7 @@ fn import_checked_approval(
 
 			if is_block_approved && !was_block_approved {
 				metrics.on_block_approved(status.tranche_now as _);
+				actions.push(Action::NoteApprovedInChainSelection(block_hash));
 			}
 
 			actions.push(Action::WriteBlockEntry(block_entry));
diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs
index 5603c362fd2..84006478bd8 100644
--- a/polkadot/node/core/approval-voting/src/tests.rs
+++ b/polkadot/node/core/approval-voting/src/tests.rs
@@ -850,6 +850,14 @@ fn import_checked_approval_updates_entries_and_schedules() {
 
 		assert_matches!(
 			actions.get(0).unwrap(),
+			Action::NoteApprovedInChainSelection(h) => {
+				assert_eq!(h, &block_hash);
+			}
+		);
+
+
+		assert_matches!(
+			actions.get(1).unwrap(),
 			Action::WriteBlockEntry(b_entry) => {
 				assert_eq!(b_entry.block_hash(), block_hash);
 				assert!(b_entry.is_fully_approved());
@@ -857,7 +865,7 @@ fn import_checked_approval_updates_entries_and_schedules() {
 			}
 		);
 		assert_matches!(
-			actions.get_mut(1).unwrap(),
+			actions.get_mut(2).unwrap(),
 			Action::WriteCandidateEntry(c_hash, ref mut c_entry) => {
 				assert_eq!(c_hash, &candidate_hash);
 				assert!(c_entry.approval_entry(&block_hash).unwrap().is_approved());
@@ -1391,9 +1399,16 @@ fn import_checked_approval_sets_one_block_bit_at_a_time() {
 		ApprovalSource::Remote(validator_index_b),
 	);
 
-	assert_eq!(actions.len(), 2);
+	assert_eq!(actions.len(), 3);
 	assert_matches!(
 		actions.get(0).unwrap(),
+		Action::NoteApprovedInChainSelection(h) => {
+			assert_eq!(h, &block_hash);
+		}
+	);
+
+	assert_matches!(
+		actions.get(1).unwrap(),
 		Action::WriteBlockEntry(b_entry) => {
 			assert_eq!(b_entry.block_hash(), block_hash);
 			assert!(b_entry.is_fully_approved());
@@ -1403,7 +1418,7 @@ fn import_checked_approval_sets_one_block_bit_at_a_time() {
 	);
 
 	assert_matches!(
-		actions.get(1).unwrap(),
+		actions.get(2).unwrap(),
 		Action::WriteCandidateEntry(c_h, c_entry) => {
 			assert_eq!(c_h, &candidate_hash_2);
 			assert!(c_entry.approval_entry(&block_hash).unwrap().is_approved());
diff --git a/polkadot/node/core/chain-selection/Cargo.toml b/polkadot/node/core/chain-selection/Cargo.toml
index 53e74cf883a..f6b42a2bec4 100644
--- a/polkadot/node/core/chain-selection/Cargo.toml
+++ b/polkadot/node/core/chain-selection/Cargo.toml
@@ -7,6 +7,7 @@ edition = "2018"
 
 [dependencies]
 futures = "0.3.15"
+futures-timer = "3"
 tracing = "0.1.26"
 polkadot-primitives = { path = "../../../primitives" }
 polkadot-node-primitives = { path = "../../primitives" }
@@ -21,3 +22,4 @@ polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
 sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
 parking_lot = "0.11"
 assert_matches = "1"
+kvdb-memorydb = "0.10.0"
diff --git a/polkadot/node/core/chain-selection/src/db_backend/mod.rs b/polkadot/node/core/chain-selection/src/db_backend/mod.rs
new file mode 100644
index 00000000000..66e61426b73
--- /dev/null
+++ b/polkadot/node/core/chain-selection/src/db_backend/mod.rs
@@ -0,0 +1,19 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! A database [`Backend`][crate::backend::Backend] for the chain selection subsystem.
+
+pub(super) mod v1;
diff --git a/polkadot/node/core/chain-selection/src/db_backend/v1.rs b/polkadot/node/core/chain-selection/src/db_backend/v1.rs
new file mode 100644
index 00000000000..6aea4af8c13
--- /dev/null
+++ b/polkadot/node/core/chain-selection/src/db_backend/v1.rs
@@ -0,0 +1,668 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! A database [`Backend`][crate::backend::Backend] for the chain selection subsystem.
+//!
+//! This stores the following schema:
+//!
+//! ```ignore
+//! ("CS_block_entry", Hash) -> BlockEntry;
+//! ("CS_block_height", BigEndianBlockNumber) -> Vec<Hash>;
+//! ("CS_stagnant_at", BigEndianTimestamp) -> Vec<Hash>;
+//! ("CS_leaves") -> LeafEntrySet;
+//! ```
+//!
+//! The big-endian encoding is used for creating iterators over the key-value DB which are
+//! accessible by prefix, to find the earliest block number stored as well as all stagnant
+//! blocks.
+//!
+//! The `Vec`s stored are always non-empty. Empty `Vec`s are not stored on disk so there is no
+//! semantic difference between `None` and an empty `Vec`.
+
+use crate::backend::{Backend, BackendWriteOp};
+use crate::Error;
+
+use polkadot_primitives::v1::{BlockNumber, Hash};
+use polkadot_node_primitives::BlockWeight;
+
+use kvdb::{DBTransaction, KeyValueDB};
+use parity_scale_codec::{Encode, Decode};
+
+use std::sync::Arc;
+
+const BLOCK_ENTRY_PREFIX: &[u8; 14] = b"CS_block_entry";
+const BLOCK_HEIGHT_PREFIX: &[u8; 15] = b"CS_block_height";
+const STAGNANT_AT_PREFIX: &[u8; 14] = b"CS_stagnant_at";
+const LEAVES_KEY: &[u8; 9] = b"CS_leaves";
+
+type Timestamp = u64;
+
+#[derive(Debug, Encode, Decode, Clone, PartialEq)]
+enum Approval {
+	#[codec(index = 0)]
+	Approved,
+	#[codec(index = 1)]
+	Unapproved,
+	#[codec(index = 2)]
+	Stagnant,
+}
+
+impl From<crate::Approval> for Approval {
+	fn from(x: crate::Approval) -> Self {
+		match x {
+			crate::Approval::Approved => Approval::Approved,
+			crate::Approval::Unapproved => Approval::Unapproved,
+			crate::Approval::Stagnant => Approval::Stagnant,
+		}
+	}
+}
+
+impl From<Approval> for crate::Approval {
+	fn from(x: Approval) -> crate::Approval {
+		match x {
+			Approval::Approved => crate::Approval::Approved,
+			Approval::Unapproved => crate::Approval::Unapproved,
+			Approval::Stagnant => crate::Approval::Stagnant,
+		}
+	}
+}
+
+#[derive(Debug, Encode, Decode, Clone, PartialEq)]
+struct ViabilityCriteria {
+	explicitly_reverted: bool,
+	approval: Approval,
+	earliest_unviable_ancestor: Option<Hash>,
+}
+
+impl From<crate::ViabilityCriteria> for ViabilityCriteria {
+	fn from(x: crate::ViabilityCriteria) -> Self {
+		ViabilityCriteria {
+			explicitly_reverted: x.explicitly_reverted,
+			approval: x.approval.into(),
+			earliest_unviable_ancestor: x.earliest_unviable_ancestor,
+		}
+	}
+}
+
+impl From<ViabilityCriteria> for crate::ViabilityCriteria {
+	fn from(x: ViabilityCriteria) -> crate::ViabilityCriteria {
+		crate::ViabilityCriteria {
+			explicitly_reverted: x.explicitly_reverted,
+			approval: x.approval.into(),
+			earliest_unviable_ancestor: x.earliest_unviable_ancestor,
+		}
+	}
+}
+
+#[derive(Encode, Decode)]
+struct LeafEntry {
+	weight: BlockWeight,
+	block_number: BlockNumber,
+	block_hash: Hash,
+}
+
+impl From<crate::LeafEntry> for LeafEntry {
+	fn from(x: crate::LeafEntry) -> Self {
+		LeafEntry {
+			weight: x.weight,
+			block_number: x.block_number,
+			block_hash: x.block_hash,
+		}
+	}
+}
+
+impl From<LeafEntry> for crate::LeafEntry {
+	fn from(x: LeafEntry) -> crate::LeafEntry {
+		crate::LeafEntry {
+			weight: x.weight,
+			block_number: x.block_number,
+			block_hash: x.block_hash,
+		}
+	}
+}
+
+#[derive(Encode, Decode)]
+struct LeafEntrySet {
+	inner: Vec<LeafEntry>,
+}
+
+impl From<crate::LeafEntrySet> for LeafEntrySet {
+	fn from(x: crate::LeafEntrySet) -> Self {
+		LeafEntrySet {
+			inner: x.inner.into_iter().map(Into::into).collect(),
+		}
+	}
+}
+
+impl From<LeafEntrySet> for crate::LeafEntrySet {
+	fn from(x: LeafEntrySet) -> crate::LeafEntrySet {
+		crate::LeafEntrySet {
+			inner: x.inner.into_iter().map(Into::into).collect(),
+		}
+	}
+}
+
+#[derive(Debug, Encode, Decode, Clone, PartialEq)]
+struct BlockEntry {
+	block_hash: Hash,
+	block_number: BlockNumber,
+	parent_hash: Hash,
+	children: Vec<Hash>,
+	viability: ViabilityCriteria,
+	weight: BlockWeight,
+}
+
+impl From<crate::BlockEntry> for BlockEntry {
+	fn from(x: crate::BlockEntry) -> Self {
+		BlockEntry {
+			block_hash: x.block_hash,
+			block_number: x.block_number,
+			parent_hash: x.parent_hash,
+			children: x.children,
+			viability: x.viability.into(),
+			weight: x.weight,
+		}
+	}
+}
+
+impl From<BlockEntry> for crate::BlockEntry {
+	fn from(x: BlockEntry) -> crate::BlockEntry {
+		crate::BlockEntry {
+			block_hash: x.block_hash,
+			block_number: x.block_number,
+			parent_hash: x.parent_hash,
+			children: x.children,
+			viability: x.viability.into(),
+			weight: x.weight,
+		}
+	}
+}
+
+/// Configuration for the database backend.
+#[derive(Debug, Clone, Copy)]
+pub struct Config {
+	/// The column where block metadata is stored.
+	pub col_data: u32,
+}
+
+/// The database backend.
+pub struct DbBackend {
+	inner: Arc<dyn KeyValueDB>,
+	config: Config,
+}
+
+impl DbBackend {
+	/// Create a new [`DbBackend`] with the supplied key-value store and
+	/// config.
+	pub fn new(db: Arc<dyn KeyValueDB>, config: Config) -> Self {
+		DbBackend {
+			inner: db,
+			config,
+		}
+	}
+}
+
+impl Backend for DbBackend {
+	fn load_block_entry(&self, hash: &Hash) -> Result<Option<crate::BlockEntry>, Error> {
+		load_decode::<BlockEntry>(
+			&*self.inner,
+			self.config.col_data,
+			&block_entry_key(hash),
+		).map(|o| o.map(Into::into))
+	}
+
+	fn load_leaves(&self) -> Result<crate::LeafEntrySet, Error> {
+		load_decode::<LeafEntrySet>(
+			&*self.inner,
+			self.config.col_data,
+			LEAVES_KEY,
+		).map(|o| o.map(Into::into).unwrap_or_default())
+	}
+
+	fn load_stagnant_at(&self, timestamp: crate::Timestamp) -> Result<Vec<Hash>, Error> {
+		load_decode::<Vec<Hash>>(
+			&*self.inner,
+			self.config.col_data,
+			&stagnant_at_key(timestamp.into()),
+		).map(|o| o.unwrap_or_default())
+	}
+
+	fn load_stagnant_at_up_to(&self, up_to: crate::Timestamp)
+		-> Result<Vec<(crate::Timestamp, Vec<Hash>)>, Error>
+	{
+		let stagnant_at_iter = self.inner.iter_with_prefix(
+			self.config.col_data,
+			&STAGNANT_AT_PREFIX[..],
+		);
+
+		let val = stagnant_at_iter
+			.filter_map(|(k, v)| {
+				match (decode_stagnant_at_key(&mut &k[..]), <Vec<_>>::decode(&mut &v[..]).ok()) {
+					(Some(at), Some(stagnant_at)) => Some((at, stagnant_at)),
+					_ => None,
+				}
+			})
+			.take_while(|(at, _)| *at <= up_to.into())
+			.collect::<Vec<_>>();
+
+		Ok(val)
+	}
+
+	fn load_first_block_number(&self) -> Result<Option<BlockNumber>, Error> {
+		let blocks_at_height_iter = self.inner.iter_with_prefix(
+			self.config.col_data,
+			&BLOCK_HEIGHT_PREFIX[..],
+		);
+
+		let val = blocks_at_height_iter
+			.filter_map(|(k, _)| decode_block_height_key(&k[..]))
+			.next();
+
+		Ok(val)
+	}
+
+	fn load_blocks_by_number(&self, number: BlockNumber) -> Result<Vec<Hash>, Error> {
+		load_decode::<Vec<Hash>>(
+			&*self.inner,
+			self.config.col_data,
+			&block_height_key(number),
+		).map(|o| o.unwrap_or_default())
+	}
+
+	/// Atomically write the list of operations, with later operations taking precedence over prior.
+	fn write<I>(&mut self, ops: I) -> Result<(), Error>
+		where I: IntoIterator<Item = BackendWriteOp>
+	{
+		let mut tx = DBTransaction::new();
+		for op in ops {
+			match op {
+				BackendWriteOp::WriteBlockEntry(block_entry) => {
+					let block_entry: BlockEntry = block_entry.into();
+					tx.put_vec(
+						self.config.col_data,
+						&block_entry_key(&block_entry.block_hash),
+						block_entry.encode(),
+					);
+				}
+				BackendWriteOp::WriteBlocksByNumber(block_number, v) => {
+					if v.is_empty() {
+						tx.delete(
+							self.config.col_data,
+							&block_height_key(block_number),
+						);
+					} else {
+						tx.put_vec(
+							self.config.col_data,
+							&block_height_key(block_number),
+							v.encode(),
+						);
+					}
+				}
+				BackendWriteOp::WriteViableLeaves(leaves) => {
+					let leaves: LeafEntrySet = leaves.into();
+					if leaves.inner.is_empty() {
+						tx.delete(
+							self.config.col_data,
+							&LEAVES_KEY[..],
+						);
+					} else {
+						tx.put_vec(
+							self.config.col_data,
+							&LEAVES_KEY[..],
+							leaves.encode(),
+						);
+					}
+				}
+				BackendWriteOp::WriteStagnantAt(timestamp, stagnant_at) => {
+					let timestamp: Timestamp = timestamp.into();
+					if stagnant_at.is_empty() {
+						tx.delete(
+							self.config.col_data,
+							&stagnant_at_key(timestamp),
+						);
+					} else {
+						tx.put_vec(
+							self.config.col_data,
+							&stagnant_at_key(timestamp),
+							stagnant_at.encode(),
+						);
+					}
+				}
+				BackendWriteOp::DeleteBlocksByNumber(block_number) => {
+					tx.delete(
+						self.config.col_data,
+						&block_height_key(block_number),
+					);
+				}
+				BackendWriteOp::DeleteBlockEntry(hash) => {
+					tx.delete(
+						self.config.col_data,
+						&block_entry_key(&hash),
+					);
+				}
+				BackendWriteOp::DeleteStagnantAt(timestamp) => {
+					let timestamp: Timestamp = timestamp.into();
+					tx.delete(
+						self.config.col_data,
+						&stagnant_at_key(timestamp),
+					);
+				}
+			}
+		}
+
+		self.inner.write(tx).map_err(Into::into)
+	}
+}
+
+fn load_decode<D: Decode>(
+	db: &dyn KeyValueDB,
+	col_data: u32,
+	key: &[u8],
+) -> Result<Option<D>, Error> {
+	match db.get(col_data, key)? {
+		None => Ok(None),
+		Some(raw) => D::decode(&mut &raw[..])
+			.map(Some)
+			.map_err(Into::into),
+	}
+}
+
+fn block_entry_key(hash: &Hash) -> [u8; 14 + 32] {
+	let mut key = [0; 14 + 32];
+	key[..14].copy_from_slice(BLOCK_ENTRY_PREFIX);
+	hash.using_encoded(|s| key[14..].copy_from_slice(s));
+	key
+}
+
+fn block_height_key(number: BlockNumber) -> [u8; 15 + 4] {
+	let mut key = [0; 15 + 4];
+	key[..15].copy_from_slice(BLOCK_HEIGHT_PREFIX);
+	key[15..].copy_from_slice(&number.to_be_bytes());
+	key
+}
+
+fn stagnant_at_key(timestamp: Timestamp) -> [u8; 14 + 8] {
+	let mut key = [0; 14 + 8];
+	key[..14].copy_from_slice(STAGNANT_AT_PREFIX);
+	key[14..].copy_from_slice(&timestamp.to_be_bytes());
+	key
+}
+
+fn decode_block_height_key(key: &[u8]) -> Option<BlockNumber> {
+	if key.len() != 15 + 4 { return None }
+	if !key.starts_with(BLOCK_HEIGHT_PREFIX) { return None }
+
+	let mut bytes = [0; 4];
+	bytes.copy_from_slice(&key[15..]);
+	Some(BlockNumber::from_be_bytes(bytes))
+}
+
+fn decode_stagnant_at_key(key: &[u8]) -> Option<Timestamp> {
+	if key.len() != 14 + 8 { return None }
+	if !key.starts_with(STAGNANT_AT_PREFIX) { return None }
+
+	let mut bytes = [0; 8];
+	bytes.copy_from_slice(&key[14..]);
+	Some(Timestamp::from_be_bytes(bytes))
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn block_height_key_decodes() {
+		let key = block_height_key(5);
+		assert_eq!(decode_block_height_key(&key), Some(5));
+	}
+
+	#[test]
+	fn stagnant_at_key_decodes() {
+		let key = stagnant_at_key(5);
+		assert_eq!(decode_stagnant_at_key(&key), Some(5));
+	}
+
+	#[test]
+	fn lower_block_height_key_lesser() {
+		for i in 0..256 {
+			for j in 1..=256 {
+				let key_a = block_height_key(i);
+				let key_b = block_height_key(i + j);
+
+				assert!(key_a < key_b);
+			}
+		}
+	}
+
+	#[test]
+	fn lower_stagnant_at_key_lesser() {
+		for i in 0..256 {
+			for j in 1..=256 {
+				let key_a = stagnant_at_key(i);
+				let key_b = stagnant_at_key(i + j);
+
+				assert!(key_a < key_b);
+			}
+		}
+	}
+
+	#[test]
+	fn write_read_block_entry() {
+		let db = Arc::new(kvdb_memorydb::create(1));
+		let config = Config { col_data: 0 };
+
+		let mut backend = DbBackend::new(db, config);
+
+		let block_entry = BlockEntry {
+			block_hash: Hash::repeat_byte(1),
+			block_number: 1,
+			parent_hash: Hash::repeat_byte(0),
+			children: vec![],
+			viability: ViabilityCriteria {
+				earliest_unviable_ancestor: None,
+				explicitly_reverted: false,
+				approval: Approval::Unapproved,
+			},
+			weight: 100,
+		};
+
+		backend.write(vec![
+			BackendWriteOp::WriteBlockEntry(block_entry.clone().into())
+		]).unwrap();
+
+		assert_eq!(
+			backend.load_block_entry(&block_entry.block_hash).unwrap().map(BlockEntry::from),
+			Some(block_entry),
+		);
+	}
+
+	#[test]
+	fn delete_block_entry() {
+		let db = Arc::new(kvdb_memorydb::create(1));
+		let config = Config { col_data: 0 };
+
+		let mut backend = DbBackend::new(db, config);
+
+		let block_entry = BlockEntry {
+			block_hash: Hash::repeat_byte(1),
+			block_number: 1,
+			parent_hash: Hash::repeat_byte(0),
+			children: vec![],
+			viability: ViabilityCriteria {
+				earliest_unviable_ancestor: None,
+				explicitly_reverted: false,
+				approval: Approval::Unapproved,
+			},
+			weight: 100,
+		};
+
+		backend.write(vec![
+			BackendWriteOp::WriteBlockEntry(block_entry.clone().into())
+		]).unwrap();
+
+		backend.write(vec![
+			BackendWriteOp::DeleteBlockEntry(block_entry.block_hash),
+		]).unwrap();
+
+		assert!(
+			backend.load_block_entry(&block_entry.block_hash).unwrap().is_none(),
+		);
+	}
+
+	#[test]
+	fn earliest_block_number() {
+		let db = Arc::new(kvdb_memorydb::create(1));
+		let config = Config { col_data: 0 };
+
+		let mut backend = DbBackend::new(db, config);
+
+		assert!(
+			backend.load_first_block_number().unwrap().is_none(),
+		);
+
+		backend.write(vec![
+			BackendWriteOp::WriteBlocksByNumber(2, vec![Hash::repeat_byte(0)]),
+			BackendWriteOp::WriteBlocksByNumber(5, vec![Hash::repeat_byte(0)]),
+			BackendWriteOp::WriteBlocksByNumber(10, vec![Hash::repeat_byte(0)]),
+		]).unwrap();
+
+		assert_eq!(
+			backend.load_first_block_number().unwrap(),
+			Some(2),
+		);
+
+		backend.write(vec![
+			BackendWriteOp::WriteBlocksByNumber(2, vec![]),
+			BackendWriteOp::DeleteBlocksByNumber(5),
+		]).unwrap();
+
+		assert_eq!(
+			backend.load_first_block_number().unwrap(),
+			Some(10),
+		);
+	}
+
+	#[test]
+	fn stagnant_at_up_to() {
+		let db = Arc::new(kvdb_memorydb::create(1));
+		let config = Config { col_data: 0 };
+
+		let mut backend = DbBackend::new(db, config);
+
+		// Querying an empty DB, even up to the maximum timestamp, should be cheap and return nothing.
+		assert!(
+			backend.load_stagnant_at_up_to(Timestamp::max_value()).unwrap().is_empty(),
+		);
+
+		backend.write(vec![
+			BackendWriteOp::WriteStagnantAt(2, vec![Hash::repeat_byte(1)]),
+			BackendWriteOp::WriteStagnantAt(5, vec![Hash::repeat_byte(2)]),
+			BackendWriteOp::WriteStagnantAt(10, vec![Hash::repeat_byte(3)]),
+		]).unwrap();
+
+		assert_eq!(
+			backend.load_stagnant_at_up_to(Timestamp::max_value()).unwrap(),
+			vec![
+				(2, vec![Hash::repeat_byte(1)]),
+				(5, vec![Hash::repeat_byte(2)]),
+				(10, vec![Hash::repeat_byte(3)]),
+			]
+		);
+
+		assert_eq!(
+			backend.load_stagnant_at_up_to(10).unwrap(),
+			vec![
+				(2, vec![Hash::repeat_byte(1)]),
+				(5, vec![Hash::repeat_byte(2)]),
+				(10, vec![Hash::repeat_byte(3)]),
+			]
+		);
+
+		assert_eq!(
+			backend.load_stagnant_at_up_to(9).unwrap(),
+			vec![
+				(2, vec![Hash::repeat_byte(1)]),
+				(5, vec![Hash::repeat_byte(2)]),
+			]
+		);
+
+		backend.write(vec![
+			BackendWriteOp::DeleteStagnantAt(2),
+		]).unwrap();
+
+		assert_eq!(
+			backend.load_stagnant_at_up_to(5).unwrap(),
+			vec![
+				(5, vec![Hash::repeat_byte(2)]),
+			]
+		);
+
+		backend.write(vec![
+			BackendWriteOp::WriteStagnantAt(5, vec![]),
+		]).unwrap();
+
+		assert_eq!(
+			backend.load_stagnant_at_up_to(10).unwrap(),
+			vec![
+				(10, vec![Hash::repeat_byte(3)]),
+			]
+		);
+	}
+
+	#[test]
+	fn write_read_blocks_at_height() {
+		let db = Arc::new(kvdb_memorydb::create(1));
+		let config = Config { col_data: 0 };
+
+		let mut backend = DbBackend::new(db, config);
+
+		backend.write(vec![
+			BackendWriteOp::WriteBlocksByNumber(2, vec![Hash::repeat_byte(1)]),
+			BackendWriteOp::WriteBlocksByNumber(5, vec![Hash::repeat_byte(2)]),
+			BackendWriteOp::WriteBlocksByNumber(10, vec![Hash::repeat_byte(3)]),
+		]).unwrap();
+
+		assert_eq!(
+			backend.load_blocks_by_number(2).unwrap(),
+			vec![Hash::repeat_byte(1)],
+		);
+
+		assert_eq!(
+			backend.load_blocks_by_number(3).unwrap(),
+			vec![],
+		);
+
+		backend.write(vec![
+			BackendWriteOp::WriteBlocksByNumber(2, vec![]),
+			BackendWriteOp::DeleteBlocksByNumber(5),
+		]).unwrap();
+
+		assert_eq!(
+			backend.load_blocks_by_number(2).unwrap(),
+			vec![],
+		);
+
+		assert_eq!(
+			backend.load_blocks_by_number(5).unwrap(),
+			vec![],
+		);
+
+		assert_eq!(
+			backend.load_blocks_by_number(10).unwrap(),
+			vec![Hash::repeat_byte(3)],
+		);
+	}
+}
diff --git a/polkadot/node/core/chain-selection/src/lib.rs b/polkadot/node/core/chain-selection/src/lib.rs
index dddfc2590d3..39d416eb22b 100644
--- a/polkadot/node/core/chain-selection/src/lib.rs
+++ b/polkadot/node/core/chain-selection/src/lib.rs
@@ -25,15 +25,18 @@ use polkadot_subsystem::{
 	errors::ChainApiError,
 };
 
+use kvdb::KeyValueDB;
 use parity_scale_codec::Error as CodecError;
 use futures::channel::oneshot;
 use futures::prelude::*;
 
-use std::time::{UNIX_EPOCH, SystemTime};
+use std::time::{UNIX_EPOCH, Duration, SystemTime};
+use std::sync::Arc;
 
 use crate::backend::{Backend, OverlayedBackend, BackendWriteOp};
 
 mod backend;
+mod db_backend;
 mod tree;
 
 #[cfg(test)]
@@ -43,6 +46,10 @@ const LOG_TARGET: &str = "parachain::chain-selection";
 /// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots.
 type Timestamp = u64;
 
+// If a block isn't approved in 120 seconds, nodes will abandon it
+// and begin building on another chain.
+const STAGNANT_TIMEOUT: Timestamp = 120;
+
 #[derive(Debug, Clone)]
 enum Approval {
 	// Approved
@@ -202,96 +209,143 @@ impl Error {
 	}
 }
 
-fn timestamp_now() -> Timestamp {
-	// `SystemTime` is notoriously non-monotonic, so our timers might not work
-	// exactly as expected. Regardless, stagnation is detected on the order of minutes,
-	// and slippage of a few seconds in either direction won't cause any major harm.
-	//
-	// The exact time that a block becomes stagnant in the local node is always expected
-	// to differ from other nodes due to network asynchrony and delays in block propagation.
-	// Non-monotonicity exarcerbates that somewhat, but not meaningfully.
-
-	match SystemTime::now().duration_since(UNIX_EPOCH) {
-		Ok(d) => d.as_secs(),
-		Err(e) => {
-			tracing::warn!(
-				target: LOG_TARGET,
-				err = ?e,
-				"Current time is before unix epoch. Validation will not work correctly."
-			);
-
-			0
-		}
-	}
+/// A clock used for fetching the current timestamp.
+pub trait Clock {
+	/// Get the current timestamp.
+	fn timestamp_now(&self) -> Timestamp;
 }
 
-fn stagnant_timeout_from_now() -> Timestamp {
-	// If a block isn't approved in 120 seconds, nodes will abandon it
-	// and begin building on another chain.
-	const STAGNANT_TIMEOUT: Timestamp = 120;
+struct SystemClock;
 
-	timestamp_now() + STAGNANT_TIMEOUT
-}
+impl Clock for SystemClock {
+	fn timestamp_now(&self) -> Timestamp {
+		// `SystemTime` is notoriously non-monotonic, so our timers might not work
+		// exactly as expected. Regardless, stagnation is detected on the order of minutes,
+		// and slippage of a few seconds in either direction won't cause any major harm.
+		//
+		// The exact time that a block becomes stagnant in the local node is always expected
+		// to differ from other nodes due to network asynchrony and delays in block propagation.
+		// Non-monotonicity exacerbates that somewhat, but not meaningfully.
 
-// TODO https://github.com/paritytech/polkadot/issues/3293:
-//
-// This is used just so we can have a public function that calls
-// `run` and eliminates all the unused errors.
-//
-// Should be removed when the real implementation is done.
-struct VoidBackend;
+		match SystemTime::now().duration_since(UNIX_EPOCH) {
+			Ok(d) => d.as_secs(),
+			Err(e) => {
+				tracing::warn!(
+					target: LOG_TARGET,
+					err = ?e,
+					"Current time is before unix epoch. Validation will not work correctly."
+				);
 
-impl Backend for VoidBackend {
-	fn load_block_entry(&self, _: &Hash) -> Result<Option<BlockEntry>, Error> {
-		Ok(None)
-	}
-	fn load_leaves(&self) -> Result<LeafEntrySet, Error> {
-		Ok(LeafEntrySet::default())
-	}
-	fn load_stagnant_at(&self, _: Timestamp) -> Result<Vec<Hash>, Error> {
-		Ok(Vec::new())
-	}
-	fn load_stagnant_at_up_to(&self, _: Timestamp)
-		-> Result<Vec<(Timestamp, Vec<Hash>)>, Error>
-	{
-		Ok(Vec::new())
+				0
+			}
+		}
 	}
-	fn load_first_block_number(&self) -> Result<Option<BlockNumber>, Error> {
-		Ok(None)
+}
+
+/// The interval, in seconds, at which to check for stagnant blocks.
+#[derive(Debug, Clone)]
+pub struct StagnantCheckInterval(Duration);
+
+impl Default for StagnantCheckInterval {
+	fn default() -> Self {
+		// 5 seconds is a reasonable balance between avoiding DB reads and
+		// ensuring validators are generally in agreement on stagnant blocks.
+		//
+		// Assuming a network delay of D, the longest difference in view possible
+		// between 2 validators is D + 5s.
+		const DEFAULT_STAGNANT_CHECK_INTERVAL: Duration = Duration::from_secs(5);
+
+		StagnantCheckInterval(DEFAULT_STAGNANT_CHECK_INTERVAL)
 	}
-	fn load_blocks_by_number(&self, _: BlockNumber) -> Result<Vec<Hash>, Error> {
-		Ok(Vec::new())
+}
+
+impl StagnantCheckInterval {
+	/// Create a new stagnant-check interval wrapping the given duration.
+	pub fn new(interval: Duration) -> Self {
+		StagnantCheckInterval(interval)
 	}
 
-	fn write<I>(&mut self, _: I) -> Result<(), Error>
-		where I: IntoIterator<Item = BackendWriteOp>
-	{
-		Ok(())
+	fn timeout_stream(&self) -> impl Stream<Item = ()> {
+		let interval = self.0;
+		let mut delay = futures_timer::Delay::new(interval);
+
+		futures::stream::poll_fn(move |cx| {
+			let poll = delay.poll_unpin(cx);
+			if poll.is_ready() {
+				delay.reset(interval)
+			}
+
+			poll.map(Some)
+		})
 	}
 }
 
+/// Configuration for the chain selection subsystem.
+#[derive(Debug, Clone)]
+pub struct Config {
+	/// The column in the database that the storage should use.
+	pub col_data: u32,
+	/// How often to check for stagnant blocks.
+	pub stagnant_check_interval: StagnantCheckInterval,
+}
+
 /// The chain selection subsystem.
-pub struct ChainSelectionSubsystem;
+pub struct ChainSelectionSubsystem {
+	config: Config,
+	db: Arc<dyn KeyValueDB>,
+}
+
+impl ChainSelectionSubsystem {
+	/// Create a new instance of the subsystem with the given config
+	/// and key-value store.
+	pub fn new(config: Config, db: Arc<dyn KeyValueDB>) -> Self {
+		ChainSelectionSubsystem {
+			config,
+			db,
+		}
+	}
+}
 
 impl<Context> Subsystem<Context> for ChainSelectionSubsystem
 	where Context: SubsystemContext<Message = ChainSelectionMessage>
 {
 	fn start(self, ctx: Context) -> SpawnedSubsystem {
-		let backend = VoidBackend;
+		let backend = crate::db_backend::v1::DbBackend::new(
+			self.db,
+			crate::db_backend::v1::Config { col_data: self.config.col_data },
+		);
+
 		SpawnedSubsystem {
-			future: run(ctx, backend).map(|()| Ok(())).boxed(),
+			future: run(
+				ctx,
+				backend,
+				self.config.stagnant_check_interval,
+				Box::new(SystemClock),
+			)
+				.map(Ok)
+				.boxed(),
 			name: "chain-selection-subsystem",
 		}
 	}
 }
 
-async fn run<Context, B>(mut ctx: Context, mut backend: B)
+async fn run<Context, B>(
+	mut ctx: Context,
+	mut backend: B,
+	stagnant_check_interval: StagnantCheckInterval,
+	clock: Box<dyn Clock + Send + Sync>,
+)
 	where
 		Context: SubsystemContext<Message = ChainSelectionMessage>,
 		B: Backend,
 {
 	loop {
-		let res = run_iteration(&mut ctx, &mut backend).await;
+		let res = run_iteration(
+			&mut ctx,
+			&mut backend,
+			&stagnant_check_interval,
+			&*clock,
+		).await;
 		match res {
 			Err(e) => {
 				e.trace();
@@ -313,55 +367,69 @@ async fn run<Context, B>(mut ctx: Context, mut backend: B)
 //
 // A return value of `Ok` indicates that an exit should be made, while non-fatal errors
 // lead to another call to this function.
-async fn run_iteration<Context, B>(ctx: &mut Context, backend: &mut B)
+async fn run_iteration<Context, B>(
+	ctx: &mut Context,
+	backend: &mut B,
+	stagnant_check_interval: &StagnantCheckInterval,
+	clock: &(dyn Clock + Sync),
+)
 	-> Result<(), Error>
 	where
 		Context: SubsystemContext<Message = ChainSelectionMessage>,
 		B: Backend,
 {
-	// TODO https://github.com/paritytech/polkadot/issues/3293: Add stagnant checking timer loop.
+	let mut stagnant_check_stream = stagnant_check_interval.timeout_stream();
 	loop {
-		match ctx.recv().await? {
-			FromOverseer::Signal(OverseerSignal::Conclude) => {
-				return Ok(())
-			}
-			FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
-				for leaf in update.activated {
-					let write_ops = handle_active_leaf(
-						ctx,
-						&*backend,
-						leaf.hash,
-					).await?;
-
-					backend.write(write_ops)?;
+		futures::select! {
+			msg = ctx.recv().fuse() => {
+				let msg = msg?;
+				match msg {
+					FromOverseer::Signal(OverseerSignal::Conclude) => {
+						return Ok(())
+					}
+					FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
+						for leaf in update.activated {
+							let write_ops = handle_active_leaf(
+								ctx,
+								&*backend,
+								clock.timestamp_now() + STAGNANT_TIMEOUT,
+								leaf.hash,
+							).await?;
+
+							backend.write(write_ops)?;
+						}
+					}
+					FromOverseer::Signal(OverseerSignal::BlockFinalized(h, n)) => {
+						handle_finalized_block(backend, h, n)?
+					}
+					FromOverseer::Communication { msg } => match msg {
+						ChainSelectionMessage::Approved(hash) => {
+							handle_approved_block(backend, hash)?
+						}
+						ChainSelectionMessage::Leaves(tx) => {
+							let leaves = load_leaves(ctx, &*backend).await?;
+							let _ = tx.send(leaves);
+						}
+						ChainSelectionMessage::BestLeafContaining(required, tx) => {
+							let best_containing = crate::backend::find_best_leaf_containing(
+								&*backend,
+								required,
+							)?;
+
+							// note - this may be none if the finalized block is
+							// a leaf. this is fine according to the expected usage of the
+							// function. `None` responses should just `unwrap_or(required)`,
+							// so if the required block is the finalized block, then voilà.
+
+							let _ = tx.send(best_containing);
+						}
+					}
 				}
 			}
-			FromOverseer::Signal(OverseerSignal::BlockFinalized(h, n)) => {
-				handle_finalized_block(backend, h, n)?
-			}
-			FromOverseer::Communication { msg } => match msg {
-				ChainSelectionMessage::Approved(hash) => {
-					handle_approved_block(backend, hash)?
-				}
-				ChainSelectionMessage::Leaves(tx) => {
-					let leaves = load_leaves(ctx, &*backend).await?;
-					let _ = tx.send(leaves);
-				}
-				ChainSelectionMessage::BestLeafContaining(required, tx) => {
-					let best_containing = crate::backend::find_best_leaf_containing(
-						&*backend,
-						required,
-					)?;
-
-					// note - this may be none if the finalized block is
-					// a leaf. this is fine according to the expected usage of the
-					// function. `None` responses should just `unwrap_or(required)`,
-					// so if the required block is the finalized block, then voilá.
-
-					let _ = tx.send(best_containing);
-				}
+			_ = stagnant_check_stream.next().fuse() => {
+				detect_stagnant(backend, clock.timestamp_now())?;
 			}
-		};
+		}
 	}
 }
 
@@ -415,6 +483,7 @@ async fn fetch_block_weight(
 async fn handle_active_leaf(
 	ctx: &mut impl SubsystemContext,
 	backend: &impl Backend,
+	stagnant_at: Timestamp,
 	hash: Hash,
 ) -> Result<Vec<BackendWriteOp>, Error> {
 	let lower_bound = match backend.load_first_block_number()? {
@@ -475,6 +544,7 @@ async fn handle_active_leaf(
 			header.parent_hash,
 			reversion_logs,
 			weight,
+			stagnant_at,
 		)?;
 	}
 
@@ -556,6 +626,22 @@ fn handle_approved_block(
 	backend.write(ops)
 }
 
+fn detect_stagnant(
+	backend: &mut impl Backend,
+	now: Timestamp,
+) -> Result<(), Error> {
+	let ops = {
+		let overlay = crate::tree::detect_stagnant(
+			&*backend,
+			now,
+		)?;
+
+		overlay.into_write_ops()
+	};
+
+	backend.write(ops)
+}
+
 // Load the leaves from the backend. If there are no leaves, then return
 // the finalized block.
 async fn load_leaves(
diff --git a/polkadot/node/core/chain-selection/src/tests.rs b/polkadot/node/core/chain-selection/src/tests.rs
index 945578a47e6..1449fae3f5d 100644
--- a/polkadot/node/core/chain-selection/src/tests.rs
+++ b/polkadot/node/core/chain-selection/src/tests.rs
@@ -22,7 +22,7 @@
 
 use super::*;
 use std::collections::{HashMap, HashSet, BTreeMap};
-use std::sync::Arc;
+use std::sync::{atomic::{Ordering as AtomicOrdering, AtomicU64}, Arc};
 
 use futures::channel::oneshot;
 use parity_scale_codec::Encode;
@@ -103,6 +103,21 @@ impl TestBackend {
 			}
 		}
 	}
+
+	fn assert_stagnant_at_state(
+		&self,
+		stagnant_at: Vec<(Timestamp, Vec<Hash>)>,
+	) {
+		let inner = self.inner.lock();
+		assert_eq!(inner.stagnant_at.len(), stagnant_at.len());
+		for (at, hashes) in stagnant_at {
+			let stored_hashes = inner.stagnant_at.get(&at).unwrap();
+			assert_eq!(hashes.len(), stored_hashes.len());
+			for hash in hashes {
+				assert!(stored_hashes.contains(&hash));
+			}
+		}
+	}
 }
 
 impl Default for TestBackend {
@@ -173,18 +188,45 @@ impl Backend for TestBackend {
 	}
 }
 
+#[derive(Clone)]
+pub struct TestClock(Arc<AtomicU64>);
+
+impl TestClock {
+	fn new(initial: u64) -> Self {
+		TestClock(Arc::new(AtomicU64::new(initial)))
+	}
+
+	fn inc_by(&self, duration: u64) {
+		self.0.fetch_add(duration, AtomicOrdering::Relaxed);
+	}
+}
+
+impl Clock for TestClock {
+	fn timestamp_now(&self) -> Timestamp {
+		self.0.load(AtomicOrdering::Relaxed)
+	}
+}
+
+const TEST_STAGNANT_INTERVAL: Duration = Duration::from_millis(20);
+
 type VirtualOverseer = test_helpers::TestSubsystemContextHandle<ChainSelectionMessage>;
 
 fn test_harness<T: Future<Output=VirtualOverseer>>(
-	test: impl FnOnce(TestBackend, VirtualOverseer) -> T
+	test: impl FnOnce(TestBackend, TestClock, VirtualOverseer) -> T
 ) {
 	let pool = TaskExecutor::new();
 	let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool);
 
 	let backend = TestBackend::default();
-	let subsystem = crate::run(context, backend.clone());
+	let clock = TestClock::new(0);
+	let subsystem = crate::run(
+		context,
+		backend.clone(),
+		StagnantCheckInterval::new(TEST_STAGNANT_INTERVAL),
+		Box::new(clock.clone()),
+	);
 
-	let test_fut = test(backend, virtual_overseer);
+	let test_fut = test(backend, clock, virtual_overseer);
 	let test_and_conclude = async move {
 		let mut virtual_overseer = test_fut.await;
 		virtual_overseer.send(OverseerSignal::Conclude.into()).await;
@@ -582,12 +624,12 @@ async fn approve_block(
 
 #[test]
 fn no_op_subsystem_run() {
-	test_harness(|_, virtual_overseer| async move { virtual_overseer });
+	test_harness(|_, _, virtual_overseer| async move { virtual_overseer });
 }
 
 #[test]
 fn import_direct_child_of_finalized_on_empty() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -614,7 +656,7 @@ fn import_direct_child_of_finalized_on_empty() {
 
 #[test]
 fn import_chain_on_finalized_incrementally() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -643,7 +685,7 @@ fn import_chain_on_finalized_incrementally() {
 
 #[test]
 fn import_two_subtrees_on_finalized() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -687,7 +729,7 @@ fn import_two_subtrees_on_finalized() {
 
 #[test]
 fn import_two_subtrees_on_nonzero_finalized() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 100;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -731,7 +773,7 @@ fn import_two_subtrees_on_nonzero_finalized() {
 
 #[test]
 fn leaves_ordered_by_weight_and_then_number() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -784,7 +826,7 @@ fn leaves_ordered_by_weight_and_then_number() {
 
 #[test]
 fn subtrees_imported_even_with_gaps() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -835,7 +877,7 @@ fn subtrees_imported_even_with_gaps() {
 
 #[test]
 fn reversion_removes_viability_of_chain() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -871,7 +913,7 @@ fn reversion_removes_viability_of_chain() {
 
 #[test]
 fn reversion_removes_viability_and_finds_ancestor_as_leaf() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -905,7 +947,7 @@ fn reversion_removes_viability_and_finds_ancestor_as_leaf() {
 
 #[test]
 fn ancestor_of_unviable_is_not_leaf_if_has_children() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -974,7 +1016,7 @@ fn ancestor_of_unviable_is_not_leaf_if_has_children() {
 
 #[test]
 fn self_and_future_reversions_are_ignored() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1006,7 +1048,7 @@ fn self_and_future_reversions_are_ignored() {
 
 #[test]
 fn revert_finalized_is_ignored() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 10;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1038,7 +1080,7 @@ fn revert_finalized_is_ignored() {
 
 #[test]
 fn reversion_affects_viability_of_all_subtrees() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1096,7 +1138,7 @@ fn reversion_affects_viability_of_all_subtrees() {
 
 #[test]
 fn finalize_viable_prunes_subtrees() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1204,7 +1246,7 @@ fn finalize_viable_prunes_subtrees() {
 
 #[test]
 fn finalization_does_not_clobber_unviability() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1258,7 +1300,7 @@ fn finalization_does_not_clobber_unviability() {
 
 #[test]
 fn finalization_erases_unviable() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1322,7 +1364,7 @@ fn finalization_erases_unviable() {
 
 #[test]
 fn finalize_erases_unviable_but_keeps_later_unviability() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1390,7 +1432,7 @@ fn finalize_erases_unviable_but_keeps_later_unviability() {
 
 #[test]
 fn finalize_erases_unviable_from_one_but_not_all_reverts() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1450,7 +1492,7 @@ fn finalize_erases_unviable_from_one_but_not_all_reverts() {
 
 #[test]
 fn finalize_triggers_viability_search() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1522,7 +1564,7 @@ fn finalize_triggers_viability_search() {
 
 #[test]
 fn best_leaf_none_with_empty_db() {
-	test_harness(|_backend, mut virtual_overseer| async move {
+	test_harness(|_backend, _, mut virtual_overseer| async move {
 		let required = Hash::repeat_byte(1);
 		let best_leaf = best_leaf_containing(&mut virtual_overseer, required).await;
 		assert!(best_leaf.is_none());
@@ -1533,7 +1575,7 @@ fn best_leaf_none_with_empty_db() {
 
 #[test]
 fn best_leaf_none_with_no_viable_leaves() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1575,7 +1617,7 @@ fn best_leaf_none_with_no_viable_leaves() {
 
 #[test]
 fn best_leaf_none_with_unknown_required() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1609,7 +1651,7 @@ fn best_leaf_none_with_unknown_required() {
 
 #[test]
 fn best_leaf_none_with_unviable_required() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1661,7 +1703,7 @@ fn best_leaf_none_with_unviable_required() {
 
 #[test]
 fn best_leaf_with_finalized_required() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1705,7 +1747,7 @@ fn best_leaf_with_finalized_required() {
 
 #[test]
 fn best_leaf_with_unfinalized_required() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1751,7 +1793,7 @@ fn best_leaf_with_unfinalized_required() {
 
 #[test]
 fn best_leaf_ancestor_of_all_leaves() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1809,7 +1851,7 @@ fn best_leaf_ancestor_of_all_leaves() {
 
 #[test]
 fn approve_message_approves_block_entry() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1859,7 +1901,7 @@ fn approve_message_approves_block_entry() {
 
 #[test]
 fn approve_nonexistent_has_no_effect() {
-	test_harness(|backend, mut virtual_overseer| async move {
+	test_harness(|backend, _, mut virtual_overseer| async move {
 		let finalized_number = 0;
 		let finalized_hash = Hash::repeat_byte(0);
 
@@ -1907,3 +1949,342 @@ fn approve_nonexistent_has_no_effect() {
 		virtual_overseer
 	})
 }
+
+#[test]
+fn block_has_correct_stagnant_at() {
+	test_harness(|backend, clock, mut virtual_overseer| async move {
+		let finalized_number = 0;
+		let finalized_hash = Hash::repeat_byte(0);
+
+		// F <- A1 <- A2
+
+		let (a1_hash, chain_a) = construct_chain_on_base(
+			vec![1],
+			finalized_number,
+			finalized_hash,
+			|h| {
+				salt_header(h, b"a");
+			}
+		);
+
+		let (a2_hash, chain_a_ext) = construct_chain_on_base(
+			vec![1],
+			1,
+			a1_hash,
+			|h| {
+				salt_header(h, b"a");
+			}
+		);
+
+		import_chains_into_empty(
+			&mut virtual_overseer,
+			&backend,
+			finalized_number,
+			finalized_hash,
+			vec![chain_a.clone()],
+		).await;
+
+		clock.inc_by(1);
+
+		import_blocks_into(
+			&mut virtual_overseer,
+			&backend,
+			None,
+			chain_a_ext.clone(),
+		).await;
+
+		backend.assert_stagnant_at_state(vec![
+			(STAGNANT_TIMEOUT, vec![a1_hash]),
+			(STAGNANT_TIMEOUT + 1, vec![a2_hash]),
+		]);
+
+		virtual_overseer
+	})
+}
+
+#[test]
+fn detects_stagnant() {
+	test_harness(|backend, clock, mut virtual_overseer| async move {
+		let finalized_number = 0;
+		let finalized_hash = Hash::repeat_byte(0);
+
+		// F <- A1
+
+		let (a1_hash, chain_a) = construct_chain_on_base(
+			vec![1],
+			finalized_number,
+			finalized_hash,
+			|h| {
+				salt_header(h, b"a");
+			}
+		);
+
+		import_chains_into_empty(
+			&mut virtual_overseer,
+			&backend,
+			finalized_number,
+			finalized_hash,
+			vec![chain_a.clone()],
+		).await;
+
+		{
+			let (_, write_rx) = backend.await_next_write();
+			clock.inc_by(STAGNANT_TIMEOUT);
+
+			write_rx.await.unwrap();
+		}
+
+		backend.assert_stagnant_at_state(vec![]);
+
+		assert_matches!(
+			backend.load_block_entry(&a1_hash).unwrap().unwrap().viability.approval,
+			Approval::Stagnant
+		);
+
+		assert_leaves(&backend, vec![]);
+
+		virtual_overseer
+	})
+}
+
+#[test]
+fn finalize_stagnant_unlocks_subtree() {
+	test_harness(|backend, clock, mut virtual_overseer| async move {
+		let finalized_number = 0;
+		let finalized_hash = Hash::repeat_byte(0);
+
+		// F <- A1 <- A2
+
+		let (a1_hash, chain_a) = construct_chain_on_base(
+			vec![1],
+			finalized_number,
+			finalized_hash,
+			|h| {
+				salt_header(h, b"a");
+			}
+		);
+
+		let (a2_hash, chain_a_ext) = construct_chain_on_base(
+			vec![1],
+			1,
+			a1_hash,
+			|h| {
+				salt_header(h, b"a");
+			}
+		);
+
+		import_chains_into_empty(
+			&mut virtual_overseer,
+			&backend,
+			finalized_number,
+			finalized_hash,
+			vec![chain_a.clone()],
+		).await;
+
+		clock.inc_by(1);
+
+		import_blocks_into(
+			&mut virtual_overseer,
+			&backend,
+			None,
+			chain_a_ext.clone(),
+		).await;
+
+		{
+			let (_, write_rx) = backend.await_next_write();
+			clock.inc_by(STAGNANT_TIMEOUT - 1);
+
+			write_rx.await.unwrap();
+		}
+
+		backend.assert_stagnant_at_state(vec![(STAGNANT_TIMEOUT + 1, vec![a2_hash])]);
+
+		assert_matches!(
+			backend.load_block_entry(&a1_hash).unwrap().unwrap().viability.approval,
+			Approval::Stagnant
+		);
+
+		assert_leaves(&backend, vec![]);
+
+		finalize_block(
+			&mut virtual_overseer,
+			&backend,
+			1,
+			a1_hash,
+		).await;
+
+		assert_leaves(&backend, vec![a2_hash]);
+
+		virtual_overseer
+	})
+}
+
+#[test]
+fn approval_undoes_stagnant_unlocking_subtree() {
+	test_harness(|backend, clock, mut virtual_overseer| async move {
+		let finalized_number = 0;
+		let finalized_hash = Hash::repeat_byte(0);
+
+		// F <- A1 <- A2
+
+		let (a1_hash, chain_a) = construct_chain_on_base(
+			vec![1],
+			finalized_number,
+			finalized_hash,
+			|h| {
+				salt_header(h, b"a");
+			}
+		);
+
+		let (a2_hash, chain_a_ext) = construct_chain_on_base(
+			vec![1],
+			1,
+			a1_hash,
+			|h| {
+				salt_header(h, b"a");
+			}
+		);
+
+		import_chains_into_empty(
+			&mut virtual_overseer,
+			&backend,
+			finalized_number,
+			finalized_hash,
+			vec![chain_a.clone()],
+		).await;
+
+		clock.inc_by(1);
+
+		import_blocks_into(
+			&mut virtual_overseer,
+			&backend,
+			None,
+			chain_a_ext.clone(),
+		).await;
+
+		{
+			let (_, write_rx) = backend.await_next_write();
+			clock.inc_by(STAGNANT_TIMEOUT - 1);
+
+			write_rx.await.unwrap();
+		}
+
+		backend.assert_stagnant_at_state(vec![(STAGNANT_TIMEOUT + 1, vec![a2_hash])]);
+
+		approve_block(
+			&mut virtual_overseer,
+			&backend,
+			a1_hash,
+		).await;
+
+		assert_matches!(
+			backend.load_block_entry(&a1_hash).unwrap().unwrap().viability.approval,
+			Approval::Approved
+		);
+
+		assert_leaves(&backend, vec![a2_hash]);
+
+		virtual_overseer
+	})
+}
+
+#[test]
+fn stagnant_preserves_parents_children() {
+	test_harness(|backend, clock, mut virtual_overseer| async move {
+		let finalized_number = 0;
+		let finalized_hash = Hash::repeat_byte(0);
+
+		// F <- A1 <- A2
+		//      A1 <- B2
+
+		let (a2_hash, chain_a) = construct_chain_on_base(
+			vec![1, 2],
+			finalized_number,
+			finalized_hash,
+			|h| {
+				salt_header(h, b"a");
+			}
+		);
+
+		let (_, a1_hash, _) = extract_info_from_chain(0, &chain_a);
+
+		let (b2_hash, chain_b) = construct_chain_on_base(
+			vec![1],
+			1,
+			a1_hash,
+			|h| {
+				salt_header(h, b"b");
+			}
+		);
+
+		import_chains_into_empty(
+			&mut virtual_overseer,
+			&backend,
+			finalized_number,
+			finalized_hash,
+			vec![chain_a.clone(), chain_b.clone()],
+		).await;
+
+		approve_block(&mut virtual_overseer, &backend, a1_hash).await;
+		approve_block(&mut virtual_overseer, &backend, b2_hash).await;
+
+		assert_leaves(&backend, vec![a2_hash, b2_hash]);
+
+		{
+			let (_, write_rx) = backend.await_next_write();
+			clock.inc_by(STAGNANT_TIMEOUT);
+
+			write_rx.await.unwrap();
+		}
+
+		backend.assert_stagnant_at_state(vec![]);
+		assert_leaves(&backend, vec![b2_hash]);
+
+		virtual_overseer
+	})
+}
+
+#[test]
+fn stagnant_makes_childless_parent_leaf() {
+	test_harness(|backend, clock, mut virtual_overseer| async move {
+		let finalized_number = 0;
+		let finalized_hash = Hash::repeat_byte(0);
+
+		// F <- A1 <- A2
+
+		let (a2_hash, chain_a) = construct_chain_on_base(
+			vec![1, 2],
+			finalized_number,
+			finalized_hash,
+			|h| {
+				salt_header(h, b"a");
+			}
+		);
+
+		let (_, a1_hash, _) = extract_info_from_chain(0, &chain_a);
+
+		import_chains_into_empty(
+			&mut virtual_overseer,
+			&backend,
+			finalized_number,
+			finalized_hash,
+			vec![chain_a.clone()],
+		).await;
+
+		approve_block(&mut virtual_overseer, &backend, a1_hash).await;
+
+		assert_leaves(&backend, vec![a2_hash]);
+
+		{
+			let (_, write_rx) = backend.await_next_write();
+			clock.inc_by(STAGNANT_TIMEOUT);
+
+			write_rx.await.unwrap();
+		}
+
+		backend.assert_stagnant_at_state(vec![]);
+		assert_leaves(&backend, vec![a1_hash]);
+
+		virtual_overseer
+	})
+}
diff --git a/polkadot/node/core/chain-selection/src/tree.rs b/polkadot/node/core/chain-selection/src/tree.rs
index a10f0d0c5ad..ff3db9e9e4a 100644
--- a/polkadot/node/core/chain-selection/src/tree.rs
+++ b/polkadot/node/core/chain-selection/src/tree.rs
@@ -251,8 +251,9 @@ pub(crate) fn import_block(
 	parent_hash: Hash,
 	reversion_logs: Vec<BlockNumber>,
 	weight: BlockWeight,
+	stagnant_at: Timestamp,
 ) -> Result<(), Error> {
-	add_block(backend, block_hash, block_number, parent_hash, weight)?;
+	add_block(backend, block_hash, block_number, parent_hash, weight, stagnant_at)?;
 	apply_reversions(
 		backend,
 		block_hash,
@@ -308,6 +309,7 @@ fn add_block(
 	block_number: BlockNumber,
 	parent_hash: Hash,
 	weight: BlockWeight,
+	stagnant_at: Timestamp,
 ) -> Result<(), Error> {
 	let mut leaves = backend.load_leaves()?;
 	let parent_entry = backend.load_block_entry(&parent_hash)?;
@@ -350,7 +352,6 @@ fn add_block(
 	backend.write_blocks_by_number(block_number, blocks_by_number);
 
 	// 5. Add stagnation timeout.
-	let stagnant_at = crate::stagnant_timeout_from_now();
 	let mut stagnant_at_list = backend.load_stagnant_at(stagnant_at)?;
 	stagnant_at_list.push(block_hash);
 	backend.write_stagnant_at(stagnant_at, stagnant_at_list);
@@ -549,8 +550,6 @@ pub(super) fn approve_block(
 ///
 /// This accepts a fresh backend and returns an overlay on top of it representing
 /// all changes made.
-// TODO https://github.com/paritytech/polkadot/issues/3293:: remove allow
-#[allow(unused)]
 pub(super) fn detect_stagnant<'a, B: 'a + Backend>(
 	backend: &'a B,
 	up_to: Timestamp,
-- 
GitLab