From 5bc2b2779d430bcac2f0e00af7135c2e943eece5 Mon Sep 17 00:00:00 2001
From: Robert Habermeier <rphmeier@gmail.com>
Date: Sun, 13 Jun 2021 13:35:18 +0200
Subject: [PATCH] Dispute Coordinator Subsystem (#3150)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* skeleton for dispute-coordinator

* add coordinator and participation message types

* begin dispute-coordinator DB

* functions for loading

* implement strongly-typed DB transaction

* add some tests for DB transaction

* core logic for pruning

* guide: update candidate-votes key for coordinator

* update candidate-votes key

* use big-endian encoding for session, and implement upper bound generator

* finish implementing pruning

* add a test for note_current_session

* define state of the subsystem itself

* barebones subsystem definition

* control flow

* more control flow

* implement session-updating logic

* trace

* control flow for message handling

* Update node/core/dispute-coordinator/src/lib.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update node/subsystem/src/messages.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* some more control flow

* guide: remove overlay

* more control flow

* implement some DB getters

* make progress on importing statements

* add SignedDisputeStatement struct

* move ApprovalVote to shared primitives

* add a signing-payload API to explicit dispute statements

* add signing-payload to CompactStatement

* add relay-parent hash to seconded/valid dispute variatns

* correct import

* type-safe wrapper around dispute statements

* use checked dispute statement in message type

* extract rolling session window cache to subsystem-util

* extract session window tests

* approval-voting: use rolling session info cache

* reduce dispute window to match runtime in practice

* add byzantine_threshold and supermajority_threshold utilities to primitives

* integrate rolling session window

* Add PartialOrd to CandidateHash

* add Ord to CandidateHash

* implement active dispute update

* add dispute messages to AllMessages

* add dispute stubs to overseer

* inform dispute participation to participate

* implement issue_local_statement

* implement `determine_undisputed_chain`

* fix warnings

* test harness for dispute coordinator tests

* add more helpers to test harness

* add some more helpers

* some tests for dispute coordinator

* ignore wrong validator indices

* test finality voting rule constraint

* add more tests

* add variants to network bridge

* fix test compilation

* remove most dispute coordinator functionality

as of #3222 we can do most of the work within the approval voting subsystem

* Revert "remove most dispute coordinator functionality"

This reverts commit 9cd615e8eb6ca0b382cbaff525d813e753d6004e.

* Use thiserror

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

* Update node/core/dispute-coordinator/src/lib.rs

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

* extract tests to separate module

* address nit

* adjust run_iteration API

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
---
 polkadot/Cargo.lock                           |  25 +
 polkadot/Cargo.toml                           |   1 +
 polkadot/core-primitives/src/lib.rs           |   2 +-
 .../node/core/approval-voting/src/import.rs   | 583 ++-------------
 polkadot/node/core/approval-voting/src/lib.rs |  31 +-
 .../node/core/approval-voting/src/tests.rs    |  13 +-
 .../node/core/dispute-coordinator/Cargo.toml  |  29 +
 .../core/dispute-coordinator/src/db/mod.rs    |  19 +
 .../core/dispute-coordinator/src/db/v1.rs     | 585 +++++++++++++++
 .../node/core/dispute-coordinator/src/lib.rs  | 649 ++++++++++++++++
 .../core/dispute-coordinator/src/tests.rs     | 706 ++++++++++++++++++
 polkadot/node/network/bridge/src/tests.rs     |  12 +-
 polkadot/node/overseer/src/lib.rs             |   6 +
 polkadot/node/primitives/Cargo.toml           |   1 +
 polkadot/node/primitives/src/approval.rs      |   4 -
 polkadot/node/primitives/src/lib.rs           | 134 +++-
 polkadot/node/subsystem-util/src/lib.rs       |   2 +
 .../src/rolling_session_window.rs             | 635 ++++++++++++++++
 polkadot/node/subsystem/src/messages.rs       |  77 +-
 polkadot/primitives/src/v0.rs                 |   8 +
 polkadot/primitives/src/v1/mod.rs             | 122 ++-
 .../src/node/disputes/dispute-coordinator.md  |  20 +-
 22 files changed, 3070 insertions(+), 594 deletions(-)
 create mode 100644 polkadot/node/core/dispute-coordinator/Cargo.toml
 create mode 100644 polkadot/node/core/dispute-coordinator/src/db/mod.rs
 create mode 100644 polkadot/node/core/dispute-coordinator/src/db/v1.rs
 create mode 100644 polkadot/node/core/dispute-coordinator/src/lib.rs
 create mode 100644 polkadot/node/core/dispute-coordinator/src/tests.rs
 create mode 100644 polkadot/node/subsystem-util/src/rolling_session_window.rs

diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index 3e121adec46..96ba0ee11c8 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -6091,6 +6091,30 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "polkadot-node-core-dispute-coordinator"
+version = "0.1.0"
+dependencies = [
+ "assert_matches",
+ "bitvec",
+ "derive_more",
+ "futures 0.3.15",
+ "kvdb",
+ "kvdb-memorydb",
+ "parity-scale-codec",
+ "polkadot-node-primitives",
+ "polkadot-node-subsystem",
+ "polkadot-node-subsystem-test-helpers",
+ "polkadot-node-subsystem-util",
+ "polkadot-primitives",
+ "sc-keystore",
+ "sp-core",
+ "sp-keyring",
+ "sp-keystore",
+ "thiserror",
+ "tracing",
+]
+
 [[package]]
 name = "polkadot-node-core-parachains-inherent"
 version = "0.1.0"
@@ -6221,6 +6245,7 @@ dependencies = [
  "sp-consensus-babe",
  "sp-consensus-vrf",
  "sp-core",
+ "sp-keystore",
  "sp-maybe-compressed-blob",
  "sp-runtime",
  "thiserror",
diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml
index db209d4c857..dd4983b7535 100644
--- a/polkadot/Cargo.toml
+++ b/polkadot/Cargo.toml
@@ -48,6 +48,7 @@ members = [
 	"node/core/bitfield-signing",
 	"node/core/candidate-validation",
 	"node/core/chain-api",
+	"node/core/dispute-coordinator",
 	"node/core/parachains-inherent",
 	"node/core/provisioner",
 	"node/core/pvf",
diff --git a/polkadot/core-primitives/src/lib.rs b/polkadot/core-primitives/src/lib.rs
index 0b2921f9c12..c552929d15f 100644
--- a/polkadot/core-primitives/src/lib.rs
+++ b/polkadot/core-primitives/src/lib.rs
@@ -60,7 +60,7 @@ pub type Hash = sp_core::H256;
 /// This type is produced by [`CandidateReceipt::hash`].
 ///
 /// This type makes it easy to enforce that a hash is a candidate hash on the type level.
-#[derive(Clone, Copy, Encode, Decode, Hash, Eq, PartialEq, Default)]
+#[derive(Clone, Copy, Encode, Decode, Hash, Eq, PartialEq, Default, PartialOrd, Ord)]
 #[cfg_attr(feature = "std", derive(MallocSizeOf))]
 pub struct CandidateHash(pub Hash);
 
diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs
index c783f49f26c..74a966171ec 100644
--- a/polkadot/node/core/approval-voting/src/import.rs
+++ b/polkadot/node/core/approval-voting/src/import.rs
@@ -34,8 +34,11 @@ use polkadot_node_subsystem::{
 	},
 	SubsystemContext, SubsystemError, SubsystemResult,
 };
+use polkadot_node_subsystem_util::rolling_session_window::{
+	RollingSessionWindow, SessionWindowUpdate,
+};
 use polkadot_primitives::v1::{
-	Hash, SessionIndex, SessionInfo, CandidateEvent, Header, CandidateHash,
+	Hash, SessionIndex, CandidateEvent, Header, CandidateHash,
 	CandidateReceipt, CoreIndex, GroupIndex, BlockNumber, ConsensusLog,
 };
 use polkadot_node_primitives::approval::{
@@ -58,32 +61,7 @@ use crate::persisted_entries::CandidateEntry;
 use crate::criteria::{AssignmentCriteria, OurAssignment};
 use crate::time::{slot_number_to_tick, Tick};
 
-use super::{APPROVAL_SESSIONS, LOG_TARGET, State, DBReader};
-
-/// A rolling window of sessions.
-#[derive(Default)]
-pub struct RollingSessionWindow {
-	pub earliest_session: Option<SessionIndex>,
-	pub session_info: Vec<SessionInfo>,
-}
-
-impl RollingSessionWindow {
-	pub fn session_info(&self, index: SessionIndex) -> Option<&SessionInfo> {
-		self.earliest_session.and_then(|earliest| {
-			if index < earliest {
-				None
-			} else {
-				self.session_info.get((index - earliest) as usize)
-			}
-		})
-
-	}
-
-	pub fn latest_session(&self) -> Option<SessionIndex> {
-		self.earliest_session
-			.map(|earliest| earliest + (self.session_info.len() as SessionIndex).saturating_sub(1))
-	}
-}
+use super::{LOG_TARGET, State, DBReader};
 
 // Given a new chain-head hash, this determines the hashes of all new blocks we should track
 // metadata for, given this head. The list will typically include the `head` hash provided unless
@@ -191,153 +169,6 @@ async fn determine_new_blocks(
 	Ok(ancestry)
 }
 
-// Sessions unavailable in state to cache.
-#[derive(Debug)]
-struct SessionsUnavailable;
-
-async fn load_all_sessions(
-	ctx: &mut impl SubsystemContext,
-	block_hash: Hash,
-	start: SessionIndex,
-	end_inclusive: SessionIndex,
-) -> Result<Vec<SessionInfo>, SessionsUnavailable> {
-	let mut v = Vec::new();
-	for i in start..=end_inclusive {
-		let (tx, rx)= oneshot::channel();
-		ctx.send_message(RuntimeApiMessage::Request(
-			block_hash,
-			RuntimeApiRequest::SessionInfo(i, tx),
-		).into()).await;
-
-		let session_info = match rx.await {
-			Ok(Ok(Some(s))) => s,
-			Ok(Ok(None)) => {
-				tracing::debug!(
-					target: LOG_TARGET,
-					"Session {} is missing from session-info state of block {}",
-					i,
-					block_hash,
-				);
-
-				return Err(SessionsUnavailable);
-			}
-			Ok(Err(_)) | Err(_) => return Err(SessionsUnavailable),
-		};
-
-		v.push(session_info);
-	}
-
-	Ok(v)
-}
-
-// When inspecting a new import notification, updates the session info cache to match
-// the session of the imported block.
-//
-// this only needs to be called on heads where we are directly notified about import, as sessions do
-// not change often and import notifications are expected to be typically increasing in session number.
-//
-// some backwards drift in session index is acceptable.
-async fn cache_session_info_for_head(
-	ctx: &mut impl SubsystemContext,
-	session_window: &mut RollingSessionWindow,
-	block_hash: Hash,
-	block_header: &Header,
-) -> Result<(), SessionsUnavailable> {
-	let session_index = {
-		let (s_tx, s_rx) = oneshot::channel();
-
-		// The genesis is guaranteed to be at the beginning of the session and its parent state
-		// is non-existent. Therefore if we're at the genesis, we request using its state and
-		// not the parent.
-		ctx.send_message(RuntimeApiMessage::Request(
-			if block_header.number == 0 { block_hash } else { block_header.parent_hash },
-			RuntimeApiRequest::SessionIndexForChild(s_tx),
-		).into()).await;
-
-		match s_rx.await {
-			Ok(Ok(s)) => s,
-			Ok(Err(_)) | Err(_) => return Err(SessionsUnavailable),
-		}
-	};
-
-	match session_window.earliest_session {
-		None => {
-			// First block processed on start-up.
-
-			let window_start = session_index.saturating_sub(APPROVAL_SESSIONS - 1);
-
-			tracing::debug!(
-				target: LOG_TARGET, "Loading approval window from session {}..={}",
-				window_start, session_index,
-			);
-
-			match load_all_sessions(ctx, block_hash, window_start, session_index).await {
-				Err(SessionsUnavailable) => {
-					tracing::debug!(
-						target: LOG_TARGET,
-						"Could not load sessions {}..={} from block {:?} in session {}",
-						window_start, session_index, block_hash, session_index,
-					);
-
-					return Err(SessionsUnavailable);
-				},
-				Ok(s) => {
-					session_window.earliest_session = Some(window_start);
-					session_window.session_info = s;
-				}
-			}
-		}
-		Some(old_window_start) => {
-			let latest = session_window.latest_session().expect("latest always exists if earliest does; qed");
-
-			// Either cached or ancient.
-			if session_index <= latest { return Ok(()) }
-
-			let old_window_end = latest;
-
-			let window_start = session_index.saturating_sub(APPROVAL_SESSIONS - 1);
-			tracing::info!(
-				target: LOG_TARGET, "Moving approval window from session {}..={} to {}..={}",
-				old_window_start, old_window_end,
-				window_start, session_index,
-			);
-
-			// keep some of the old window, if applicable.
-			let overlap_start = window_start.saturating_sub(old_window_start);
-
-			let fresh_start = if latest < window_start {
-				window_start
-			} else {
-				latest + 1
-			};
-
-			match load_all_sessions(ctx, block_hash, fresh_start, session_index).await {
-				Err(SessionsUnavailable) => {
-					tracing::warn!(
-						target: LOG_TARGET,
-						"Could not load sessions {}..={} from block {:?} in session {}",
-						latest + 1, session_index, block_hash, session_index,
-					);
-
-					return Err(SessionsUnavailable);
-				}
-				Ok(s) => {
-					let outdated = std::cmp::min(overlap_start as usize, session_window.session_info.len());
-					session_window.session_info.drain(..outdated);
-					session_window.session_info.extend(s);
-					// we need to account for this case:
-					// window_start ................................... session_index
-					//              old_window_start ........... latest
-					let new_earliest = std::cmp::max(window_start, old_window_start);
-					session_window.earliest_session = Some(new_earliest);
-				}
-			}
-		}
-	}
-
-	Ok(())
-}
-
 struct ImportedBlockInfo {
 	included_candidates: Vec<(CandidateHash, CandidateReceipt, CoreIndex, GroupIndex)>,
 	session_index: SessionIndex,
@@ -401,7 +232,7 @@ async fn imported_block_info(
 			Err(_) => return Ok(None),
 		};
 
-		if env.session_window.earliest_session.as_ref().map_or(true, |e| &session_index < e) {
+		if env.session_window.earliest_session().map_or(true, |e| session_index < e) {
 			tracing::debug!(target: LOG_TARGET, "Block {} is from ancient session {}. Skipping",
 				block_hash, session_index);
 
@@ -591,21 +422,25 @@ pub(crate) async fn handle_new_head(
 		}
 	};
 
-	if let Err(SessionsUnavailable)
-		= cache_session_info_for_head(
-			ctx,
-			&mut state.session_window,
-			head,
-			&header,
-		).await
-	{
-		tracing::debug!(
-			target: LOG_TARGET,
-			"Could not cache session info when processing head {:?}",
-			head,
-		);
+	match state.session_window.cache_session_info_for_head(ctx, head, &header).await {
+		Err(e) => {
+			tracing::warn!(
+				target: LOG_TARGET,
+				?head,
+				?e,
+				"Could not cache session info when processing head.",
+			);
 
-		return Ok(Vec::new())
+			return Ok(Vec::new())
+		}
+		Ok(a @ SessionWindowUpdate::Advanced { .. }) => {
+			tracing::info!(
+				target: LOG_TARGET,
+				update = ?a,
+				"Advanced session window for approvals",
+			);
+		}
+		Ok(_) => {}
 	}
 
 	// If we've just started the node and haven't yet received any finality notifications,
@@ -815,7 +650,7 @@ mod tests {
 	use super::*;
 	use polkadot_node_subsystem_test_helpers::make_subsystem_context;
 	use polkadot_node_primitives::approval::{VRFOutput, VRFProof};
-	use polkadot_primitives::v1::ValidatorIndex;
+	use polkadot_primitives::v1::{SessionInfo, ValidatorIndex};
 	use polkadot_node_subsystem::messages::AllMessages;
 	use sp_core::testing::TaskExecutor;
 	use sp_runtime::{Digest, DigestItem};
@@ -828,7 +663,7 @@ mod tests {
 	use merlin::Transcript;
 	use std::{pin::Pin, sync::Arc};
 
-	use crate::{criteria, BlockEntry};
+	use crate::{APPROVAL_SESSIONS, criteria, BlockEntry};
 
 	const DATA_COL: u32 = 0;
 	const NUM_COLUMNS: u32 = 1;
@@ -884,7 +719,7 @@ mod tests {
 
 	fn blank_state() -> State<TestDB> {
 		State {
-			session_window: RollingSessionWindow::default(),
+			session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
 			keystore: Arc::new(LocalKeystore::in_memory()),
 			slot_duration_millis: 6_000,
 			db: TestDB::default(),
@@ -897,10 +732,11 @@ mod tests {
 		-> State<TestDB>
 	{
 		State {
-			session_window: RollingSessionWindow {
-				earliest_session: Some(index),
-				session_info: vec![info],
-			},
+			session_window: RollingSessionWindow::with_session_info(
+				APPROVAL_SESSIONS,
+				index,
+				vec![info],
+			),
 			..blank_state()
 		}
 	}
@@ -1423,14 +1259,11 @@ mod tests {
 				.map(|(r, c, g)| (r.hash(), r.clone(), *c, *g))
 				.collect::<Vec<_>>();
 
-			let session_window = {
-				let mut window = RollingSessionWindow::default();
-
-				window.earliest_session = Some(session);
-				window.session_info.push(session_info);
-
-				window
-			};
+			let session_window = RollingSessionWindow::with_session_info(
+				APPROVAL_SESSIONS,
+				session,
+				vec![session_info],
+			);
 
 			let header = header.clone();
 			Box::pin(async move {
@@ -1537,14 +1370,11 @@ mod tests {
 			.collect::<Vec<_>>();
 
 		let test_fut = {
-			let session_window = {
-				let mut window = RollingSessionWindow::default();
-
-				window.earliest_session = Some(session);
-				window.session_info.push(session_info);
-
-				window
-			};
+			let session_window = RollingSessionWindow::with_session_info(
+				APPROVAL_SESSIONS,
+				session,
+				vec![session_info],
+			);
 
 			let header = header.clone();
 			Box::pin(async move {
@@ -1645,7 +1475,7 @@ mod tests {
 			.collect::<Vec<_>>();
 
 		let test_fut = {
-			let session_window = RollingSessionWindow::default();
+			let session_window = RollingSessionWindow::new(APPROVAL_SESSIONS);
 
 			let header = header.clone();
 			Box::pin(async move {
@@ -1748,14 +1578,11 @@ mod tests {
 				.map(|(r, c, g)| (r.hash(), r.clone(), *c, *g))
 				.collect::<Vec<_>>();
 
-			let session_window = {
-				let mut window = RollingSessionWindow::default();
-
-				window.earliest_session = Some(session);
-				window.session_info.push(session_info);
-
-				window
-			};
+			let session_window = RollingSessionWindow::with_session_info(
+				APPROVAL_SESSIONS,
+				session,
+				vec![session_info],
+			);
 
 			let header = header.clone();
 			Box::pin(async move {
@@ -2019,318 +1846,4 @@ mod tests {
 
 		futures::executor::block_on(futures::future::join(test_fut, aux_fut));
 	}
-
-	fn cache_session_info_test(
-		expected_start_session: SessionIndex,
-		session: SessionIndex,
-		mut window: RollingSessionWindow,
-		expect_requests_from: SessionIndex,
-	) {
-		let header = Header {
-			digest: Digest::default(),
-			extrinsics_root: Default::default(),
-			number: 5,
-			state_root: Default::default(),
-			parent_hash: Default::default(),
-		};
-
-		let pool = TaskExecutor::new();
-		let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
-
-		let hash = header.hash();
-
-		let test_fut = {
-			let header = header.clone();
-			Box::pin(async move {
-				cache_session_info_for_head(
-					&mut ctx,
-					&mut window,
-					hash,
-					&header,
-				).await.unwrap();
-
-				assert_eq!(window.earliest_session, Some(expected_start_session));
-				assert_eq!(
-					window.session_info,
-					(expected_start_session..=session).map(dummy_session_info).collect::<Vec<_>>(),
-				);
-			})
-		};
-
-		let aux_fut = Box::pin(async move {
-			assert_matches!(
-				handle.recv().await,
-				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					h,
-					RuntimeApiRequest::SessionIndexForChild(s_tx),
-				)) => {
-					assert_eq!(h, header.parent_hash);
-					let _ = s_tx.send(Ok(session));
-				}
-			);
-
-			for i in expect_requests_from..=session {
-				assert_matches!(
-					handle.recv().await,
-					AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-						h,
-						RuntimeApiRequest::SessionInfo(j, s_tx),
-					)) => {
-						assert_eq!(h, hash);
-						assert_eq!(i, j);
-						let _ = s_tx.send(Ok(Some(dummy_session_info(i))));
-					}
-				);
-			}
-		});
-
-		futures::executor::block_on(futures::future::join(test_fut, aux_fut));
-	}
-
-	#[test]
-	fn cache_session_info_first_early() {
-		cache_session_info_test(
-			0,
-			1,
-			RollingSessionWindow::default(),
-			0,
-		);
-	}
-
-	#[test]
-	fn cache_session_info_does_not_underflow() {
-		let window = RollingSessionWindow {
-			earliest_session: Some(1),
-			session_info: vec![dummy_session_info(1)],
-		};
-
-		cache_session_info_test(
-			1,
-			2,
-			window,
-			2,
-		);
-	}
-
-	#[test]
-	fn cache_session_info_first_late() {
-		cache_session_info_test(
-			(100 as SessionIndex).saturating_sub(APPROVAL_SESSIONS - 1),
-			100,
-			RollingSessionWindow::default(),
-			(100 as SessionIndex).saturating_sub(APPROVAL_SESSIONS - 1),
-		);
-	}
-
-	#[test]
-	fn cache_session_info_jump() {
-		let window = RollingSessionWindow {
-			earliest_session: Some(50),
-			session_info: vec![dummy_session_info(50), dummy_session_info(51), dummy_session_info(52)],
-		};
-
-		cache_session_info_test(
-			(100 as SessionIndex).saturating_sub(APPROVAL_SESSIONS - 1),
-			100,
-			window,
-			(100 as SessionIndex).saturating_sub(APPROVAL_SESSIONS - 1),
-		);
-	}
-
-	#[test]
-	fn cache_session_info_roll_full() {
-		let start = 99 - (APPROVAL_SESSIONS - 1);
-		let window = RollingSessionWindow {
-			earliest_session: Some(start),
-			session_info: (start..=99).map(dummy_session_info).collect(),
-		};
-
-		cache_session_info_test(
-			(100 as SessionIndex).saturating_sub(APPROVAL_SESSIONS - 1),
-			100,
-			window,
-			100, // should only make one request.
-		);
-	}
-
-	#[test]
-	fn cache_session_info_roll_many_full() {
-		let start = 97 - (APPROVAL_SESSIONS - 1);
-		let window = RollingSessionWindow {
-			earliest_session: Some(start),
-			session_info: (start..=97).map(dummy_session_info).collect(),
-		};
-
-		cache_session_info_test(
-			(100 as SessionIndex).saturating_sub(APPROVAL_SESSIONS - 1),
-			100,
-			window,
-			98,
-		);
-	}
-
-	#[test]
-	fn cache_session_info_roll_early() {
-		let start = 0;
-		let window = RollingSessionWindow {
-			earliest_session: Some(start),
-			session_info: (0..=1).map(dummy_session_info).collect(),
-		};
-
-		cache_session_info_test(
-			0,
-			2,
-			window,
-			2, // should only make one request.
-		);
-	}
-
-	#[test]
-	fn cache_session_info_roll_many_early() {
-		let start = 0;
-		let window = RollingSessionWindow {
-			earliest_session: Some(start),
-			session_info: (0..=1).map(dummy_session_info).collect(),
-		};
-
-		cache_session_info_test(
-			0,
-			3,
-			window,
-			2,
-		);
-	}
-
-	#[test]
-	fn any_session_unavailable_for_caching_means_no_change() {
-		let session: SessionIndex = 6;
-		let start_session = session.saturating_sub(APPROVAL_SESSIONS - 1);
-
-		let header = Header {
-			digest: Digest::default(),
-			extrinsics_root: Default::default(),
-			number: 5,
-			state_root: Default::default(),
-			parent_hash: Default::default(),
-		};
-
-		let pool = TaskExecutor::new();
-		let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
-
-		let mut window = RollingSessionWindow::default();
-		let hash = header.hash();
-
-		let test_fut = {
-			let header = header.clone();
-			Box::pin(async move {
-				let res = cache_session_info_for_head(
-					&mut ctx,
-					&mut window,
-					hash,
-					&header,
-				).await;
-
-				assert_matches!(res, Err(SessionsUnavailable));
-			})
-		};
-
-		let aux_fut = Box::pin(async move {
-			assert_matches!(
-				handle.recv().await,
-				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					h,
-					RuntimeApiRequest::SessionIndexForChild(s_tx),
-				)) => {
-					assert_eq!(h, header.parent_hash);
-					let _ = s_tx.send(Ok(session));
-				}
-			);
-
-			for i in start_session..=session {
-				assert_matches!(
-					handle.recv().await,
-					AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-						h,
-						RuntimeApiRequest::SessionInfo(j, s_tx),
-					)) => {
-						assert_eq!(h, hash);
-						assert_eq!(i, j);
-
-						let _ = s_tx.send(Ok(if i == session {
-							None
-						} else {
-							Some(dummy_session_info(i))
-						}));
-					}
-				);
-			}
-		});
-
-		futures::executor::block_on(futures::future::join(test_fut, aux_fut));
-	}
-
-	#[test]
-	fn request_session_info_for_genesis() {
-		let session: SessionIndex = 0;
-
-		let header = Header {
-			digest: Digest::default(),
-			extrinsics_root: Default::default(),
-			number: 0,
-			state_root: Default::default(),
-			parent_hash: Default::default(),
-		};
-
-		let pool = TaskExecutor::new();
-		let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
-
-		let mut window = RollingSessionWindow::default();
-		let hash = header.hash();
-
-		let test_fut = {
-			let header = header.clone();
-			Box::pin(async move {
-				cache_session_info_for_head(
-					&mut ctx,
-					&mut window,
-					hash,
-					&header,
-				).await.unwrap();
-
-				assert_eq!(window.earliest_session, Some(session));
-				assert_eq!(
-					window.session_info,
-					vec![dummy_session_info(session)],
-				);
-			})
-		};
-
-		let aux_fut = Box::pin(async move {
-			assert_matches!(
-				handle.recv().await,
-				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					h,
-					RuntimeApiRequest::SessionIndexForChild(s_tx),
-				)) => {
-					assert_eq!(h, hash);
-					let _ = s_tx.send(Ok(session));
-				}
-			);
-
-			assert_matches!(
-				handle.recv().await,
-				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-					h,
-					RuntimeApiRequest::SessionInfo(s, s_tx),
-				)) => {
-					assert_eq!(h, hash);
-					assert_eq!(s, session);
-
-					let _ = s_tx.send(Ok(Some(dummy_session_info(s))));
-				}
-			);
-		});
-
-		futures::executor::block_on(futures::future::join(test_fut, aux_fut));
-	}
 }
diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs
index 926e89696ef..14681f3211b 100644
--- a/polkadot/node/core/approval-voting/src/lib.rs
+++ b/polkadot/node/core/approval-voting/src/lib.rs
@@ -33,20 +33,19 @@ use polkadot_node_subsystem::{
 };
 use polkadot_node_subsystem_util::{
 	metrics::{self, prometheus},
+	rolling_session_window::RollingSessionWindow,
 };
 use polkadot_primitives::v1::{
 	ValidatorIndex, Hash, SessionIndex, SessionInfo, CandidateHash,
 	CandidateReceipt, BlockNumber, PersistedValidationData,
 	ValidationCode, CandidateDescriptor, ValidatorPair, ValidatorSignature, ValidatorId,
-	CandidateIndex, GroupIndex,
+	CandidateIndex, GroupIndex, ApprovalVote,
 };
 use polkadot_node_primitives::{ValidationResult, PoV};
 use polkadot_node_primitives::approval::{
-	IndirectAssignmentCert, IndirectSignedApprovalVote, ApprovalVote, DelayTranche,
-	BlockApprovalMeta,
+	IndirectAssignmentCert, IndirectSignedApprovalVote, DelayTranche, BlockApprovalMeta,
 };
 use polkadot_node_jaeger as jaeger;
-use parity_scale_codec::Encode;
 use sc_keystore::LocalKeystore;
 use sp_consensus::SyncOracle;
 use sp_consensus_slots::Slot;
@@ -497,7 +496,7 @@ struct ApprovalStatus {
 }
 
 struct State<T> {
-	session_window: import::RollingSessionWindow,
+	session_window: RollingSessionWindow,
 	keystore: Arc<LocalKeystore>,
 	slot_duration_millis: u64,
 	db: T,
@@ -591,7 +590,7 @@ async fn run<C>(
 {
 	let (background_tx, background_rx) = mpsc::channel::<BackgroundRequest>(64);
 	let mut state = State {
-		session_window: Default::default(),
+		session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
 		keystore: subsystem.keystore,
 		slot_duration_millis: subsystem.slot_duration_millis,
 		db: ApprovalDBV1Reader::new(subsystem.db.clone(), subsystem.db_config.clone()),
@@ -1226,15 +1225,6 @@ async fn handle_approved_ancestor(
 	Ok(all_approved_max)
 }
 
-fn approval_signing_payload(
-	approval_vote: ApprovalVote,
-	session_index: SessionIndex,
-) -> Vec<u8> {
-	const MAGIC: [u8; 4] = *b"APPR";
-
-	(MAGIC, approval_vote, session_index).encode()
-}
-
 // `Option::cmp` treats `None` as less than `Some`.
 fn min_prefer_some<T: std::cmp::Ord>(
 	a: Option<T>,
@@ -1466,10 +1456,8 @@ fn check_and_import_approval<T>(
 		None => respond_early!(ApprovalCheckResult::Bad)
 	};
 
-	let approval_payload = approval_signing_payload(
-		ApprovalVote(approved_candidate_hash),
-		block_entry.session(),
-	);
+	let approval_payload = ApprovalVote(approved_candidate_hash)
+		.signing_payload(block_entry.session());
 
 	let pubkey = match session_info.validators.get(approval.validator.0 as usize) {
 		Some(k) => k,
@@ -2147,10 +2135,7 @@ fn sign_approval(
 ) -> Option<ValidatorSignature> {
 	let key = keystore.key_pair::<ValidatorPair>(public).ok().flatten()?;
 
-	let payload = approval_signing_payload(
-		ApprovalVote(candidate_hash),
-		session_index,
-	);
+	let payload = ApprovalVote(candidate_hash).signing_payload(session_index);
 
 	Some(key.sign(&payload[..]))
 }
diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs
index 95b76d8a552..1dc20e3962c 100644
--- a/polkadot/node/core/approval-voting/src/tests.rs
+++ b/polkadot/node/core/approval-voting/src/tests.rs
@@ -191,7 +191,7 @@ impl DBReader for TestStore {
 
 fn blank_state() -> State<TestStore> {
 	State {
-		session_window: import::RollingSessionWindow::default(),
+		session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
 		keystore: Arc::new(LocalKeystore::in_memory()),
 		slot_duration_millis: SLOT_DURATION_MILLIS,
 		db: TestStore::default(),
@@ -204,10 +204,11 @@ fn single_session_state(index: SessionIndex, info: SessionInfo)
 	-> State<TestStore>
 {
 	State {
-		session_window: import::RollingSessionWindow {
-			earliest_session: Some(index),
-			session_info: vec![info],
-		},
+		session_window: RollingSessionWindow::with_session_info(
+			APPROVAL_SESSIONS,
+			index,
+			vec![info],
+		),
 		..blank_state()
 	}
 }
@@ -231,7 +232,7 @@ fn sign_approval(
 	candidate_hash: CandidateHash,
 	session_index: SessionIndex,
 ) -> ValidatorSignature {
-	key.sign(&super::approval_signing_payload(ApprovalVote(candidate_hash), session_index)).into()
+	key.sign(&ApprovalVote(candidate_hash).signing_payload(session_index)).into()
 }
 
 #[derive(Clone)]
diff --git a/polkadot/node/core/dispute-coordinator/Cargo.toml b/polkadot/node/core/dispute-coordinator/Cargo.toml
new file mode 100644
index 00000000000..2b798450a33
--- /dev/null
+++ b/polkadot/node/core/dispute-coordinator/Cargo.toml
@@ -0,0 +1,29 @@
+[package]
+name = "polkadot-node-core-dispute-coordinator"
+version = "0.1.0"
+authors = ["Parity Technologies <admin@parity.io>"]
+edition = "2018"
+
+[dependencies]
+bitvec = { version = "0.20.1", default-features = false, features = ["alloc"] }
+futures = "0.3.12"
+tracing = "0.1.26"
+parity-scale-codec = "2"
+kvdb = "0.9.0"
+derive_more = "0.99.1"
+thiserror = "1.0.23"
+
+polkadot-primitives = { path = "../../../primitives" }
+polkadot-node-primitives = { path = "../../primitives" }
+polkadot-node-subsystem = { path = "../../subsystem" }
+polkadot-node-subsystem-util = { path = "../../subsystem-util" }
+
+sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
+[dev-dependencies]
+kvdb-memorydb = "0.9.0"
+polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers"}
+sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
+assert_matches = "1.4.0"
diff --git a/polkadot/node/core/dispute-coordinator/src/db/mod.rs b/polkadot/node/core/dispute-coordinator/src/db/mod.rs
new file mode 100644
index 00000000000..9b79bd5bc74
--- /dev/null
+++ b/polkadot/node/core/dispute-coordinator/src/db/mod.rs
@@ -0,0 +1,19 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Database component for the dispute coordinator.
+
+pub(super) mod v1;
diff --git a/polkadot/node/core/dispute-coordinator/src/db/v1.rs b/polkadot/node/core/dispute-coordinator/src/db/v1.rs
new file mode 100644
index 00000000000..2253b83c619
--- /dev/null
+++ b/polkadot/node/core/dispute-coordinator/src/db/v1.rs
@@ -0,0 +1,585 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! V1 database for the dispute coordinator.
+
+use polkadot_primitives::v1::{
+	CandidateReceipt, ValidDisputeStatementKind, InvalidDisputeStatementKind, ValidatorIndex,
+	ValidatorSignature, SessionIndex, CandidateHash,
+};
+
+use kvdb::{KeyValueDB, DBTransaction};
+use parity_scale_codec::{Encode, Decode};
+
+use crate::DISPUTE_WINDOW;
+
+const ACTIVE_DISPUTES_KEY: &[u8; 15] = b"active-disputes";
+const EARLIEST_SESSION_KEY: &[u8; 16] = b"earliest-session";
+const CANDIDATE_VOTES_SUBKEY: &[u8; 15] = b"candidate-votes";
+
+fn candidate_votes_key(session: SessionIndex, candidate_hash: &CandidateHash) -> [u8; 15 + 4 + 32] {
+	let mut buf = [0u8; 15 + 4 + 32];
+	buf[..15].copy_from_slice(CANDIDATE_VOTES_SUBKEY);
+
+	// big-endian encoding is used to ensure lexicographic ordering.
+	buf[15..][..4].copy_from_slice(&session.to_be_bytes());
+	candidate_hash.using_encoded(|s| buf[(15 + 4)..].copy_from_slice(s));
+
+	buf
+}
+
+// Computes the upper lexicographic bound on DB keys for candidate votes with a given
+// upper-exclusive bound on sessions.
+fn candidate_votes_range_upper_bound(upper_exclusive: SessionIndex) -> [u8; 15 + 4] {
+	let mut buf = [0; 15 + 4];
+	buf[..15].copy_from_slice(CANDIDATE_VOTES_SUBKEY);
+	// big-endian encoding is used to ensure lexicographic ordering.
+	buf[15..][..4].copy_from_slice(&upper_exclusive.to_be_bytes());
+
+	buf
+}
+
+fn decode_candidate_votes_key(key: &[u8]) -> Option<(SessionIndex, CandidateHash)> {
+	if key.len() != 15 + 4 + 32 {
+		return None;
+	}
+
+	let mut session_buf = [0; 4];
+	session_buf.copy_from_slice(&key[15..][..4]);
+	let session = SessionIndex::from_be_bytes(session_buf);
+
+	CandidateHash::decode(&mut &key[(15 + 4)..]).ok().map(|hash| (session, hash))
+}
+
+/// Column configuration information for the DB.
+#[derive(Debug, Clone)]
+pub struct ColumnConfiguration {
+	/// The column in the key-value DB where data is stored.
+	pub col_data: u32,
+}
+
+/// Tracked votes on candidates, for the purposes of dispute resolution.
+#[derive(Debug, Clone, Encode, Decode)]
+pub struct CandidateVotes {
+	/// The receipt of the candidate itself.
+	pub candidate_receipt: CandidateReceipt,
+	/// Votes of validity, sorted by validator index.
+	pub valid: Vec<(ValidDisputeStatementKind, ValidatorIndex, ValidatorSignature)>,
+	/// Votes of invalidity, sorted by validator index.
+	pub invalid: Vec<(InvalidDisputeStatementKind, ValidatorIndex, ValidatorSignature)>,
+}
+
+impl From<CandidateVotes> for polkadot_node_primitives::CandidateVotes {
+	fn from(db_votes: CandidateVotes) -> polkadot_node_primitives::CandidateVotes {
+		polkadot_node_primitives::CandidateVotes {
+			candidate_receipt: db_votes.candidate_receipt,
+			valid: db_votes.valid,
+			invalid: db_votes.invalid,
+		}
+	}
+}
+
+impl From<polkadot_node_primitives::CandidateVotes> for CandidateVotes {
+	fn from(primitive_votes: polkadot_node_primitives::CandidateVotes) -> CandidateVotes {
+		CandidateVotes {
+			candidate_receipt: primitive_votes.candidate_receipt,
+			valid: primitive_votes.valid,
+			invalid: primitive_votes.invalid,
+		}
+	}
+}
+
+/// Meta-key for tracking active disputes.
+#[derive(Debug, Default, Clone, Encode, Decode, PartialEq)]
+pub struct ActiveDisputes {
+	/// All disputed candidates, sorted by session index and then by candidate hash.
+	pub disputed: Vec<(SessionIndex, CandidateHash)>,
+}
+
+impl ActiveDisputes {
+	/// Whether the set of active disputes contains the given candidate.
+	pub(crate) fn contains(
+		&self,
+		session: SessionIndex,
+		candidate_hash: CandidateHash,
+	) -> bool {
+		self.disputed.contains(&(session, candidate_hash))
+	}
+
+	/// Insert the session and candidate hash from the set of active disputes.
+	/// Returns 'true' if the entry was not already in the set.
+	pub(crate) fn insert(
+		&mut self,
+		session: SessionIndex,
+		candidate_hash: CandidateHash,
+	) -> bool {
+		let new_entry = (session, candidate_hash);
+
+		let pos = self.disputed.iter()
+			.take_while(|&e| &new_entry < e)
+			.count();
+		if self.disputed.get(pos).map_or(false, |&e| new_entry == e) {
+			false
+		} else {
+			self.disputed.insert(pos, new_entry);
+			true
+		}
+	}
+
+	/// Delete the session and candidate hash from the set of active disputes.
+	/// Returns 'true' if the entry was present.
+	pub(crate) fn delete(
+		&mut self,
+		session: SessionIndex,
+		candidate_hash: CandidateHash,
+	) -> bool {
+		let new_entry = (session, candidate_hash);
+
+		match self.disputed.iter().position(|e| &new_entry == e) {
+			None => false,
+			Some(pos) => {
+				self.disputed.remove(pos);
+				true
+			}
+		}
+	}
+}
+
+/// Errors while accessing things from the DB.
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+	#[error(transparent)]
+	Io(#[from] std::io::Error),
+	#[error(transparent)]
+	Codec(#[from] parity_scale_codec::Error),
+}
+
+/// Result alias for DB errors.
+pub type Result<T> = std::result::Result<T, Error>;
+
+fn load_decode<D: Decode>(db: &dyn KeyValueDB, col_data: u32, key: &[u8])
+	-> Result<Option<D>>
+{
+	match db.get(col_data, key)? {
+		None => Ok(None),
+		Some(raw) => D::decode(&mut &raw[..])
+			.map(Some)
+			.map_err(Into::into),
+	}
+}
+
+/// Load the candidate votes for the identified candidate under the given hash.
+pub(crate) fn load_candidate_votes(
+	db: &dyn KeyValueDB,
+	config: &ColumnConfiguration,
+	session: SessionIndex,
+	candidate_hash: &CandidateHash,
+) -> Result<Option<CandidateVotes>> {
+	load_decode(db, config.col_data, &candidate_votes_key(session, candidate_hash))
+}
+
+/// Load the earliest session, if any.
+pub(crate) fn load_earliest_session(
+	db: &dyn KeyValueDB,
+	config: &ColumnConfiguration,
+) -> Result<Option<SessionIndex>> {
+	load_decode(db, config.col_data, EARLIEST_SESSION_KEY)
+}
+
+/// Load the active disputes, if any.
+pub(crate) fn load_active_disputes(
+	db: &dyn KeyValueDB,
+	config: &ColumnConfiguration,
+) -> Result<Option<ActiveDisputes>> {
+	load_decode(db, config.col_data, ACTIVE_DISPUTES_KEY)
+}
+
+/// An atomic transaction to be commited to the underlying DB.
+#[derive(Debug, Default, Clone)]
+pub(crate) struct Transaction {
+	earliest_session: Option<SessionIndex>,
+	active_disputes: Option<ActiveDisputes>,
+	write_candidate_votes: Vec<(SessionIndex, CandidateHash, CandidateVotes)>,
+	delete_candidate_votes: Vec<(SessionIndex, CandidateHash)>,
+}
+
+impl Transaction {
+	/// Prepare a write to the 'earliest session' field of the DB.
+	///
+	/// Later calls to this function will override earlier ones.
+	pub(crate) fn put_earliest_session(&mut self, session: SessionIndex) {
+		self.earliest_session = Some(session);
+	}
+
+	/// Prepare a write to the active disputes stored in the DB.
+	///
+	/// Later calls to this function will override earlier ones.
+	pub(crate) fn put_active_disputes(&mut self, active: ActiveDisputes) {
+		self.active_disputes = Some(active);
+	}
+
+
+	/// Prepare a write of the candidate votes under the indicated candidate.
+	///
+	/// Later calls to this function for the same candidate will override earlier ones.
+	/// Any calls to this function will be overridden by deletions of the same candidate.
+	pub(crate) fn put_candidate_votes(
+		&mut self,
+		session: SessionIndex,
+		candidate_hash: CandidateHash,
+		votes: CandidateVotes,
+	) {
+		self.write_candidate_votes.push((session, candidate_hash, votes))
+	}
+
+	/// Prepare a deletion of the candidate votes under the indicated candidate.
+	///
+	/// Any calls to this function will override writes to the same candidate.
+	pub(crate) fn delete_candidate_votes(
+		&mut self,
+		session: SessionIndex,
+		candidate_hash: CandidateHash,
+	) {
+		self.delete_candidate_votes.push((session, candidate_hash))
+	}
+
+	/// Write the transaction atomically to the DB.
+	pub(crate) fn write(self, db: &dyn KeyValueDB, config: &ColumnConfiguration) -> Result<()> {
+		let mut tx = DBTransaction::new();
+
+		if let Some(s) = self.earliest_session {
+			tx.put_vec(config.col_data, EARLIEST_SESSION_KEY, s.encode());
+		}
+
+		if let Some(a) = self.active_disputes {
+			tx.put_vec(config.col_data, ACTIVE_DISPUTES_KEY, a.encode());
+		}
+
+		for (session, candidate_hash, votes) in self.write_candidate_votes {
+			tx.put_vec(config.col_data, &candidate_votes_key(session, &candidate_hash), votes.encode());
+		}
+
+		for (session, candidate_hash) in self.delete_candidate_votes {
+			tx.delete(config.col_data, &candidate_votes_key(session, &candidate_hash));
+		}
+
+		db.write(tx).map_err(Into::into)
+	}
+}
+
+/// Maybe prune data in the DB based on the provided session index.
+///
+/// This is intended to be called on every block, and as such will be used to populate the DB on
+/// first launch. If the on-disk data does not need to be pruned, only a single storage read
+/// will be performed.
+///
+/// If one or more ancient sessions are pruned, all metadata on candidates within the ancient
+/// session will be deleted.
+pub(crate) fn note_current_session(
+	store: &dyn KeyValueDB,
+	config: &ColumnConfiguration,
+	current_session: SessionIndex,
+) -> Result<()> {
+	let new_earliest = current_session.saturating_sub(DISPUTE_WINDOW);
+	let mut tx = Transaction::default();
+
+	match load_earliest_session(store, config)? {
+		None => {
+			// First launch - write new-earliest.
+			tx.put_earliest_session(new_earliest);
+		}
+		Some(prev_earliest) if new_earliest > prev_earliest => {
+			// Prune all data in the outdated sessions.
+			tx.put_earliest_session(new_earliest);
+
+			// Clear active disputes metadata.
+			{
+				let mut active_disputes = load_active_disputes(store, config)?.unwrap_or_default();
+				let prune_up_to = active_disputes.disputed.iter()
+					.take_while(|s| s.0 < new_earliest)
+					.count();
+
+				if prune_up_to > 0 {
+					let _ = active_disputes.disputed.drain(..prune_up_to);
+					tx.put_active_disputes(active_disputes);
+				}
+			}
+
+			// Clear all candidate data with session less than the new earliest kept.
+			{
+				let end_prefix = candidate_votes_range_upper_bound(new_earliest);
+
+				store.iter_with_prefix(config.col_data, CANDIDATE_VOTES_SUBKEY)
+					.take_while(|(k, _)| &k[..] < &end_prefix[..])
+					.filter_map(|(k, _)| decode_candidate_votes_key(&k[..]))
+					.for_each(|(session, candidate_hash)| {
+						tx.delete_candidate_votes(session, candidate_hash);
+					});
+			}
+		}
+		Some(_) => {
+			// nothing to do.
+		}
+	};
+
+	tx.write(store, config)
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use polkadot_primitives::v1::{Hash, Id as ParaId};
+
+	#[test]
+	fn candidate_votes_key_works() {
+		let session = 4;
+		let candidate = CandidateHash(Hash::repeat_byte(0x01));
+
+		let key = candidate_votes_key(session, &candidate);
+
+		assert_eq!(&key[0..15], CANDIDATE_VOTES_SUBKEY);
+		assert_eq!(&key[15..19], &[0x00, 0x00, 0x00, 0x04]);
+		assert_eq!(&key[19..51], candidate.0.as_bytes());
+
+		assert_eq!(
+			decode_candidate_votes_key(&key[..]),
+			Some((session, candidate)),
+		);
+	}
+
+	#[test]
+	fn db_transaction() {
+		let store = kvdb_memorydb::create(1);
+		let config = ColumnConfiguration { col_data: 0 };
+
+		{
+			let mut tx = Transaction::default();
+
+			tx.put_earliest_session(0);
+			tx.put_earliest_session(1);
+
+			tx.put_active_disputes(ActiveDisputes {
+				disputed: vec![
+					(0, CandidateHash(Hash::repeat_byte(0))),
+				],
+			});
+
+			tx.put_active_disputes(ActiveDisputes {
+				disputed: vec![
+					(1, CandidateHash(Hash::repeat_byte(1))),
+				],
+			});
+
+			tx.put_candidate_votes(
+				1,
+				CandidateHash(Hash::repeat_byte(1)),
+				CandidateVotes {
+					candidate_receipt: Default::default(),
+					valid: Vec::new(),
+					invalid: Vec::new(),
+				},
+			);
+			tx.put_candidate_votes(
+				1,
+				CandidateHash(Hash::repeat_byte(1)),
+				CandidateVotes {
+					candidate_receipt: {
+						let mut receipt = CandidateReceipt::default();
+						receipt.descriptor.para_id = 5.into();
+
+						receipt
+					},
+					valid: Vec::new(),
+					invalid: Vec::new(),
+				},
+			);
+
+			tx.write(&store, &config).unwrap();
+		}
+
+		// Test that subsequent writes were written.
+		{
+			assert_eq!(
+				load_earliest_session(&store, &config).unwrap().unwrap(),
+				1,
+			);
+
+			assert_eq!(
+				load_active_disputes(&store, &config).unwrap().unwrap(),
+				ActiveDisputes {
+					disputed: vec![
+						(1, CandidateHash(Hash::repeat_byte(1))),
+					],
+				},
+			);
+
+			assert_eq!(
+				load_candidate_votes(
+					&store,
+					&config,
+					1,
+					&CandidateHash(Hash::repeat_byte(1))
+				).unwrap().unwrap().candidate_receipt.descriptor.para_id,
+				ParaId::from(5),
+			);
+		}
+	}
+
+	#[test]
+	fn db_deletes_supersede_writes() {
+		let store = kvdb_memorydb::create(1);
+		let config = ColumnConfiguration { col_data: 0 };
+
+		{
+			let mut tx = Transaction::default();
+			tx.put_candidate_votes(
+				1,
+				CandidateHash(Hash::repeat_byte(1)),
+				CandidateVotes {
+					candidate_receipt: Default::default(),
+					valid: Vec::new(),
+					invalid: Vec::new(),
+				}
+			);
+
+			tx.write(&store, &config).unwrap();
+		}
+
+		assert_eq!(
+			load_candidate_votes(
+				&store,
+				&config,
+				1,
+				&CandidateHash(Hash::repeat_byte(1))
+			).unwrap().unwrap().candidate_receipt.descriptor.para_id,
+			ParaId::from(0),
+		);
+
+		{
+			let mut tx = Transaction::default();
+			tx.put_candidate_votes(
+				1,
+				CandidateHash(Hash::repeat_byte(1)),
+				CandidateVotes {
+					candidate_receipt: {
+						let mut receipt = CandidateReceipt::default();
+						receipt.descriptor.para_id = 5.into();
+
+						receipt
+					},
+					valid: Vec::new(),
+					invalid: Vec::new(),
+				}
+			);
+
+			tx.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1)));
+
+			tx.write(&store, &config).unwrap();
+		}
+
+		assert!(
+			load_candidate_votes(
+				&store,
+				&config,
+				1,
+				&CandidateHash(Hash::repeat_byte(1))
+			).unwrap().is_none()
+		);
+	}
+
+	#[test]
+	fn note_current_session_prunes_old() {
+		let store = kvdb_memorydb::create(1);
+		let config = ColumnConfiguration { col_data: 0 };
+
+		let hash_a = CandidateHash(Hash::repeat_byte(0x0a));
+		let hash_b = CandidateHash(Hash::repeat_byte(0x0b));
+		let hash_c = CandidateHash(Hash::repeat_byte(0x0c));
+		let hash_d = CandidateHash(Hash::repeat_byte(0x0d));
+
+		let prev_earliest_session = 0;
+		let new_earliest_session = 5;
+		let current_session = 5 + DISPUTE_WINDOW;
+
+		let very_old = 3;
+		let slightly_old = 4;
+		let very_recent = current_session - 1;
+
+		let blank_candidate_votes = || CandidateVotes {
+			candidate_receipt: Default::default(),
+			valid: Vec::new(),
+			invalid: Vec::new(),
+		};
+
+		{
+			let mut tx = Transaction::default();
+			tx.put_earliest_session(prev_earliest_session);
+			tx.put_active_disputes(ActiveDisputes {
+				disputed: vec![
+					(very_old, hash_a),
+					(slightly_old, hash_b),
+					(new_earliest_session, hash_c),
+					(very_recent, hash_d),
+				],
+			});
+
+			tx.put_candidate_votes(
+				very_old,
+				hash_a,
+				blank_candidate_votes(),
+			);
+
+			tx.put_candidate_votes(
+				slightly_old,
+				hash_b,
+				blank_candidate_votes(),
+			);
+
+			tx.put_candidate_votes(
+				new_earliest_session,
+				hash_c,
+				blank_candidate_votes(),
+			);
+
+			tx.put_candidate_votes(
+				very_recent,
+				hash_d,
+				blank_candidate_votes(),
+			);
+
+			tx.write(&store, &config).unwrap();
+		}
+
+		note_current_session(&store, &config, current_session).unwrap();
+
+		assert_eq!(
+			load_earliest_session(&store, &config).unwrap(),
+			Some(new_earliest_session),
+		);
+
+		assert_eq!(
+			load_active_disputes(&store, &config).unwrap().unwrap(),
+			ActiveDisputes {
+				disputed: vec![(new_earliest_session, hash_c), (very_recent, hash_d)],
+			},
+		);
+
+		assert!(load_candidate_votes(&store, &config, very_old, &hash_a).unwrap().is_none());
+		assert!(load_candidate_votes(&store, &config, slightly_old, &hash_b).unwrap().is_none());
+		assert!(load_candidate_votes(&store, &config, new_earliest_session, &hash_c).unwrap().is_some());
+		assert!(load_candidate_votes(&store, &config, very_recent, &hash_d).unwrap().is_some());
+	}
+}
diff --git a/polkadot/node/core/dispute-coordinator/src/lib.rs b/polkadot/node/core/dispute-coordinator/src/lib.rs
new file mode 100644
index 00000000000..618d312758c
--- /dev/null
+++ b/polkadot/node/core/dispute-coordinator/src/lib.rs
@@ -0,0 +1,649 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Implements the dispute coordinator subsystem.
+//!
+//! This is the central subsystem of the node-side components which participate in disputes.
+//! This subsystem wraps a database which tracks all statements observed by all validators over some window of sessions.
+//! Votes older than this session window are pruned.
+//!
+//! This subsystem will be the point which produce dispute votes, either positive or negative, based on locally-observed
+//! validation results as well as a sink for votes received by other subsystems. When importing a dispute vote from
+//! another node, this will trigger the dispute participation subsystem to recover and validate the block and call
+//! back to this subsystem.
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use polkadot_node_primitives::{CandidateVotes, SignedDisputeStatement};
+use polkadot_node_subsystem::{
+	messages::{
+		DisputeCoordinatorMessage, ChainApiMessage, DisputeParticipationMessage,
+	},
+	Subsystem, SubsystemContext, FromOverseer, OverseerSignal, SpawnedSubsystem,
+	SubsystemError,
+	errors::{ChainApiError, RuntimeApiError},
+};
+use polkadot_node_subsystem_util::rolling_session_window::{
+	RollingSessionWindow, SessionWindowUpdate,
+};
+use polkadot_primitives::v1::{
+	SessionIndex, CandidateHash, Hash, CandidateReceipt, DisputeStatement, ValidatorIndex,
+	ValidatorSignature, BlockNumber, ValidatorPair,
+};
+
+use futures::prelude::*;
+use futures::channel::oneshot;
+use kvdb::KeyValueDB;
+use parity_scale_codec::Error as CodecError;
+use sc_keystore::LocalKeystore;
+
+use db::v1::ActiveDisputes;
+
+mod db;
+
+#[cfg(test)]
+mod tests;
+
+const LOG_TARGET: &str = "parachain::dispute-coordinator";
+
+// It would be nice to draw this from the chain state, but we have no tools for it right now.
+// On Polkadot this is 1 day, and on Kusama it's 6 hours.
+const DISPUTE_WINDOW: SessionIndex = 6;
+
+struct State {
+	keystore: Arc<LocalKeystore>,
+	highest_session: Option<SessionIndex>,
+	rolling_session_window: RollingSessionWindow,
+}
+
+/// Configuration for the dispute coordinator subsystem.
+#[derive(Debug, Clone, Copy)]
+pub struct Config {
+	/// The data column in the store to use for dispute data.
+	pub col_data: u32,
+}
+
+impl Config {
+	fn column_config(&self) -> db::v1::ColumnConfiguration {
+		db::v1::ColumnConfiguration { col_data: self.col_data }
+	}
+}
+
+/// An implementation of the dispute coordinator subsystem.
+pub struct DisputeCoordinatorSubsystem {
+	config: Config,
+	store: Arc<dyn KeyValueDB>,
+	keystore: Arc<LocalKeystore>,
+}
+
+impl DisputeCoordinatorSubsystem {
+	/// Create a new instance of the subsystem.
+	pub fn new(
+		store: Arc<dyn KeyValueDB>,
+		config: Config,
+		keystore: Arc<LocalKeystore>,
+	) -> Self {
+		DisputeCoordinatorSubsystem { store, config, keystore }
+	}
+}
+
+impl<Context> Subsystem<Context> for DisputeCoordinatorSubsystem
+	where Context: SubsystemContext<Message = DisputeCoordinatorMessage>
+{
+	fn start(self, ctx: Context) -> SpawnedSubsystem {
+		let future = run(self, ctx)
+			.map(|_| Ok(()))
+			.boxed();
+
+		SpawnedSubsystem {
+			name: "dispute-coordinator-subsystem",
+			future,
+		}
+	}
+}
+
+#[derive(Debug, thiserror::Error)]
+#[allow(missing_docs)]
+pub enum Error {
+	#[error(transparent)]
+	RuntimeApi(#[from] RuntimeApiError),
+
+	#[error(transparent)]
+	ChainApi(#[from] ChainApiError),
+
+	#[error(transparent)]
+	Io(#[from] std::io::Error),
+
+	#[error(transparent)]
+	Oneshot(#[from] oneshot::Canceled),
+
+	#[error(transparent)]
+	Subsystem(#[from] SubsystemError),
+
+	#[error(transparent)]
+	Codec(#[from] CodecError),
+}
+
+impl From<db::v1::Error> for Error {
+	fn from(err: db::v1::Error) -> Self {
+		match err {
+			db::v1::Error::Io(io) => Self::Io(io),
+			db::v1::Error::Codec(e) => Self::Codec(e),
+		}
+	}
+}
+
+impl Error {
+	fn trace(&self) {
+		match self {
+			// don't spam the log with spurious errors
+			Self::RuntimeApi(_) |
+			Self::Oneshot(_) => tracing::debug!(target: LOG_TARGET, err = ?self),
+			// it's worth reporting otherwise
+			_ => tracing::warn!(target: LOG_TARGET, err = ?self),
+		}
+	}
+}
+
+async fn run<Context>(subsystem: DisputeCoordinatorSubsystem, mut ctx: Context)
+	where Context: SubsystemContext<Message = DisputeCoordinatorMessage>
+{
+	loop {
+		let res = run_iteration(&mut ctx, &subsystem).await;
+		match res {
+			Err(e) => {
+				e.trace();
+
+				if let Error::Subsystem(SubsystemError::Context(_)) = e {
+					break;
+				}
+			}
+			Ok(()) => {
+				tracing::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
+				break;
+			}
+		}
+	}
+}
+
+// Run the subsystem until an error is encountered or a `conclude` signal is received.
+// Most errors are non-fatal and should lead to another call to this function.
+//
+// A return value of `Ok` indicates that an exit should be made, while non-fatal errors
+// lead to another call to this function.
+async fn run_iteration<Context>(ctx: &mut Context, subsystem: &DisputeCoordinatorSubsystem)
+	-> Result<(), Error>
+	where Context: SubsystemContext<Message = DisputeCoordinatorMessage>
+{
+	let DisputeCoordinatorSubsystem { ref store, ref keystore, ref config } = *subsystem;
+	let mut state = State {
+		keystore: keystore.clone(),
+		highest_session: None,
+		rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW),
+	};
+
+	loop {
+		match ctx.recv().await? {
+			FromOverseer::Signal(OverseerSignal::Conclude) => {
+				return Ok(())
+			}
+			FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
+				handle_new_activations(
+					ctx,
+					&**store,
+					&mut state,
+					config,
+					update.activated.into_iter().map(|a| a.hash),
+				).await?
+			}
+			FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
+			FromOverseer::Communication { msg } => {
+				handle_incoming(
+					ctx,
+					&**store,
+					&mut state,
+					config,
+					msg,
+				).await?
+			}
+		}
+	}
+}
+
+async fn handle_new_activations(
+	ctx: &mut impl SubsystemContext,
+	store: &dyn KeyValueDB,
+	state: &mut State,
+	config: &Config,
+	new_activations: impl IntoIterator<Item = Hash>,
+) -> Result<(), Error> {
+	for new_leaf in new_activations {
+		let block_header = {
+			let (tx, rx) = oneshot::channel();
+
+			ctx.send_message(
+				ChainApiMessage::BlockHeader(new_leaf, tx).into()
+			).await;
+
+			match rx.await?? {
+				None => continue,
+				Some(header) => header,
+			}
+		};
+
+		match state.rolling_session_window.cache_session_info_for_head(
+			ctx,
+			new_leaf,
+			&block_header,
+		).await {
+			Err(e) => {
+				tracing::warn!(
+					target: LOG_TARGET,
+					err = ?e,
+					"Failed to update session cache for disputes",
+				);
+
+				continue
+			}
+			Ok(SessionWindowUpdate::Initialized { window_end, .. })
+				| Ok(SessionWindowUpdate::Advanced { new_window_end: window_end, .. })
+			=> {
+				let session = window_end;
+				if state.highest_session.map_or(true, |s| s < session) {
+					tracing::trace!(
+						target: LOG_TARGET,
+						session,
+						"Observed new session. Pruning",
+					);
+
+					state.highest_session = Some(session);
+
+					db::v1::note_current_session(
+						store,
+						&config.column_config(),
+						session,
+					)?;
+				}
+			}
+			_ => {}
+		}
+
+		// TODO [after https://github.com/paritytech/polkadot/issues/3160]: chain rollbacks
+	}
+
+	Ok(())
+}
+
+async fn handle_incoming(
+	ctx: &mut impl SubsystemContext,
+	store: &dyn KeyValueDB,
+	state: &mut State,
+	config: &Config,
+	message: DisputeCoordinatorMessage,
+) -> Result<(), Error> {
+	match message {
+		DisputeCoordinatorMessage::ImportStatements {
+			candidate_hash,
+			candidate_receipt,
+			session,
+			statements,
+		} => {
+			handle_import_statements(
+				ctx,
+				store,
+				state,
+				config,
+				candidate_hash,
+				candidate_receipt,
+				session,
+				statements,
+			).await?;
+		}
+		DisputeCoordinatorMessage::ActiveDisputes(rx) => {
+			let active_disputes = db::v1::load_active_disputes(store, &config.column_config())?
+				.map(|d| d.disputed)
+				.unwrap_or_default();
+
+			let _ = rx.send(active_disputes);
+		}
+		DisputeCoordinatorMessage::QueryCandidateVotes(
+			session,
+			candidate_hash,
+			rx
+		) => {
+			let candidate_votes = db::v1::load_candidate_votes(
+				store,
+				&config.column_config(),
+				session,
+				&candidate_hash,
+			)?;
+
+			let _ = rx.send(candidate_votes.map(Into::into));
+		}
+		DisputeCoordinatorMessage::IssueLocalStatement(
+			session,
+			candidate_hash,
+			candidate_receipt,
+			valid,
+		) => {
+			issue_local_statement(
+				ctx,
+				state,
+				store,
+				config,
+				candidate_hash,
+				candidate_receipt,
+				session,
+				valid,
+			).await?;
+		}
+		DisputeCoordinatorMessage::DetermineUndisputedChain {
+			base_number,
+			block_descriptions,
+			tx,
+		} => {
+			let undisputed_chain = determine_undisputed_chain(
+				store,
+				&config,
+				base_number,
+				block_descriptions
+			)?;
+
+			let _ = tx.send(undisputed_chain);
+		}
+	}
+
+	Ok(())
+}
+
+fn insert_into_statement_vec<T>(
+	vec: &mut Vec<(T, ValidatorIndex, ValidatorSignature)>,
+	tag: T,
+	val_index: ValidatorIndex,
+	val_signature: ValidatorSignature,
+) {
+	let pos = match vec.binary_search_by_key(&val_index, |x| x.1) {
+		Ok(_) => return, // no duplicates needed.
+		Err(p) => p,
+	};
+
+	vec.insert(pos, (tag, val_index, val_signature));
+}
+
+async fn handle_import_statements(
+	ctx: &mut impl SubsystemContext,
+	store: &dyn KeyValueDB,
+	state: &mut State,
+	config: &Config,
+	candidate_hash: CandidateHash,
+	candidate_receipt: CandidateReceipt,
+	session: SessionIndex,
+	statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
+) -> Result<(), Error> {
+	if state.highest_session.map_or(true, |h| session + DISPUTE_WINDOW < h) {
+		return Ok(());
+	}
+
+	let validators = match state.rolling_session_window.session_info(session) {
+		None => {
+			tracing::warn!(
+				target: LOG_TARGET,
+				session,
+				"Missing info for session which has an active dispute",
+			);
+
+			return Ok(())
+		}
+		Some(info) => info.validators.clone(),
+	};
+
+	let n_validators = validators.len();
+
+	let supermajority_threshold = polkadot_primitives::v1::supermajority_threshold(n_validators);
+
+	let mut votes = db::v1::load_candidate_votes(
+		store,
+		&config.column_config(),
+		session,
+		&candidate_hash
+	)?
+		.map(CandidateVotes::from)
+		.unwrap_or_else(|| CandidateVotes {
+			candidate_receipt: candidate_receipt.clone(),
+			valid: Vec::new(),
+			invalid: Vec::new(),
+		});
+
+	let was_undisputed = votes.valid.is_empty() || votes.invalid.is_empty();
+
+	// Update candidate votes.
+	for (statement, val_index) in statements {
+		if validators.get(val_index.0 as usize)
+			.map_or(true, |v| v != statement.validator_public())
+		{
+			tracing::debug!(
+				target: LOG_TARGET,
+				?val_index,
+				session,
+				claimed_key = ?statement.validator_public(),
+				"Validator index doesn't match claimed key",
+			);
+
+			continue
+		}
+
+		match statement.statement().clone() {
+			DisputeStatement::Valid(valid_kind) => {
+				insert_into_statement_vec(
+					&mut votes.valid,
+					valid_kind,
+					val_index,
+					statement.validator_signature().clone(),
+				);
+			}
+			DisputeStatement::Invalid(invalid_kind) => {
+				insert_into_statement_vec(
+					&mut votes.invalid,
+					invalid_kind,
+					val_index,
+					statement.validator_signature().clone(),
+				);
+			}
+		}
+	}
+
+	// Check if newly disputed.
+	let is_disputed = !votes.valid.is_empty() && !votes.invalid.is_empty();
+	let freshly_disputed = is_disputed && was_undisputed;
+	let already_disputed = is_disputed && !was_undisputed;
+	let concluded_valid = votes.valid.len() >= supermajority_threshold;
+
+	let mut tx = db::v1::Transaction::default();
+
+	if freshly_disputed && !concluded_valid {
+		// add to active disputes and begin local participation.
+		update_active_disputes(
+			store,
+			config,
+			&mut tx,
+			|active| active.insert(session, candidate_hash),
+		)?;
+
+		let voted_indices = votes.voted_indices();
+
+		ctx.send_message(DisputeParticipationMessage::Participate {
+			candidate_hash,
+			candidate_receipt,
+			session,
+			voted_indices,
+		}.into()).await;
+	}
+
+	if concluded_valid && already_disputed {
+		// remove from active disputes.
+		update_active_disputes(
+			store,
+			config,
+			&mut tx,
+			|active| active.delete(session, candidate_hash),
+		)?;
+	}
+
+	tx.put_candidate_votes(session, candidate_hash, votes.into());
+	tx.write(store, &config.column_config())?;
+
+	Ok(())
+}
+
+fn update_active_disputes(
+	store: &dyn KeyValueDB,
+	config: &Config,
+	tx: &mut db::v1::Transaction,
+	with_active: impl FnOnce(&mut ActiveDisputes) -> bool,
+) -> Result<(), Error> {
+	let mut active_disputes = db::v1::load_active_disputes(store, &config.column_config())?
+		.unwrap_or_default();
+
+	if with_active(&mut active_disputes) {
+		tx.put_active_disputes(active_disputes);
+	}
+
+	Ok(())
+}
+
+async fn issue_local_statement(
+	ctx: &mut impl SubsystemContext,
+	state: &mut State,
+	store: &dyn KeyValueDB,
+	config: &Config,
+	candidate_hash: CandidateHash,
+	candidate_receipt: CandidateReceipt,
+	session: SessionIndex,
+	valid: bool,
+) -> Result<(), Error> {
+	// Load session info.
+	let validators = match state.rolling_session_window.session_info(session) {
+		None => {
+			tracing::warn!(
+				target: LOG_TARGET,
+				session,
+				"Missing info for session which has an active dispute",
+			);
+
+			return Ok(())
+		}
+		Some(info) => info.validators.clone(),
+	};
+
+	let votes = db::v1::load_candidate_votes(
+		store,
+		&config.column_config(),
+		session,
+		&candidate_hash
+	)?
+		.map(CandidateVotes::from)
+		.unwrap_or_else(|| CandidateVotes {
+			candidate_receipt: candidate_receipt.clone(),
+			valid: Vec::new(),
+			invalid: Vec::new(),
+		});
+
+	// Sign a statement for each validator index we control which has
+	// not already voted. This should generally be maximum 1 statement.
+	let voted_indices = votes.voted_indices();
+	let mut statements = Vec::new();
+
+	let voted_indices: HashSet<_> = voted_indices.into_iter().collect();
+	for (index, validator) in validators.iter().enumerate() {
+		let index = ValidatorIndex(index as _);
+		if voted_indices.contains(&index) { continue }
+		if state.keystore.key_pair::<ValidatorPair>(validator).ok().flatten().is_none() {
+			continue
+		}
+
+		let keystore = state.keystore.clone() as Arc<_>;
+		let res = SignedDisputeStatement::sign_explicit(
+			&keystore,
+			valid,
+			candidate_hash,
+			session,
+			validator.clone(),
+		).await;
+
+		match res {
+			Ok(Some(signed_dispute_statement)) => {
+				statements.push((signed_dispute_statement, index));
+			}
+			Ok(None) => {}
+			Err(e) => {
+				tracing::error!(
+					target: LOG_TARGET,
+					err = ?e,
+					"Encountered keystore error while signing dispute statement",
+				);
+			}
+		}
+	}
+
+	// Do import
+	if !statements.is_empty() {
+		handle_import_statements(
+			ctx,
+			store,
+			state,
+			config,
+			candidate_hash,
+			candidate_receipt,
+			session,
+			statements,
+		).await?;
+	}
+
+	Ok(())
+}
+
+fn determine_undisputed_chain(
+	store: &dyn KeyValueDB,
+	config: &Config,
+	base_number: BlockNumber,
+	block_descriptions: Vec<(Hash, SessionIndex, Vec<CandidateHash>)>,
+) -> Result<Option<(BlockNumber, Hash)>, Error> {
+	let last = block_descriptions.last()
+		.map(|e| (base_number + block_descriptions.len() as BlockNumber, e.0));
+
+	// Fast path for no disputes.
+	let active_disputes = match db::v1::load_active_disputes(store, &config.column_config())? {
+		None => return Ok(last),
+		Some(a) if a.disputed.is_empty() => return Ok(last),
+		Some(a) => a,
+	};
+
+	for (i, (_, session, candidates)) in block_descriptions.iter().enumerate() {
+		if candidates.iter().any(|c| active_disputes.contains(*session, *c)) {
+			if i == 0 {
+				return Ok(None);
+			} else {
+				return Ok(Some((
+					base_number + i as BlockNumber,
+					block_descriptions[i - 1].0,
+				)));
+			}
+		}
+	}
+
+	Ok(last)
+}
diff --git a/polkadot/node/core/dispute-coordinator/src/tests.rs b/polkadot/node/core/dispute-coordinator/src/tests.rs
new file mode 100644
index 00000000000..831e703c826
--- /dev/null
+++ b/polkadot/node/core/dispute-coordinator/src/tests.rs
@@ -0,0 +1,706 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+
+use super::*;
+use polkadot_primitives::v1::{BlakeTwo256, HashT, ValidatorId, Header, SessionInfo};
+use polkadot_node_subsystem::{jaeger, ActiveLeavesUpdate, ActivatedLeaf, LeafStatus};
+use polkadot_node_subsystem::messages::{
+	AllMessages, ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest,
+};
+use polkadot_node_subsystem_test_helpers::{make_subsystem_context, TestSubsystemContextHandle};
+use sp_core::testing::TaskExecutor;
+use sp_keyring::Sr25519Keyring;
+use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
+use futures::future::{self, BoxFuture};
+use parity_scale_codec::Encode;
+use assert_matches::assert_matches;
+
+// sets up a keystore with the given keyring accounts.
+fn make_keystore(accounts: &[Sr25519Keyring]) -> LocalKeystore {
+	let store = LocalKeystore::in_memory();
+
+	for s in accounts.iter().copied().map(|k| k.to_seed()) {
+		store.sr25519_generate_new(
+			polkadot_primitives::v1::PARACHAIN_KEY_TYPE_ID,
+			Some(s.as_str()),
+		).unwrap();
+	}
+
+	store
+}
+
+fn session_to_hash(session: SessionIndex, extra: impl Encode) -> Hash {
+	BlakeTwo256::hash_of(&(session, extra))
+}
+
+type VirtualOverseer = TestSubsystemContextHandle<DisputeCoordinatorMessage>;
+
+struct TestState {
+	validators: Vec<Sr25519Keyring>,
+	validator_public: Vec<ValidatorId>,
+	validator_groups: Vec<Vec<ValidatorIndex>>,
+	master_keystore: Arc<sc_keystore::LocalKeystore>,
+	subsystem_keystore: Arc<sc_keystore::LocalKeystore>,
+	db: Arc<dyn KeyValueDB>,
+	config: Config,
+}
+
+impl Default for TestState {
+	fn default() -> TestState {
+		let validators = vec![
+			Sr25519Keyring::Alice,
+			Sr25519Keyring::Bob,
+			Sr25519Keyring::Charlie,
+			Sr25519Keyring::Dave,
+			Sr25519Keyring::Eve,
+			Sr25519Keyring::One,
+		];
+
+		let validator_public = validators.iter()
+			.map(|k| ValidatorId::from(k.public()))
+			.collect();
+
+		let validator_groups = vec![
+			vec![ValidatorIndex(0), ValidatorIndex(1)],
+			vec![ValidatorIndex(2), ValidatorIndex(3)],
+			vec![ValidatorIndex(4), ValidatorIndex(5)],
+		];
+
+		let master_keystore = make_keystore(&validators).into();
+		let subsystem_keystore = make_keystore(&[Sr25519Keyring::Alice]).into();
+
+		let db = Arc::new(kvdb_memorydb::create(1));
+		let config = Config {
+			col_data: 0,
+		};
+
+		TestState {
+			validators,
+			validator_public,
+			validator_groups,
+			master_keystore,
+			subsystem_keystore,
+			db,
+			config,
+		}
+	}
+}
+
+impl TestState {
+	async fn activate_leaf_at_session(
+		&self,
+		virtual_overseer: &mut VirtualOverseer,
+		session: SessionIndex,
+		block_number: BlockNumber,
+	) {
+		assert!(block_number > 0);
+
+		let parent_hash = session_to_hash(session, b"parent");
+		let block_header = Header {
+			parent_hash,
+			number: block_number,
+			digest: Default::default(),
+			state_root: Default::default(),
+			extrinsics_root: Default::default(),
+		};
+		let block_hash = block_header.hash();
+
+		virtual_overseer.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
+			ActiveLeavesUpdate::start_work(ActivatedLeaf {
+				hash: block_hash,
+				span: Arc::new(jaeger::Span::Disabled),
+				number: block_number,
+				status: LeafStatus::Fresh,
+			})
+		))).await;
+
+		assert_matches!(
+			virtual_overseer.recv().await,
+			AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
+				assert_eq!(h, block_hash);
+				let _ = tx.send(Ok(Some(block_header)));
+			}
+		);
+
+		assert_matches!(
+			virtual_overseer.recv().await,
+			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+				h,
+				RuntimeApiRequest::SessionIndexForChild(tx),
+			)) => {
+				assert_eq!(h, parent_hash);
+				let _ = tx.send(Ok(session));
+			}
+		);
+
+		loop {
+			// answer session info queries until the current session is reached.
+			assert_matches!(
+				virtual_overseer.recv().await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					h,
+					RuntimeApiRequest::SessionInfo(session_index, tx),
+				)) => {
+					assert_eq!(h, block_hash);
+
+					let _ = tx.send(Ok(Some(self.session_info())));
+					if session_index == session { break }
+				}
+			)
+		}
+	}
+
+	fn session_info(&self) -> SessionInfo {
+		let discovery_keys = self.validators.iter()
+			.map(|k| <_>::from(k.public()))
+			.collect();
+
+		let assignment_keys = self.validators.iter()
+			.map(|k| <_>::from(k.public()))
+			.collect();
+
+		SessionInfo {
+			validators: self.validator_public.clone(),
+			discovery_keys,
+			assignment_keys,
+			validator_groups: self.validator_groups.clone(),
+			n_cores: self.validator_groups.len() as _,
+			zeroth_delay_tranche_width: 0,
+			relay_vrf_modulo_samples: 1,
+			n_delay_tranches: 100,
+			no_show_slots: 1,
+			needed_approvals: 10,
+		}
+	}
+
+	async fn issue_statement_with_index(
+		&self,
+		index: usize,
+		candidate_hash: CandidateHash,
+		session: SessionIndex,
+		valid: bool,
+	) -> SignedDisputeStatement {
+		let public = self.validator_public[index].clone();
+
+		let keystore = self.master_keystore.clone() as SyncCryptoStorePtr;
+
+		SignedDisputeStatement::sign_explicit(
+			&keystore,
+			valid,
+			candidate_hash,
+			session,
+			public,
+		).await.unwrap().unwrap()
+	}
+}
+
+fn test_harness<F>(test: F)
+	where F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, ()>
+{
+	let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new());
+
+	let state = TestState::default();
+	let subsystem = DisputeCoordinatorSubsystem::new(
+		state.db.clone(),
+		state.config.clone(),
+		state.subsystem_keystore.clone(),
+	);
+
+	let subsystem_task = run(subsystem, ctx);
+	let test_task = test(state, ctx_handle);
+
+	futures::executor::block_on(future::join(subsystem_task, test_task));
+}
+
+#[test]
+fn conflicting_votes_lead_to_dispute_participation() {
+	test_harness(|test_state, mut virtual_overseer| Box::pin(async move {
+		let session = 1;
+
+		let candidate_receipt = CandidateReceipt::default();
+		let candidate_hash = candidate_receipt.hash();
+
+		test_state.activate_leaf_at_session(
+			&mut virtual_overseer,
+			session,
+			1,
+		).await;
+
+		let valid_vote = test_state.issue_statement_with_index(
+			0,
+			candidate_hash,
+			session,
+			true,
+		).await;
+
+		let invalid_vote = test_state.issue_statement_with_index(
+			1,
+			candidate_hash,
+			session,
+			false,
+		).await;
+
+		let invalid_vote_2 = test_state.issue_statement_with_index(
+			2,
+			candidate_hash,
+			session,
+			false,
+		).await;
+
+		virtual_overseer.send(FromOverseer::Communication {
+			msg: DisputeCoordinatorMessage::ImportStatements {
+				candidate_hash,
+				candidate_receipt: candidate_receipt.clone(),
+				session,
+				statements: vec![
+					(valid_vote, ValidatorIndex(0)),
+					(invalid_vote, ValidatorIndex(1)),
+				],
+			},
+		}).await;
+
+		assert_matches!(
+			virtual_overseer.recv().await,
+			AllMessages::DisputeParticipation(DisputeParticipationMessage::Participate {
+				candidate_hash: c_hash,
+				candidate_receipt: c_receipt,
+				session: s,
+				voted_indices,
+			}) => {
+				assert_eq!(c_hash, candidate_hash);
+				assert_eq!(c_receipt, candidate_receipt);
+				assert_eq!(s, session);
+				assert_eq!(voted_indices, vec![ValidatorIndex(0), ValidatorIndex(1)]);
+			}
+		);
+
+		{
+			let (tx, rx) = oneshot::channel();
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
+			}).await;
+
+			assert_eq!(rx.await.unwrap(), vec![(session, candidate_hash)]);
+
+			let (tx, rx) = oneshot::channel();
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: DisputeCoordinatorMessage::QueryCandidateVotes(
+					session,
+					candidate_hash,
+					tx,
+				),
+			}).await;
+
+			let votes = rx.await.unwrap().unwrap();
+			assert_eq!(votes.valid.len(), 1);
+			assert_eq!(votes.invalid.len(), 1);
+		}
+
+		virtual_overseer.send(FromOverseer::Communication {
+			msg: DisputeCoordinatorMessage::ImportStatements {
+				candidate_hash,
+				candidate_receipt: candidate_receipt.clone(),
+				session,
+				statements: vec![
+					(invalid_vote_2, ValidatorIndex(2)),
+				],
+			},
+		}).await;
+
+		{
+			let (tx, rx) = oneshot::channel();
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: DisputeCoordinatorMessage::QueryCandidateVotes(
+					session,
+					candidate_hash,
+					tx,
+				),
+			}).await;
+
+			let votes = rx.await.unwrap().unwrap();
+			assert_eq!(votes.valid.len(), 1);
+			assert_eq!(votes.invalid.len(), 2);
+		}
+
+		virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
+
+		// This confirms that the second vote doesn't lead to participation again.
+		assert!(virtual_overseer.try_recv().await.is_none());
+	}));
+}
+
+#[test]
+fn positive_votes_dont_trigger_participation() {
+	test_harness(|test_state, mut virtual_overseer| Box::pin(async move {
+		let session = 1;
+
+		let candidate_receipt = CandidateReceipt::default();
+		let candidate_hash = candidate_receipt.hash();
+
+		test_state.activate_leaf_at_session(
+			&mut virtual_overseer,
+			session,
+			1,
+		).await;
+
+		let valid_vote = test_state.issue_statement_with_index(
+			0,
+			candidate_hash,
+			session,
+			true,
+		).await;
+
+		let valid_vote_2 = test_state.issue_statement_with_index(
+			1,
+			candidate_hash,
+			session,
+			true,
+		).await;
+
+		virtual_overseer.send(FromOverseer::Communication {
+			msg: DisputeCoordinatorMessage::ImportStatements {
+				candidate_hash,
+				candidate_receipt: candidate_receipt.clone(),
+				session,
+				statements: vec![
+					(valid_vote, ValidatorIndex(0)),
+				],
+			},
+		}).await;
+
+		{
+			let (tx, rx) = oneshot::channel();
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
+			}).await;
+
+			assert!(rx.await.unwrap().is_empty());
+
+			let (tx, rx) = oneshot::channel();
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: DisputeCoordinatorMessage::QueryCandidateVotes(
+					session,
+					candidate_hash,
+					tx,
+				),
+			}).await;
+
+			let votes = rx.await.unwrap().unwrap();
+			assert_eq!(votes.valid.len(), 1);
+			assert!(votes.invalid.is_empty());
+		}
+
+		virtual_overseer.send(FromOverseer::Communication {
+			msg: DisputeCoordinatorMessage::ImportStatements {
+				candidate_hash,
+				candidate_receipt: candidate_receipt.clone(),
+				session,
+				statements: vec![
+					(valid_vote_2, ValidatorIndex(1)),
+				],
+			},
+		}).await;
+
+		{
+			let (tx, rx) = oneshot::channel();
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
+			}).await;
+
+			assert!(rx.await.unwrap().is_empty());
+
+			let (tx, rx) = oneshot::channel();
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: DisputeCoordinatorMessage::QueryCandidateVotes(
+					session,
+					candidate_hash,
+					tx,
+				),
+			}).await;
+
+			let votes = rx.await.unwrap().unwrap();
+			assert_eq!(votes.valid.len(), 2);
+			assert!(votes.invalid.is_empty());
+		}
+
+		virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
+
+		// This confirms that no participation request is made.
+		assert!(virtual_overseer.try_recv().await.is_none());
+	}));
+}
+
+#[test]
+fn wrong_validator_index_is_ignored() {
+	test_harness(|test_state, mut virtual_overseer| Box::pin(async move {
+		let session = 1;
+
+		let candidate_receipt = CandidateReceipt::default();
+		let candidate_hash = candidate_receipt.hash();
+
+		test_state.activate_leaf_at_session(
+			&mut virtual_overseer,
+			session,
+			1,
+		).await;
+
+		let valid_vote = test_state.issue_statement_with_index(
+			0,
+			candidate_hash,
+			session,
+			true,
+		).await;
+
+		let invalid_vote = test_state.issue_statement_with_index(
+			1,
+			candidate_hash,
+			session,
+			false,
+		).await;
+
+		virtual_overseer.send(FromOverseer::Communication {
+			msg: DisputeCoordinatorMessage::ImportStatements {
+				candidate_hash,
+				candidate_receipt: candidate_receipt.clone(),
+				session,
+				statements: vec![
+					(valid_vote, ValidatorIndex(1)),
+					(invalid_vote, ValidatorIndex(0)),
+				],
+			},
+		}).await;
+
+		{
+			let (tx, rx) = oneshot::channel();
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
+			}).await;
+
+			assert!(rx.await.unwrap().is_empty());
+
+			let (tx, rx) = oneshot::channel();
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: DisputeCoordinatorMessage::QueryCandidateVotes(
+					session,
+					candidate_hash,
+					tx,
+				),
+			}).await;
+
+			let votes = rx.await.unwrap().unwrap();
+			assert!(votes.valid.is_empty());
+			assert!(votes.invalid.is_empty());
+		}
+
+		virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
+
+		// This confirms that no participation request is made.
+		assert!(virtual_overseer.try_recv().await.is_none());
+	}));
+}
+
+#[test]
+fn finality_votes_ignore_disputed_candidates() {
+	test_harness(|test_state, mut virtual_overseer| Box::pin(async move {
+		let session = 1;
+
+		let candidate_receipt = CandidateReceipt::default();
+		let candidate_hash = candidate_receipt.hash();
+
+		test_state.activate_leaf_at_session(
+			&mut virtual_overseer,
+			session,
+			1,
+		).await;
+
+		let valid_vote = test_state.issue_statement_with_index(
+			0,
+			candidate_hash,
+			session,
+			true,
+		).await;
+
+		let invalid_vote = test_state.issue_statement_with_index(
+			1,
+			candidate_hash,
+			session,
+			false,
+		).await;
+
+		virtual_overseer.send(FromOverseer::Communication {
+			msg: DisputeCoordinatorMessage::ImportStatements {
+				candidate_hash,
+				candidate_receipt: candidate_receipt.clone(),
+				session,
+				statements: vec![
+					(valid_vote, ValidatorIndex(0)),
+					(invalid_vote, ValidatorIndex(1)),
+				],
+			},
+		}).await;
+		let _ = virtual_overseer.recv().await;
+
+		{
+			let (tx, rx) = oneshot::channel();
+
+			let block_hash_a = Hash::repeat_byte(0x0a);
+			let block_hash_b = Hash::repeat_byte(0x0b);
+
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: DisputeCoordinatorMessage::DetermineUndisputedChain {
+					base_number: 10,
+					block_descriptions: vec![
+						(block_hash_a, session, vec![candidate_hash]),
+					],
+					tx,
+				},
+			}).await;
+
+			assert!(rx.await.unwrap().is_none());
+
+			let (tx, rx) = oneshot::channel();
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: DisputeCoordinatorMessage::DetermineUndisputedChain {
+					base_number: 10,
+					block_descriptions: vec![
+						(block_hash_a, session, vec![]),
+						(block_hash_b, session, vec![candidate_hash]),
+					],
+					tx,
+				},
+			}).await;
+
+			assert_eq!(rx.await.unwrap(), Some((11, block_hash_a)));
+		}
+
+		virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
+		assert!(virtual_overseer.try_recv().await.is_none());
+	}));
+}
+
+#[test]
+fn supermajority_valid_dispute_may_be_finalized() {
+	test_harness(|test_state, mut virtual_overseer| Box::pin(async move {
+		let session = 1;
+
+		let candidate_receipt = CandidateReceipt::default();
+		let candidate_hash = candidate_receipt.hash();
+
+		test_state.activate_leaf_at_session(
+			&mut virtual_overseer,
+			session,
+			1,
+		).await;
+
+		let supermajority_threshold = polkadot_primitives::v1::supermajority_threshold(
+			test_state.validators.len()
+		);
+
+		let valid_vote = test_state.issue_statement_with_index(
+			0,
+			candidate_hash,
+			session,
+			true,
+		).await;
+
+		let invalid_vote = test_state.issue_statement_with_index(
+			1,
+			candidate_hash,
+			session,
+			false,
+		).await;
+
+		virtual_overseer.send(FromOverseer::Communication {
+			msg: DisputeCoordinatorMessage::ImportStatements {
+				candidate_hash,
+				candidate_receipt: candidate_receipt.clone(),
+				session,
+				statements: vec![
+					(valid_vote, ValidatorIndex(0)),
+					(invalid_vote, ValidatorIndex(1)),
+				],
+			},
+		}).await;
+
+		let _ = virtual_overseer.recv().await;
+
+		let mut statements = Vec::new();
+		for i in (0..supermajority_threshold - 1).map(|i| i + 2) {
+			let vote = test_state.issue_statement_with_index(
+				i,
+				candidate_hash,
+				session,
+				true,
+			).await;
+
+			statements.push((vote, ValidatorIndex(i as _)));
+		};
+
+		virtual_overseer.send(FromOverseer::Communication {
+			msg: DisputeCoordinatorMessage::ImportStatements {
+				candidate_hash,
+				candidate_receipt: candidate_receipt.clone(),
+				session,
+				statements,
+			},
+		}).await;
+
+		{
+			let (tx, rx) = oneshot::channel();
+
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: DisputeCoordinatorMessage::ActiveDisputes(tx),
+			}).await;
+
+			assert!(rx.await.unwrap().is_empty());
+
+			let (tx, rx) = oneshot::channel();
+
+			let block_hash_a = Hash::repeat_byte(0x0a);
+			let block_hash_b = Hash::repeat_byte(0x0b);
+
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: DisputeCoordinatorMessage::DetermineUndisputedChain {
+					base_number: 10,
+					block_descriptions: vec![
+						(block_hash_a, session, vec![candidate_hash]),
+					],
+					tx,
+				},
+			}).await;
+
+			assert_eq!(rx.await.unwrap(), Some((11, block_hash_a)));
+
+			let (tx, rx) = oneshot::channel();
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: DisputeCoordinatorMessage::DetermineUndisputedChain {
+					base_number: 10,
+					block_descriptions: vec![
+						(block_hash_a, session, vec![]),
+						(block_hash_b, session, vec![candidate_hash]),
+					],
+					tx,
+				},
+			}).await;
+
+			assert_eq!(rx.await.unwrap(), Some((12, block_hash_b)));
+		}
+
+		virtual_overseer.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
+		assert!(virtual_overseer.try_recv().await.is_none());
+	}));
+}
diff --git a/polkadot/node/network/bridge/src/tests.rs b/polkadot/node/network/bridge/src/tests.rs
index 74ca4501e12..2321b3bf5f8 100644
--- a/polkadot/node/network/bridge/src/tests.rs
+++ b/polkadot/node/network/bridge/src/tests.rs
@@ -1250,11 +1250,13 @@ fn spread_event_to_subsystems_is_up_to_date() {
 			AllMessages::ApprovalVoting(_) => unreachable!("Not interested in network events"),
 			AllMessages::ApprovalDistribution(_) => { cnt += 1; }
 			AllMessages::GossipSupport(_) => unreachable!("Not interested in network events"),
-			// Add variants here as needed, `{ cnt += 1; }` for those that need to be
-			// notified, `unreachable!()` for those that should not.
-		}
-	}
-	assert_eq!(cnt, EXPECTED_COUNT);
+			AllMessages::DisputeCoordinator(_) => unreachable!("Not interested in network events"),
+			AllMessages::DisputeParticipation(_) => unreachable!("Not interetsed in network events"),
+            // Add variants here as needed, `{ cnt += 1; }` for those that need to be
+            // notified, `unreachable!()` for those that should not.
+        }
+    }
+    assert_eq!(cnt, EXPECTED_COUNT);
 }
 
 #[test]
diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs
index 8dbd22b8fc6..f43dc9c45ab 100644
--- a/polkadot/node/overseer/src/lib.rs
+++ b/polkadot/node/overseer/src/lib.rs
@@ -629,6 +629,8 @@ impl ChannelsOut {
 			AllMessages::GossipSupport(msg) => {
 				self.gossip_support.send(make_packet(signals_received, msg)).await
 			},
+			AllMessages::DisputeCoordinator(_) => Ok(()),
+			AllMessages::DisputeParticipation(_) => Ok(()),
 		};
 
 		if res.is_err() {
@@ -731,6 +733,8 @@ impl ChannelsOut {
 					.unbounded_send(make_packet(signals_received, msg))
 					.map_err(|e| e.into_send_error())
 			},
+			AllMessages::DisputeCoordinator(_) => Ok(()),
+			AllMessages::DisputeParticipation(_) => Ok(()),
 		};
 
 		if res.is_err() {
@@ -2062,6 +2066,8 @@ where
 			AllMessages::GossipSupport(msg) => {
 				self.subsystems.gossip_support.send_message(msg).await?;
 			},
+			AllMessages::DisputeCoordinator(_) => {}
+			AllMessages::DisputeParticipation(_) => {}
 		}
 
 		Ok(())
diff --git a/polkadot/node/primitives/Cargo.toml b/polkadot/node/primitives/Cargo.toml
index 533fae147cd..7cefeab9ba0 100644
--- a/polkadot/node/primitives/Cargo.toml
+++ b/polkadot/node/primitives/Cargo.toml
@@ -15,6 +15,7 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sp-consensus-vrf = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sp-maybe-compressed-blob  = { git = "https://github.com/paritytech/substrate", branch = "master" }
 polkadot-parachain = { path = "../../parachain", default-features = false }
 schnorrkel = "0.9.1"
diff --git a/polkadot/node/primitives/src/approval.rs b/polkadot/node/primitives/src/approval.rs
index 8303478aa53..743c37f3275 100644
--- a/polkadot/node/primitives/src/approval.rs
+++ b/polkadot/node/primitives/src/approval.rs
@@ -98,10 +98,6 @@ pub struct IndirectAssignmentCert {
 	pub cert: AssignmentCert,
 }
 
-/// A vote of approval on a candidate.
-#[derive(Debug, Clone, Encode, Decode)]
-pub struct ApprovalVote(pub CandidateHash);
-
 /// A signed approval vote which references the candidate indirectly via the block.
 ///
 /// In practice, we have a look-up from block hash and candidate index to candidate hash,
diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs
index 7646ca1ff20..dddec2cfbe1 100644
--- a/polkadot/node/primitives/src/lib.rs
+++ b/polkadot/node/primitives/src/lib.rs
@@ -22,18 +22,28 @@
 
 #![deny(missing_docs)]
 
+use std::convert::TryInto;
 use std::pin::Pin;
 
 use serde::{Serialize, Deserialize};
 use futures::Future;
 use parity_scale_codec::{Decode, Encode};
+use sp_keystore::{CryptoStore, SyncCryptoStorePtr, Error as KeystoreError};
+use sp_application_crypto::AppKey;
 
 pub use sp_core::traits::SpawnNamed;
 pub use sp_consensus_babe::{
 	Epoch as BabeEpoch, BabeEpochConfiguration, AllowedSlots as BabeAllowedSlots,
 };
 
-use polkadot_primitives::v1::{BlakeTwo256, CandidateCommitments, CandidateHash, CollatorPair, CommittedCandidateReceipt, CompactStatement, EncodeAs, Hash, HashT, HeadData, Id as ParaId, OutboundHrmpMessage, PersistedValidationData, Signed, UncheckedSigned, UpwardMessage, ValidationCode, ValidatorIndex};
+use polkadot_primitives::v1::{
+	BlakeTwo256, CandidateCommitments, CandidateHash, CollatorPair, CommittedCandidateReceipt,
+	CompactStatement, EncodeAs, Hash, HashT, HeadData, Id as ParaId, OutboundHrmpMessage,
+	PersistedValidationData, Signed, UncheckedSigned, UpwardMessage, ValidationCode,
+	ValidatorIndex, ValidatorSignature, ValidDisputeStatementKind, InvalidDisputeStatementKind,
+	CandidateReceipt, ValidatorId, SessionIndex, DisputeStatement,
+};
+
 pub use polkadot_parachain::primitives::BlockData;
 
 pub mod approval;
@@ -273,3 +283,125 @@ pub fn maybe_compress_pov(pov: PoV) -> PoV {
 	let pov = PoV { block_data: BlockData(raw) };
 	pov
 }
+
+/// Tracked votes on candidates, for the purposes of dispute resolution.
+#[derive(Debug, Clone)]
+pub struct CandidateVotes {
+	/// The receipt of the candidate itself.
+	pub candidate_receipt: CandidateReceipt,
+	/// Votes of validity, sorted by validator index.
+	pub valid: Vec<(ValidDisputeStatementKind, ValidatorIndex, ValidatorSignature)>,
+	/// Votes of invalidity, sorted by validator index.
+	pub invalid: Vec<(InvalidDisputeStatementKind, ValidatorIndex, ValidatorSignature)>,
+}
+
+impl CandidateVotes {
+	/// Get the set of all validators who have votes in the set, ascending.
+	pub fn voted_indices(&self) -> Vec<ValidatorIndex> {
+		let mut v: Vec<_> = self.valid.iter().map(|x| x.1).chain(
+			self.invalid.iter().map(|x| x.1)
+		).collect();
+
+		v.sort();
+		v.dedup();
+
+		v
+	}
+}
+
+
+/// A checked dispute statement from an associated validator.
+#[derive(Debug, Clone)]
+pub struct SignedDisputeStatement {
+	dispute_statement: DisputeStatement,
+	candidate_hash: CandidateHash,
+	validator_public: ValidatorId,
+	validator_signature: ValidatorSignature,
+	session_index: SessionIndex,
+}
+
+impl SignedDisputeStatement {
+	/// Create a new `SignedDisputeStatement`, which is only possible by checking the signature.
+	pub fn new_checked(
+		dispute_statement: DisputeStatement,
+		candidate_hash: CandidateHash,
+		session_index: SessionIndex,
+		validator_public: ValidatorId,
+		validator_signature: ValidatorSignature,
+	) -> Result<Self, ()> {
+		dispute_statement.check_signature(
+			&validator_public,
+			candidate_hash,
+			session_index,
+			&validator_signature,
+		).map(|_| SignedDisputeStatement {
+			dispute_statement,
+			candidate_hash,
+			validator_public,
+			validator_signature,
+			session_index,
+		})
+	}
+
+	/// Sign this statement with the given keystore and key. Pass `valid = true` to
+	/// indicate validity of the candidate, and `valid = false` to indicate invalidity.
+	pub async fn sign_explicit(
+		keystore: &SyncCryptoStorePtr,
+		valid: bool,
+		candidate_hash: CandidateHash,
+		session_index: SessionIndex,
+		validator_public: ValidatorId,
+	) -> Result<Option<Self>, KeystoreError> {
+		let dispute_statement = if valid {
+			DisputeStatement::Valid(ValidDisputeStatementKind::Explicit)
+		} else {
+			DisputeStatement::Invalid(InvalidDisputeStatementKind::Explicit)
+		};
+
+		let data = dispute_statement.payload_data(candidate_hash, session_index);
+		let signature = CryptoStore::sign_with(
+			&**keystore,
+			ValidatorId::ID,
+			&validator_public.clone().into(),
+			&data,
+		).await?;
+
+		let signature = match signature {
+			Some(sig) => sig.try_into().map_err(|_| KeystoreError::KeyNotSupported(ValidatorId::ID))?,
+			None => return Ok(None),
+		};
+
+		Ok(Some(Self {
+			dispute_statement,
+			candidate_hash,
+			validator_public,
+			validator_signature: signature,
+			session_index,
+		}))
+	}
+
+	/// Access the underlying dispute statement
+	pub fn statement(&self) -> &DisputeStatement {
+		&self.dispute_statement
+	}
+
+	/// Access the underlying candidate hash.
+	pub fn candidate_hash(&self) -> &CandidateHash {
+		&self.candidate_hash
+	}
+
+	/// Access the underlying validator public key.
+	pub fn validator_public(&self) -> &ValidatorId {
+		&self.validator_public
+	}
+
+	/// Access the underlying validator signature.
+	pub fn validator_signature(&self) -> &ValidatorSignature {
+		&self.validator_signature
+	}
+
+	/// Access the underlying session index.
+	pub fn session_index(&self) -> SessionIndex {
+		self.session_index
+	}
+}
diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs
index 352634767da..fac5cdc7b37 100644
--- a/polkadot/node/subsystem-util/src/lib.rs
+++ b/polkadot/node/subsystem-util/src/lib.rs
@@ -69,6 +69,8 @@ pub mod reexports {
 
 /// Convenient and efficient runtime info access.
 pub mod runtime;
+/// A rolling session window cache.
+pub mod rolling_session_window;
 
 mod error_handling;
 
diff --git a/polkadot/node/subsystem-util/src/rolling_session_window.rs b/polkadot/node/subsystem-util/src/rolling_session_window.rs
new file mode 100644
index 00000000000..136b44eaea2
--- /dev/null
+++ b/polkadot/node/subsystem-util/src/rolling_session_window.rs
@@ -0,0 +1,635 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! A rolling window of sessions and cached session info, updated by the state of newly imported blocks.
+//!
+//! This is useful for consensus components which need to stay up-to-date about recent sessions but don't
+//! care about the state of particular blocks.
+
+use polkadot_primitives::v1::{Hash, Header, SessionInfo, SessionIndex};
+use polkadot_node_subsystem::{
+	SubsystemContext,
+	messages::{RuntimeApiMessage, RuntimeApiRequest},
+	errors::RuntimeApiError,
+};
+use futures::channel::oneshot;
+
+/// Sessions unavailable in state to cache.
+#[derive(Debug)]
+pub enum SessionsUnavailableKind {
+	/// Runtime API subsystem was unavailable.
+	RuntimeApiUnavailable(oneshot::Canceled),
+	/// The runtime API itself returned an error.
+	RuntimeApi(RuntimeApiError),
+	/// Missing session info from runtime API.
+	Missing,
+}
+
+/// Information about the sessions being fetched.
+#[derive(Debug)]
+pub struct SessionsUnavailableInfo {
+	/// The desired window start.
+	pub window_start: SessionIndex,
+	/// The desired window end.
+	pub window_end: SessionIndex,
+	/// The block hash whose state the sessions were meant to be drawn from.
+	pub block_hash: Hash,
+}
+
+/// Sessions were unavailable to fetch from the state for some reason.
+#[derive(Debug)]
+pub struct SessionsUnavailable {
+	/// The error kind.
+	kind: SessionsUnavailableKind,
+	/// The info about the session window, if any.
+	info: Option<SessionsUnavailableInfo>,
+}
+
+/// An indicated update of the rolling session window.
+#[derive(Debug, PartialEq, Clone)]
+pub enum SessionWindowUpdate {
+	/// The session window was just initialized to the current values.
+	Initialized {
+		/// The start of the window (inclusive).
+		window_start: SessionIndex,
+		/// The end of the window (inclusive).
+		window_end: SessionIndex,
+	},
+	/// The session window was just advanced from one range to a new one.
+	Advanced {
+		/// The previous start of the window (inclusive).
+		prev_window_start: SessionIndex,
+		/// The previous end of the window (inclusive).
+		prev_window_end: SessionIndex,
+		/// The new start of the window (inclusive).
+		new_window_start: SessionIndex,
+		/// The new end of the window (inclusive).
+		new_window_end: SessionIndex,
+	},
+	/// The session window was unchanged.
+	Unchanged,
+}
+
+/// A rolling window of sessions and cached session info.
+#[derive(Default)]
+pub struct RollingSessionWindow {
+	earliest_session: Option<SessionIndex>,
+	session_info: Vec<SessionInfo>,
+	window_size: SessionIndex,
+}
+
+impl RollingSessionWindow {
+	/// Initialize a new session info cache with the given window size.
+	pub fn new(window_size: SessionIndex) -> Self {
+		RollingSessionWindow {
+			earliest_session: None,
+			session_info: Vec::new(),
+			window_size,
+		}
+	}
+
+	/// Initialize a new session info cache with the given window size and
+	/// initial data.
+	pub fn with_session_info(
+		window_size: SessionIndex,
+		earliest_session: SessionIndex,
+		session_info: Vec<SessionInfo>,
+	) -> Self {
+		RollingSessionWindow {
+			earliest_session: Some(earliest_session),
+			session_info,
+			window_size,
+		}
+	}
+
+	/// Access the session info for the given session index, if stored within the window.
+	pub fn session_info(&self, index: SessionIndex) -> Option<&SessionInfo> {
+		self.earliest_session.and_then(|earliest| {
+			if index < earliest {
+				None
+			} else {
+				self.session_info.get((index - earliest) as usize)
+			}
+		})
+
+	}
+
+	/// Access the index of the earliest session, if the window is not empty.
+	pub fn earliest_session(&self) -> Option<SessionIndex> {
+		self.earliest_session.clone()
+	}
+
+	/// Access the index of the latest session, if the window is not empty.
+	pub fn latest_session(&self) -> Option<SessionIndex> {
+		self.earliest_session
+			.map(|earliest| earliest + (self.session_info.len() as SessionIndex).saturating_sub(1))
+	}
+
+	/// When inspecting a new import notification, updates the session info cache to match
+	/// the session of the imported block.
+	///
+	/// this only needs to be called on heads where we are directly notified about import, as sessions do
+	/// not change often and import notifications are expected to be typically increasing in session number.
+	///
+	/// some backwards drift in session index is acceptable.
+	pub async fn cache_session_info_for_head(
+		&mut self,
+		ctx: &mut impl SubsystemContext,
+		block_hash: Hash,
+		block_header: &Header,
+	) -> Result<SessionWindowUpdate, SessionsUnavailable> {
+		if self.window_size == 0 { return Ok(SessionWindowUpdate::Unchanged) }
+
+		let session_index = {
+			let (s_tx, s_rx) = oneshot::channel();
+
+			// The genesis is guaranteed to be at the beginning of the session and its parent state
+			// is non-existent. Therefore if we're at the genesis, we request using its state and
+			// not the parent.
+			ctx.send_message(RuntimeApiMessage::Request(
+				if block_header.number == 0 { block_hash } else { block_header.parent_hash },
+				RuntimeApiRequest::SessionIndexForChild(s_tx),
+			).into()).await;
+
+			match s_rx.await {
+				Ok(Ok(s)) => s,
+				Ok(Err(e)) => return Err(SessionsUnavailable {
+					kind: SessionsUnavailableKind::RuntimeApi(e),
+					info: None,
+				}),
+				Err(e) => return Err(SessionsUnavailable {
+					kind: SessionsUnavailableKind::RuntimeApiUnavailable(e),
+					info: None,
+				}),
+			}
+		};
+
+		match self.earliest_session {
+			None => {
+				// First block processed on start-up.
+
+				let window_start = session_index.saturating_sub(self.window_size - 1);
+
+				match load_all_sessions(ctx, block_hash, window_start, session_index).await {
+					Err(kind) => {
+						Err(SessionsUnavailable {
+							kind,
+							info: Some(SessionsUnavailableInfo {
+								window_start,
+								window_end: session_index,
+								block_hash,
+							}),
+						})
+					},
+					Ok(s) => {
+						let update = SessionWindowUpdate::Initialized {
+							window_start,
+							window_end: session_index,
+						};
+
+						self.earliest_session = Some(window_start);
+						self.session_info = s;
+
+						Ok(update)
+					}
+				}
+			}
+			Some(old_window_start) => {
+				let latest = self.latest_session().expect("latest always exists if earliest does; qed");
+
+				// Either cached or ancient.
+				if session_index <= latest { return Ok(SessionWindowUpdate::Unchanged) }
+
+				let old_window_end = latest;
+
+				let window_start = session_index.saturating_sub(self.window_size - 1);
+
+				// keep some of the old window, if applicable.
+				let overlap_start = window_start.saturating_sub(old_window_start);
+
+				let fresh_start = if latest < window_start {
+					window_start
+				} else {
+					latest + 1
+				};
+
+				match load_all_sessions(ctx, block_hash, fresh_start, session_index).await {
+					Err(kind) => {
+						Err(SessionsUnavailable {
+							kind,
+							info: Some(SessionsUnavailableInfo {
+								window_start: latest +1,
+								window_end: session_index,
+								block_hash,
+							}),
+						})
+					},
+					Ok(s) => {
+						let update = SessionWindowUpdate::Advanced {
+							prev_window_start: old_window_start,
+							prev_window_end: old_window_end,
+							new_window_start: window_start,
+							new_window_end: session_index,
+						};
+
+						let outdated = std::cmp::min(overlap_start as usize, self.session_info.len());
+						self.session_info.drain(..outdated);
+						self.session_info.extend(s);
+						// we need to account for this case:
+						// window_start ................................... session_index
+						//              old_window_start ........... latest
+						let new_earliest = std::cmp::max(window_start, old_window_start);
+						self.earliest_session = Some(new_earliest);
+
+						Ok(update)
+					}
+				}
+			}
+		}
+	}
+}
+
+async fn load_all_sessions(
+	ctx: &mut impl SubsystemContext,
+	block_hash: Hash,
+	start: SessionIndex,
+	end_inclusive: SessionIndex,
+) -> Result<Vec<SessionInfo>, SessionsUnavailableKind> {
+	let mut v = Vec::new();
+	for i in start..=end_inclusive {
+		let (tx, rx)= oneshot::channel();
+		ctx.send_message(RuntimeApiMessage::Request(
+			block_hash,
+			RuntimeApiRequest::SessionInfo(i, tx),
+		).into()).await;
+
+		let session_info = match rx.await {
+			Ok(Ok(Some(s))) => s,
+			Ok(Ok(None)) => {
+				return Err(SessionsUnavailableKind::Missing);
+			}
+			Ok(Err(e)) => return Err(SessionsUnavailableKind::RuntimeApi(e)),
+			Err(canceled) => return Err(SessionsUnavailableKind::RuntimeApiUnavailable(canceled)),
+		};
+
+		v.push(session_info);
+	}
+
+	Ok(v)
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use polkadot_node_subsystem_test_helpers::make_subsystem_context;
+	use polkadot_node_subsystem::messages::AllMessages;
+	use sp_core::testing::TaskExecutor;
+	use assert_matches::assert_matches;
+
+	const TEST_WINDOW_SIZE: SessionIndex = 6;
+
+	fn dummy_session_info(index: SessionIndex) -> SessionInfo {
+		SessionInfo {
+			validators: Vec::new(),
+			discovery_keys: Vec::new(),
+			assignment_keys: Vec::new(),
+			validator_groups: Vec::new(),
+			n_cores: index as _,
+			zeroth_delay_tranche_width: index as _,
+			relay_vrf_modulo_samples: index as _,
+			n_delay_tranches: index as _,
+			no_show_slots: index as _,
+			needed_approvals: index as _,
+		}
+	}
+
+	fn cache_session_info_test(
+		expected_start_session: SessionIndex,
+		session: SessionIndex,
+		mut window: RollingSessionWindow,
+		expect_requests_from: SessionIndex,
+	) {
+		let header = Header {
+			digest: Default::default(),
+			extrinsics_root: Default::default(),
+			number: 5,
+			state_root: Default::default(),
+			parent_hash: Default::default(),
+		};
+
+		let pool = TaskExecutor::new();
+		let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
+
+		let hash = header.hash();
+
+		let test_fut = {
+			let header = header.clone();
+			Box::pin(async move {
+				window.cache_session_info_for_head(
+					&mut ctx,
+					hash,
+					&header,
+				).await.unwrap();
+
+				assert_eq!(window.earliest_session, Some(expected_start_session));
+				assert_eq!(
+					window.session_info,
+					(expected_start_session..=session).map(dummy_session_info).collect::<Vec<_>>(),
+				);
+			})
+		};
+
+		let aux_fut = Box::pin(async move {
+			assert_matches!(
+				handle.recv().await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					h,
+					RuntimeApiRequest::SessionIndexForChild(s_tx),
+				)) => {
+					assert_eq!(h, header.parent_hash);
+					let _ = s_tx.send(Ok(session));
+				}
+			);
+
+			for i in expect_requests_from..=session {
+				assert_matches!(
+					handle.recv().await,
+					AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+						h,
+						RuntimeApiRequest::SessionInfo(j, s_tx),
+					)) => {
+						assert_eq!(h, hash);
+						assert_eq!(i, j);
+						let _ = s_tx.send(Ok(Some(dummy_session_info(i))));
+					}
+				);
+			}
+		});
+
+		futures::executor::block_on(futures::future::join(test_fut, aux_fut));
+	}
+
+	#[test]
+	fn cache_session_info_first_early() {
+		cache_session_info_test(
+			0,
+			1,
+			RollingSessionWindow::new(TEST_WINDOW_SIZE),
+			0,
+		);
+	}
+
+	#[test]
+	fn cache_session_info_does_not_underflow() {
+		let window = RollingSessionWindow {
+			earliest_session: Some(1),
+			session_info: vec![dummy_session_info(1)],
+			window_size: TEST_WINDOW_SIZE,
+		};
+
+		cache_session_info_test(
+			1,
+			2,
+			window,
+			2,
+		);
+	}
+
+	#[test]
+	fn cache_session_info_first_late() {
+		cache_session_info_test(
+			(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE - 1),
+			100,
+			RollingSessionWindow::new(TEST_WINDOW_SIZE),
+			(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE - 1),
+		);
+	}
+
+	#[test]
+	fn cache_session_info_jump() {
+		let window = RollingSessionWindow {
+			earliest_session: Some(50),
+			session_info: vec![dummy_session_info(50), dummy_session_info(51), dummy_session_info(52)],
+			window_size: TEST_WINDOW_SIZE,
+		};
+
+		cache_session_info_test(
+			(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE - 1),
+			100,
+			window,
+			(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE - 1),
+		);
+	}
+
+	#[test]
+	fn cache_session_info_roll_full() {
+		let start = 99 - (TEST_WINDOW_SIZE - 1);
+		let window = RollingSessionWindow {
+			earliest_session: Some(start),
+			session_info: (start..=99).map(dummy_session_info).collect(),
+			window_size: TEST_WINDOW_SIZE,
+		};
+
+		cache_session_info_test(
+			(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE - 1),
+			100,
+			window,
+			100, // should only make one request.
+		);
+	}
+
+	#[test]
+	fn cache_session_info_roll_many_full() {
+		let start = 97 - (TEST_WINDOW_SIZE - 1);
+		let window = RollingSessionWindow {
+			earliest_session: Some(start),
+			session_info: (start..=97).map(dummy_session_info).collect(),
+			window_size: TEST_WINDOW_SIZE,
+		};
+
+		cache_session_info_test(
+			(100 as SessionIndex).saturating_sub(TEST_WINDOW_SIZE - 1),
+			100,
+			window,
+			98,
+		);
+	}
+
+	#[test]
+	fn cache_session_info_roll_early() {
+		let start = 0;
+		let window = RollingSessionWindow {
+			earliest_session: Some(start),
+			session_info: (0..=1).map(dummy_session_info).collect(),
+			window_size: TEST_WINDOW_SIZE,
+		};
+
+		cache_session_info_test(
+			0,
+			2,
+			window,
+			2, // should only make one request.
+		);
+	}
+
+	#[test]
+	fn cache_session_info_roll_many_early() {
+		let start = 0;
+		let window = RollingSessionWindow {
+			earliest_session: Some(start),
+			session_info: (0..=1).map(dummy_session_info).collect(),
+			window_size: TEST_WINDOW_SIZE,
+		};
+
+		cache_session_info_test(
+			0,
+			3,
+			window,
+			2,
+		);
+	}
+
+	#[test]
+	fn any_session_unavailable_for_caching_means_no_change() {
+		let session: SessionIndex = 6;
+		let start_session = session.saturating_sub(TEST_WINDOW_SIZE - 1);
+
+		let header = Header {
+			digest: Default::default(),
+			extrinsics_root: Default::default(),
+			number: 5,
+			state_root: Default::default(),
+			parent_hash: Default::default(),
+		};
+
+		let pool = TaskExecutor::new();
+		let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
+
+		let mut window = RollingSessionWindow::new(TEST_WINDOW_SIZE);
+		let hash = header.hash();
+
+		let test_fut = {
+			let header = header.clone();
+			Box::pin(async move {
+				let res = window.cache_session_info_for_head(
+					&mut ctx,
+					hash,
+					&header,
+				).await;
+
+				assert!(res.is_err());
+			})
+		};
+
+		let aux_fut = Box::pin(async move {
+			assert_matches!(
+				handle.recv().await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					h,
+					RuntimeApiRequest::SessionIndexForChild(s_tx),
+				)) => {
+					assert_eq!(h, header.parent_hash);
+					let _ = s_tx.send(Ok(session));
+				}
+			);
+
+			for i in start_session..=session {
+				assert_matches!(
+					handle.recv().await,
+					AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+						h,
+						RuntimeApiRequest::SessionInfo(j, s_tx),
+					)) => {
+						assert_eq!(h, hash);
+						assert_eq!(i, j);
+
+						let _ = s_tx.send(Ok(if i == session {
+							None
+						} else {
+							Some(dummy_session_info(i))
+						}));
+					}
+				);
+			}
+		});
+
+		futures::executor::block_on(futures::future::join(test_fut, aux_fut));
+	}
+
+	#[test]
+	fn request_session_info_for_genesis() {
+		let session: SessionIndex = 0;
+
+		let header = Header {
+			digest: Default::default(),
+			extrinsics_root: Default::default(),
+			number: 0,
+			state_root: Default::default(),
+			parent_hash: Default::default(),
+		};
+
+		let pool = TaskExecutor::new();
+		let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
+
+		let mut window = RollingSessionWindow::new(TEST_WINDOW_SIZE);
+		let hash = header.hash();
+
+		let test_fut = {
+			let header = header.clone();
+			Box::pin(async move {
+				window.cache_session_info_for_head(
+					&mut ctx,
+					hash,
+					&header,
+				).await.unwrap();
+
+				assert_eq!(window.earliest_session, Some(session));
+				assert_eq!(
+					window.session_info,
+					vec![dummy_session_info(session)],
+				);
+			})
+		};
+
+		let aux_fut = Box::pin(async move {
+			assert_matches!(
+				handle.recv().await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					h,
+					RuntimeApiRequest::SessionIndexForChild(s_tx),
+				)) => {
+					assert_eq!(h, hash);
+					let _ = s_tx.send(Ok(session));
+				}
+			);
+
+			assert_matches!(
+				handle.recv().await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					h,
+					RuntimeApiRequest::SessionInfo(s, s_tx),
+				)) => {
+					assert_eq!(h, hash);
+					assert_eq!(s, session);
+
+					let _ = s_tx.send(Ok(Some(dummy_session_info(s))));
+				}
+			);
+		});
+
+		futures::executor::block_on(futures::future::join(test_fut, aux_fut));
+	}
+}
diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs
index c5da3caae8e..05833d1e673 100644
--- a/polkadot/node/subsystem/src/messages.rs
+++ b/polkadot/node/subsystem/src/messages.rs
@@ -36,8 +36,8 @@ use polkadot_node_network_protocol::{
 };
 use polkadot_node_primitives::{
 	approval::{BlockApprovalMeta, IndirectAssignmentCert, IndirectSignedApprovalVote},
-	AvailableData, BabeEpoch, CollationGenerationConfig, ErasureChunk, PoV, SignedFullStatement,
-	ValidationResult,
+	AvailableData, BabeEpoch, CandidateVotes, CollationGenerationConfig, ErasureChunk, PoV,
+	SignedDisputeStatement, SignedFullStatement, ValidationResult,
 };
 use polkadot_primitives::v1::{
 	AuthorityDiscoveryId, BackedCandidate, BlockNumber, CandidateDescriptor, CandidateEvent,
@@ -188,6 +188,73 @@ impl BoundToRelayParent for CollatorProtocolMessage {
 	}
 }
 
+/// Messages received by the dispute coordinator subsystem.
+#[derive(Debug)]
+pub enum DisputeCoordinatorMessage {
+	/// Import a statement by a validator about a candidate.
+	///
+	/// The subsystem will silently discard ancient statements or sets of only dispute-specific statements for
+	/// candidates that are previously unknown to the subsystem. The former is simply because ancient
+	/// data is not relevant and the latter is as a DoS prevention mechanism. Both backing and approval
+	/// statements already undergo anti-DoS procedures in their respective subsystems, but statements
+	/// cast specifically for disputes are not necessarily relevant to any candidate the system is
+	/// already aware of and thus present a DoS vector. Our expectation is that nodes will notify each
+	/// other of disputes over the network by providing (at least) 2 conflicting statements, of which one is either
+	/// a backing or validation statement.
+	///
+	/// This does not do any checking of the message signature.
+	ImportStatements {
+		/// The hash of the candidate.
+		candidate_hash: CandidateHash,
+		/// The candidate receipt itself.
+		candidate_receipt: CandidateReceipt,
+		/// The session the candidate appears in.
+		session: SessionIndex,
+		/// Statements, with signatures checked, by validators participating in disputes.
+		///
+		/// The validator index passed alongside each statement should correspond to the index
+		/// of the validator in the set.
+		statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
+	},
+	/// Fetch a list of all active disputes that the coordinator is aware of.
+	ActiveDisputes(oneshot::Sender<Vec<(SessionIndex, CandidateHash)>>),
+	/// Get candidate votes for a candidate.
+	QueryCandidateVotes(SessionIndex, CandidateHash, oneshot::Sender<Option<CandidateVotes>>),
+	/// Sign and issue local dispute votes. A value of `true` indicates validity, and `false` invalidity.
+	IssueLocalStatement(SessionIndex, CandidateHash, CandidateReceipt, bool),
+	/// Determine the highest undisputed block within the given chain, based on where candidates
+	/// were included. If even the base block should not be finalized due to a dispute,
+	/// then `None` should be returned on the channel.
+	///
+	/// The block descriptions begin counting upwards from the block after the given `base_number`. The `base_number`
+	/// is typically the number of the last finalized block but may be slightly higher. This block
+	/// is inevitably going to be finalized so it is not accounted for by this function.
+	DetermineUndisputedChain {
+		/// The number of the lowest possible block to vote on.
+		base_number: BlockNumber,
+		/// Descriptions of all the blocks counting upwards from the block after the base number
+		block_descriptions: Vec<(Hash, SessionIndex, Vec<CandidateHash>)>,
+		/// A response channel - `None` to vote on base, `Some` to vote higher.
+		tx: oneshot::Sender<Option<(BlockNumber, Hash)>>,
+	}
+}
+
+/// Messages received by the dispute participation subsystem.
+#[derive(Debug)]
+pub enum DisputeParticipationMessage {
+	/// Validate a candidate for the purposes of participating in a dispute.
+	Participate {
+		/// The hash of the candidate
+		candidate_hash: CandidateHash,
+		/// The candidate receipt itself.
+		candidate_receipt: CandidateReceipt,
+		/// The session the candidate appears in.
+		session: SessionIndex,
+		/// The indices of validators who have already voted on this candidate.
+		voted_indices: Vec<ValidatorIndex>,
+	}
+}
+
 /// Messages received by the network bridge subsystem.
 #[derive(Debug)]
 pub enum NetworkBridgeMessage {
@@ -703,6 +770,12 @@ pub enum AllMessages {
 	/// Message for the Gossip Support subsystem.
 	#[skip]
 	GossipSupport(GossipSupportMessage),
+	/// Message for the dispute coordinator subsystem.
+	#[skip]
+	DisputeCoordinator(DisputeCoordinatorMessage),
+	/// Message for the dispute participation subsystem.
+	#[skip]
+	DisputeParticipation(DisputeParticipationMessage),
 }
 
 impl From<IncomingRequest<req_res_v1::PoVFetchingRequest>> for AvailabilityDistributionMessage {
diff --git a/polkadot/primitives/src/v0.rs b/polkadot/primitives/src/v0.rs
index ab7c3fe18b8..917bda2aa72 100644
--- a/polkadot/primitives/src/v0.rs
+++ b/polkadot/primitives/src/v0.rs
@@ -672,6 +672,14 @@ pub enum CompactStatement {
 	Valid(CandidateHash),
 }
 
+impl CompactStatement {
+	/// Yields the payload used for validator signatures on this kind
+	/// of statement.
+	pub fn signing_payload(&self, context: &SigningContext) -> Vec<u8> {
+		(self, context).encode()
+	}
+}
+
 // Inner helper for codec on `CompactStatement`.
 #[derive(Encode, Decode)]
 enum CompactStatementInner {
diff --git a/polkadot/primitives/src/v1/mod.rs b/polkadot/primitives/src/v1/mod.rs
index dc1ec4294bd..dbc4a7ceabc 100644
--- a/polkadot/primitives/src/v1/mod.rs
+++ b/polkadot/primitives/src/v1/mod.rs
@@ -842,6 +842,22 @@ pub struct SessionInfo {
 	pub needed_approvals: u32,
 }
 
+/// A vote of approval on a candidate.
+#[derive(Clone, RuntimeDebug)]
+pub struct ApprovalVote(pub CandidateHash);
+
+impl ApprovalVote {
+	/// Yields the signing payload for this approval vote.
+	pub fn signing_payload(
+		&self,
+		session_index: SessionIndex,
+	) -> Vec<u8> {
+		const MAGIC: [u8; 4] = *b"APPR";
+
+		(MAGIC, &self.0, session_index).encode()
+	}
+}
+
 sp_api::decl_runtime_apis! {
 	/// The API for querying the state of parachains on-chain.
 	pub trait ParachainHost<H: Decode = Hash, N: Encode + Decode = BlockNumber> {
@@ -1064,6 +1080,60 @@ pub enum DisputeStatement {
 	Invalid(InvalidDisputeStatementKind),
 }
 
+impl DisputeStatement {
+	/// Get the payload data for this type of dispute statement.
+	pub fn payload_data(&self, candidate_hash: CandidateHash, session: SessionIndex) -> Vec<u8> {
+		 match *self {
+			DisputeStatement::Valid(ValidDisputeStatementKind::Explicit) => {
+				ExplicitDisputeStatement {
+					valid: true,
+					candidate_hash,
+					session,
+				}.signing_payload()
+			},
+			DisputeStatement::Valid(ValidDisputeStatementKind::BackingSeconded(inclusion_parent)) => {
+				CompactStatement::Seconded(candidate_hash).signing_payload(&SigningContext {
+					session_index: session,
+					parent_hash: inclusion_parent,
+				})
+			},
+			DisputeStatement::Valid(ValidDisputeStatementKind::BackingValid(inclusion_parent)) => {
+				CompactStatement::Valid(candidate_hash).signing_payload(&SigningContext {
+					session_index: session,
+					parent_hash: inclusion_parent,
+				})
+			},
+			DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking) => {
+				ApprovalVote(candidate_hash).signing_payload(session)
+			},
+			DisputeStatement::Invalid(InvalidDisputeStatementKind::Explicit) => {
+				ExplicitDisputeStatement {
+					valid: false,
+					candidate_hash,
+					session,
+				}.signing_payload()
+			},
+		}
+	}
+
+	/// Check the signature on a dispute statement.
+	pub fn check_signature(
+		&self,
+		validator_public: &ValidatorId,
+		candidate_hash: CandidateHash,
+		session: SessionIndex,
+		validator_signature: &ValidatorSignature,
+	) -> Result<(), ()> {
+		let payload = self.payload_data(candidate_hash, session);
+
+		if validator_signature.verify(&payload[..] , &validator_public) {
+			Ok(())
+		} else {
+			Err(())
+		}
+	}
+}
+
 /// Different kinds of statements of validity on  a candidate.
 #[derive(Encode, Decode, Clone, PartialEq, RuntimeDebug)]
 pub enum ValidDisputeStatementKind {
@@ -1072,10 +1142,10 @@ pub enum ValidDisputeStatementKind {
 	Explicit,
 	/// A seconded statement on a candidate from the backing phase.
 	#[codec(index = 1)]
-	BackingSeconded,
+	BackingSeconded(Hash),
 	/// A valid statement on a candidate from the backing phase.
 	#[codec(index = 2)]
-	BackingValid,
+	BackingValid(Hash),
 	/// An approval vote from the approval checking phase.
 	#[codec(index = 3)]
 	ApprovalChecking,
@@ -1090,7 +1160,7 @@ pub enum InvalidDisputeStatementKind {
 }
 
 /// An explicit statement on a candidate issued as part of a dispute.
-#[derive(Encode, Decode, Clone, PartialEq, RuntimeDebug)]
+#[derive(Clone, PartialEq, RuntimeDebug)]
 pub struct ExplicitDisputeStatement {
 	/// Whether the candidate is valid
 	pub valid: bool,
@@ -1100,6 +1170,15 @@ pub struct ExplicitDisputeStatement {
 	pub session: SessionIndex,
 }
 
+impl ExplicitDisputeStatement {
+	/// Produce the payload used for signing this type of statement.
+	pub fn signing_payload(&self) -> Vec<u8> {
+		const MAGIC: [u8; 4] = *b"DISP";
+
+		(MAGIC, self.valid, self.candidate_hash, self.session).encode()
+	}
+}
+
 /// A set of statements about a specific candidate.
 #[derive(Encode, Decode, Clone, PartialEq, RuntimeDebug)]
 pub struct DisputeStatementSet {
@@ -1140,6 +1219,19 @@ pub struct InherentData<HDR: HeaderT = Header> {
 	pub parent_header: HDR,
 }
 
+/// The maximum number of validators `f` which may safely be faulty.
+///
+/// The total number of validators is `n = 3f + e` where `e in { 1, 2, 3 }`.
+pub fn byzantine_threshold(n: usize) -> usize {
+	n.saturating_sub(1) / 3
+}
+
+/// The supermajority threshold of validators which represents a subset
+/// guaranteed to have at least f+1 honest validators.
+pub fn supermajority_threshold(n: usize) -> usize {
+	n - byzantine_threshold(n)
+}
+
 #[cfg(test)]
 mod tests {
 	use super::*;
@@ -1190,4 +1282,28 @@ mod tests {
 			&Hash::repeat_byte(4).into(),
 		);
 	}
+
+	#[test]
+	fn test_byzantine_threshold() {
+		assert_eq!(byzantine_threshold(0), 0);
+		assert_eq!(byzantine_threshold(1), 0);
+		assert_eq!(byzantine_threshold(2), 0);
+		assert_eq!(byzantine_threshold(3), 0);
+		assert_eq!(byzantine_threshold(4), 1);
+		assert_eq!(byzantine_threshold(5), 1);
+		assert_eq!(byzantine_threshold(6), 1);
+		assert_eq!(byzantine_threshold(7), 2);
+	}
+
+	#[test]
+	fn test_supermajority_threshold() {
+		assert_eq!(supermajority_threshold(0), 0);
+		assert_eq!(supermajority_threshold(1), 1);
+		assert_eq!(supermajority_threshold(2), 2);
+		assert_eq!(supermajority_threshold(3), 3);
+		assert_eq!(supermajority_threshold(4), 3);
+		assert_eq!(supermajority_threshold(5), 4);
+		assert_eq!(supermajority_threshold(6), 5);
+		assert_eq!(supermajority_threshold(7), 5);
+	}
 }
diff --git a/polkadot/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md b/polkadot/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md
index f566bea4941..90cc75b2e27 100644
--- a/polkadot/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md
+++ b/polkadot/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md
@@ -14,7 +14,7 @@ We use an underlying Key-Value database where we assume we have the following op
 We use this database to encode the following schema:
 
 ```rust
-(SessionIndex, "candidate-votes", CandidateHash) -> Option<CandidateVotes>
+("candidate-votes", SessionIndex, CandidateHash) -> Option<CandidateVotes>
 "active-disputes" -> ActiveDisputes
 "earliest-session" -> Option<SessionIndex>
 ```
@@ -55,8 +55,6 @@ Ephemeral in-memory state:
 ```rust
 struct State {
     keystore: KeyStore,
-    // An in-memory overlay used as a write-cache.
-    overlay: Map<(SessionIndex, CandidateReceipt), CandidateVotes>,
     highest_session: SessionIndex,
 }
 ```
@@ -67,14 +65,14 @@ For each leaf in the leaves update:
   * Fetch the session index for the child of the block with a [`RuntimeApiMessage::SessionIndexForChild`][RuntimeApiMessage].
   * If the session index is higher than `state.highest_session`:
     * update `state.highest_session`
-    * remove everything with session index less than `state.highest_session - DISPUTE_WINDOW` from the overlay and from the `"active-disputes"` in the DB.
+    * remove everything with session index less than `state.highest_session - DISPUTE_WINDOW` from the `"active-disputes"` in the DB.
     * Use `iter_with_prefix` to remove everything from `"earliest-session"` up to `state.highest_session - DISPUTE_WINDOW` from the DB under `"candidate-votes"`.
     * Update `"earliest-session"` to be equal to `state.highest_session - DISPUTE_WINDOW`.
   * For each new block, explicitly or implicitly, under the new leaf, scan for a dispute digest which indicates a rollback. If a rollback is detected, use the ChainApi subsystem to blacklist the chain.
 
 ### On `OverseerSignal::Conclude`
 
-Flush the overlay to DB and conclude.
+Exit gracefully.
 
 ### On `OverseerSignal::BlockFinalized`
 
@@ -84,11 +82,11 @@ Do nothing.
 
 * Deconstruct into parts `{ candidate_hash, candidate_receipt, session, statements }`.
 * If the session is earlier than `state.highest_session - DISPUTE_WINDOW`, return.
-* If there is an entry in the `state.overlay`, load that. Otherwise, load from underlying DB by querying `(session, "candidate-votes", candidate_hash). If that does not exist, create fresh with the given candidate receipt.
+* Load from underlying DB by querying `("candidate-votes", session, candidate_hash). If that does not exist, create fresh with the given candidate receipt.
 * If candidate votes is empty and the statements only contain dispute-specific votes, return.
 * Otherwise, if there is already an entry from the validator in the respective `valid` or `invalid` field of the `CandidateVotes`, return.
 * Add an entry to the respective `valid` or `invalid` list of the `CandidateVotes` for each statement in `statements`. 
-* Write the `CandidateVotes` to the `state.overlay`.
+* Write the `CandidateVotes` to the underyling DB.
 * If the both `valid` and `invalid` lists now have non-zero length where previously one or both had zero length, the candidate is now freshly disputed.
 * If freshly disputed, load `"active-disputes"` and add the candidate hash and session index. Also issue a [`DisputeParticipationMessage::Participate`][DisputeParticipationMessage].
 * If the dispute now has supermajority votes in the "valid" direction, according to the `SessionInfo` of the dispute candidate's session, remove from `"active-disputes"`.
@@ -101,8 +99,7 @@ Do nothing.
 
 ### On `DisputeCoordinatorMessage::QueryCandidateVotes`
 
-* Load from the `state.overlay`, and return the data if `Some`. 
-* Otherwise, load `"candidate-votes"` and return the data within or `None` if missing.
+* Load `"candidate-votes"` and return the data within or `None` if missing.
 
 ### On `DisputeCoordinatorMessage::IssueLocalStatement`
 
@@ -119,11 +116,6 @@ Do nothing.
   1. If there is a dispute, exit the loop.
 * For the highest index `i` reached in the `block_descriptions`, send `(base_number + i + 1, block_hash)` on the channel, unless `i` is 0, in which case `None` should be sent. The `block_hash` is determined by inspecting `block_descriptions[i]`.
 
-### Periodically
-
-* Flush the `state.overlay` to the DB, writing all entries within
-* Clear `state.overlay`.
-
 [DisputeTypes]: ../../types/disputes.md
 [DisputeStatement]: ../../types/disputes.md#disputestatement
 [DisputeCoordinatorMessage]: ../../types/overseer-protocol.md#dispute-coordinator-message
-- 
GitLab