From 8b1514862d40aa70a4b8156f114b2bcbf9ed6469 Mon Sep 17 00:00:00 2001
From: Chris Sosnin <48099298+slumber@users.noreply.github.com>
Date: Fri, 17 Dec 2021 14:07:28 +0300
Subject: [PATCH] Dispute coordinator: look for included candidates in
 non-finalized chain (#4508)

* Fetch ancestors of the activated leaf

* Replace param type

* Increase step size

* Request block numbers for ancestors

* Store activated leaves in lru cache

* Fix doc-comment

* Rework LRU usage

* Typos & formatting

* Handle errors better

* Introduce a size limit for the ancestry

* Return fatal error when fetching finalized block

* Update tests

* Add tests for ordering provider

* Better naming

* Fix zombienet test, bump to a new image version

* Add debug output to zombienet

* Debug zombienet test

Co-authored-by: Javier Viola <javier@parity.io>
---
 polkadot/.gitlab-ci.yml                       |   5 +-
 polkadot/Cargo.lock                           |   1 +
 .../node/core/dispute-coordinator/Cargo.toml  |   1 +
 .../dispute-coordinator/src/real/error.rs     |   8 +-
 .../src/real/ordering/mod.rs                  | 155 +++++++-
 .../src/real/ordering/tests.rs                | 368 +++++++++++++-----
 .../dispute-coordinator/src/real/tests.rs     |  20 +-
 7 files changed, 434 insertions(+), 124 deletions(-)

diff --git a/polkadot/.gitlab-ci.yml b/polkadot/.gitlab-ci.yml
index 6d7977f2a7f..728c48c9332 100644
--- a/polkadot/.gitlab-ci.yml
+++ b/polkadot/.gitlab-ci.yml
@@ -27,7 +27,8 @@ variables:
   CI_IMAGE:                        "paritytech/ci-linux:production"
   DOCKER_OS:                       "debian:stretch"
   ARCH:                            "x86_64"
-  ZOMBIENET_IMAGE:                 "docker.io/paritytech/zombienet"
+ #ZOMBIENET_IMAGE:                 "docker.io/paritytech/zombienet"
+  ZOMBIENET_IMAGE:                 "docker.io/paritypr/zombienet:ec2fa96d"
   VAULT_SERVER_URL:                "https://vault.parity-mgmt-vault.parity.io"
   VAULT_AUTH_PATH:                 "gitlab-parity-io-jwt"
   VAULT_AUTH_ROLE:                 "cicd_gitlab_parity_${CI_PROJECT_NAME}"
@@ -666,7 +667,7 @@ zombienet-tests-malus-dispute-valid:
     - echo "${PARACHAINS_IMAGE_NAME} ${PARACHAINS_IMAGE_TAG}"
     - echo "${MALUS_IMAGE_NAME} ${MALUS_IMAGE_TAG}"
     - echo "${GH_DIR}"
-    - export DEBUG=zombie,zombie::network-node
+    - export DEBUG=zombie*
     - export ZOMBIENET_INTEGRATION_TEST_IMAGE=${PARACHAINS_IMAGE_NAME}:${PARACHAINS_IMAGE_TAG}
     - export MALUS_IMAGE=${MALUS_IMAGE_NAME}:${MALUS_IMAGE_TAG}
     - export COL_IMAGE=${COLLATOR_IMAGE_NAME}:${COLLATOR_IMAGE_TAG}
diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index b9f8eb0db8a..a6cf48f9ff5 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -6276,6 +6276,7 @@ dependencies = [
  "futures 0.3.18",
  "kvdb",
  "kvdb-memorydb",
+ "lru 0.7.0",
  "parity-scale-codec",
  "polkadot-node-primitives",
  "polkadot-node-subsystem",
diff --git a/polkadot/node/core/dispute-coordinator/Cargo.toml b/polkadot/node/core/dispute-coordinator/Cargo.toml
index d33f17c2a15..4225be761b1 100644
--- a/polkadot/node/core/dispute-coordinator/Cargo.toml
+++ b/polkadot/node/core/dispute-coordinator/Cargo.toml
@@ -10,6 +10,7 @@ tracing = "0.1.29"
 parity-scale-codec = "2"
 kvdb = "0.10.0"
 thiserror = "1.0.30"
+lru = "0.7.0"
 
 polkadot-primitives = { path = "../../../primitives" }
 polkadot-node-primitives = { path = "../../primitives" }
diff --git a/polkadot/node/core/dispute-coordinator/src/real/error.rs b/polkadot/node/core/dispute-coordinator/src/real/error.rs
index 86124bc5522..0a1b47fcc4b 100644
--- a/polkadot/node/core/dispute-coordinator/src/real/error.rs
+++ b/polkadot/node/core/dispute-coordinator/src/real/error.rs
@@ -86,11 +86,11 @@ pub enum Fatal {
 	#[error("Writing to database failed: {0}")]
 	DbWriteFailed(std::io::Error),
 
-	#[error("Oneshow for receiving block number from chain API got cancelled")]
-	CanceledBlockNumber,
+	#[error("Oneshot for receiving response from chain API got cancelled")]
+	ChainApiSenderDropped,
 
-	#[error("Retrieving block number from chain API failed with error: {0}")]
-	ChainApiBlockNumber(ChainApiError),
+	#[error("Retrieving response from chain API unexpectedly failed with error: {0}")]
+	ChainApi(#[from] ChainApiError),
 }
 
 #[derive(Debug, thiserror::Error)]
diff --git a/polkadot/node/core/dispute-coordinator/src/real/ordering/mod.rs b/polkadot/node/core/dispute-coordinator/src/real/ordering/mod.rs
index 08e79ed9875..bfd6298ddf5 100644
--- a/polkadot/node/core/dispute-coordinator/src/real/ordering/mod.rs
+++ b/polkadot/node/core/dispute-coordinator/src/real/ordering/mod.rs
@@ -20,9 +20,10 @@ use std::{
 };
 
 use futures::channel::oneshot;
+use lru::LruCache;
 
 use polkadot_node_subsystem::{
-	messages::ChainApiMessage, ActivatedLeaf, ActiveLeavesUpdate, SubsystemSender,
+	messages::ChainApiMessage, ActivatedLeaf, ActiveLeavesUpdate, ChainApiError, SubsystemSender,
 };
 use polkadot_node_subsystem_util::runtime::get_candidate_events;
 use polkadot_primitives::v1::{BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, Hash};
@@ -35,6 +36,8 @@ use super::{
 #[cfg(test)]
 mod tests;
 
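+/// Number of the most recently observed active leaves to keep cached.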
+const LRU_OBSERVED_BLOCKS_CAPACITY: usize = 20;
+
 /// Provider of `CandidateComparator` for candidates.
 pub struct OrderingProvider {
 	/// All candidates we have seen included, which have not yet been finalized.
@@ -43,6 +46,10 @@ pub struct OrderingProvider {
 	///
 	/// We need this to clean up `included_candidates` on `ActiveLeavesUpdate`.
 	candidates_by_block_number: BTreeMap<BlockNumber, HashSet<CandidateHash>>,
+	/// Latest relay blocks observed by the provider. We assume that ancestors of
+	/// cached blocks are already processed, i.e. we have saved the corresponding
+	/// included candidates.
+	last_observed_blocks: LruCache<Hash, ()>,
 }
 
 /// `Comparator` for ordering of disputes for candidates.
@@ -119,6 +126,11 @@ impl CandidateComparator {
 }
 
 impl OrderingProvider {
+	/// Limits the number of ancestors received for a single request.
+	pub(crate) const ANCESTRY_CHUNK_SIZE: usize = 10;
+	/// Limits the overall number of ancestors walked through for a given head.
+	pub(crate) const ANCESTRY_SIZE_LIMIT: usize = 1000;
+
 	/// Create a properly initialized `OrderingProvider`.
 	pub async fn new<Sender: SubsystemSender>(
 		sender: &mut Sender,
@@ -127,6 +139,7 @@ impl OrderingProvider {
 		let mut s = Self {
 			included_candidates: HashSet::new(),
 			candidates_by_block_number: BTreeMap::new(),
+			last_observed_blocks: LruCache::new(LRU_OBSERVED_BLOCKS_CAPACITY),
 		};
 		let update =
 			ActiveLeavesUpdate { activated: Some(initial_head), deactivated: Default::default() };
@@ -171,22 +184,44 @@ impl OrderingProvider {
 		update: &ActiveLeavesUpdate,
 	) -> Result<()> {
 		if let Some(activated) = update.activated.as_ref() {
-			// Get included events:
-			let included = get_candidate_events(sender, activated.hash)
-				.await?
-				.into_iter()
-				.filter_map(|ev| match ev {
-					CandidateEvent::CandidateIncluded(receipt, _, _, _) => Some(receipt),
-					_ => None,
+			// Fetch ancestors of the activated leaf.
+			let ancestors = self
+				.get_block_ancestors(sender, activated.hash, activated.number)
+				.await
+				.unwrap_or_else(|err| {
+					tracing::debug!(
+						target: LOG_TARGET,
+						activated_leaf = ?activated,
+						"Skipping leaf ancestors due to an error: {}",
+						err
+					);
+					Vec::new()
 				});
-			for receipt in included {
-				let candidate_hash = receipt.hash();
-				self.included_candidates.insert(candidate_hash);
-				self.candidates_by_block_number
-					.entry(activated.number)
-					.or_default()
-					.insert(candidate_hash);
+			// Ancestors' block numbers are consecutive, in descending order.
+			let earliest_block_number = activated.number - ancestors.len() as u32;
+			let block_numbers = (earliest_block_number..=activated.number).rev();
+
+			let block_hashes = std::iter::once(activated.hash).chain(ancestors);
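+			// Walk the head and its ancestors from newest to oldest, pairing each
+			// hash with its block number.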
+			for (block_num, block_hash) in block_numbers.zip(block_hashes) {
+				// Get included events:
+				let included = get_candidate_events(sender, block_hash)
+					.await?
+					.into_iter()
+					.filter_map(|ev| match ev {
+						CandidateEvent::CandidateIncluded(receipt, _, _, _) => Some(receipt),
+						_ => None,
+					});
+				for receipt in included {
+					let candidate_hash = receipt.hash();
+					self.included_candidates.insert(candidate_hash);
+					self.candidates_by_block_number
+						.entry(block_num)
+						.or_default()
+						.insert(candidate_hash);
+				}
 			}
+
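+			// Remember this leaf so that future updates can stop their ancestry walk here.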
+			self.last_observed_blocks.put(activated.hash, ());
 		}
 
 		Ok(())
@@ -205,6 +240,87 @@ impl OrderingProvider {
 			self.included_candidates.remove(&finalized_candidate);
 		}
 	}
+
+	/// Returns ancestors of `head` in descending order, stopping either at a
+	/// block present in the cache or at the latest finalized block.
+	///
+	/// Suited specifically for querying non-finalized chains, and thus
+	/// doesn't rely on block numbers.
+	///
+	/// Neither `head` nor the block we stop at is included in the result.
+	async fn get_block_ancestors<Sender: SubsystemSender>(
+		&mut self,
+		sender: &mut Sender,
+		mut head: Hash,
+		mut head_number: BlockNumber,
+	) -> Result<Vec<Hash>> {
+		let mut ancestors = Vec::new();
+
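+		// A cached head means its ancestry was already processed during a previous update.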
+		if self.last_observed_blocks.get(&head).is_some() {
+			return Ok(ancestors)
+		}
+
+		let finalized_block_number = get_finalized_block_number(sender).await?;
+
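+		// Walk the chain backwards in chunks of `ANCESTRY_CHUNK_SIZE` until we reach
+		// a cached block, the finalized block, or the size limit.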
+		loop {
+			let (tx, rx) = oneshot::channel();
+			let hashes = {
+				sender
+					.send_message(
+						ChainApiMessage::Ancestors {
+							hash: head,
+							k: Self::ANCESTRY_CHUNK_SIZE,
+							response_channel: tx,
+						}
+						.into(),
+					)
+					.await;
+
+				rx.await.or(Err(Fatal::ChainApiSenderDropped))?.map_err(Fatal::ChainApi)?
+			};
+
+			let earliest_block_number = head_number - hashes.len() as u32;
+			// Reversed, these are the numbers of the parent, grandparent, etc., excluding the head.
+			let block_numbers = (earliest_block_number..head_number).rev();
+
+			for (block_number, hash) in block_numbers.zip(&hashes) {
+				// Return if we have either reached a finalized or cached block, or
+				// hit the size limit for the ancestry returned for this head.
+				if self.last_observed_blocks.get(hash).is_some() ||
+					block_number <= finalized_block_number ||
+					ancestors.len() >= Self::ANCESTRY_SIZE_LIMIT
+				{
+					return Ok(ancestors)
+				}
+
+				ancestors.push(*hash);
+			}
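+			// Continue the walk from the oldest ancestor returned in this chunk.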
+			match hashes.last() {
+				Some(last_hash) => {
+					head = *last_hash;
+					head_number = earliest_block_number;
+				},
+				None => break,
+			}
+		}
+		Ok(ancestors)
+	}
+}
+
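+/// Send a chain API message and wait for its response, treating a dropped
+/// sender or a `ChainApiError` as fatal.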
+async fn send_message_fatal<Sender, Response>(
+	sender: &mut Sender,
+	message: ChainApiMessage,
+	receiver: oneshot::Receiver<std::result::Result<Response, ChainApiError>>,
+) -> FatalResult<Response>
+where
+	Sender: SubsystemSender,
+{
+	sender.send_message(message.into()).await;
+
+	receiver
+		.await
+		.map_err(|_| Fatal::ChainApiSenderDropped)?
+		.map_err(Fatal::ChainApi)
 }
 
 async fn get_block_number(
@@ -212,9 +328,10 @@ async fn get_block_number(
 	relay_parent: Hash,
 ) -> FatalResult<Option<BlockNumber>> {
 	let (tx, rx) = oneshot::channel();
-	sender.send_message(ChainApiMessage::BlockNumber(relay_parent, tx).into()).await;
+	send_message_fatal(sender, ChainApiMessage::BlockNumber(relay_parent, tx), rx).await
+}
 
-	rx.await
-		.map_err(|_| Fatal::CanceledBlockNumber)?
-		.map_err(Fatal::ChainApiBlockNumber)
+async fn get_finalized_block_number(sender: &mut impl SubsystemSender) -> FatalResult<BlockNumber> {
+	let (number_tx, number_rx) = oneshot::channel();
+	send_message_fatal(sender, ChainApiMessage::FinalizedBlockNumber(number_tx), number_rx).await
 }
diff --git a/polkadot/node/core/dispute-coordinator/src/real/ordering/tests.rs b/polkadot/node/core/dispute-coordinator/src/real/ordering/tests.rs
index 8e10ac99836..6b9bac0da06 100644
--- a/polkadot/node/core/dispute-coordinator/src/real/ordering/tests.rs
+++ b/polkadot/node/core/dispute-coordinator/src/real/ordering/tests.rs
@@ -14,11 +14,11 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
 
 use assert_matches::assert_matches;
 
-use futures::FutureExt;
+use futures::future::join;
 use parity_scale_codec::Encode;
 use sp_core::testing::TaskExecutor;
 
@@ -32,9 +32,9 @@ use polkadot_node_subsystem::{
 	ActivatedLeaf, ActiveLeavesUpdate, LeafStatus,
 };
 use polkadot_node_subsystem_test_helpers::{
-	make_subsystem_context, TestSubsystemContext, TestSubsystemContextHandle,
+	make_subsystem_context, TestSubsystemContext, TestSubsystemContextHandle, TestSubsystemSender,
 };
-use polkadot_node_subsystem_util::reexports::SubsystemContext;
+use polkadot_node_subsystem_util::{reexports::SubsystemContext, TimeoutExt};
 use polkadot_primitives::v1::{
 	BlakeTwo256, BlockNumber, CandidateDescriptor, CandidateEvent, CandidateReceipt, CoreIndex,
 	GroupIndex, Hash, HashT, HeadData,
@@ -44,105 +44,70 @@ use super::OrderingProvider;
 
 type VirtualOverseer = TestSubsystemContextHandle<DisputeCoordinatorMessage>;
 
+const OVERSEER_RECEIVE_TIMEOUT: Duration = Duration::from_secs(2);
+
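+/// Receive the next overseer message, failing the test if none arrives within
+/// `OVERSEER_RECEIVE_TIMEOUT`.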
+async fn overseer_recv(virtual_overseer: &mut VirtualOverseer) -> AllMessages {
+	virtual_overseer
+		.recv()
+		.timeout(OVERSEER_RECEIVE_TIMEOUT)
+		.await
+		.expect("overseer `recv` timed out")
+}
+
 struct TestState {
-	next_block_number: BlockNumber,
+	chain: Vec<Hash>,
 	ordering: OrderingProvider,
 	ctx: TestSubsystemContext<DisputeCoordinatorMessage, TaskExecutor>,
 }
 
 impl TestState {
-	async fn new() -> Self {
-		let (mut ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new());
-		let leaf = get_activated_leaf(1);
-		launch_virtual_overseer(&mut ctx, ctx_handle);
-		Self {
-			next_block_number: 2,
-			ordering: OrderingProvider::new(ctx.sender(), leaf).await.unwrap(),
-			ctx,
-		}
-	}
+	async fn new() -> (Self, VirtualOverseer) {
+		let (mut ctx, mut ctx_handle) = make_subsystem_context(TaskExecutor::new());
+		let leaf = get_activated_leaf(0);
+		let chain = vec![get_block_number_hash(0)];
 
-	/// Get a new leaf.
-	fn next_leaf(&mut self) -> ActivatedLeaf {
-		let r = get_activated_leaf(self.next_block_number);
-		self.next_block_number += 1;
-		r
-	}
+		let finalized_block_number = 0;
+		let expected_ancestry_len = 1;
+		let overseer_fut = overseer_process_active_leaves_update(
+			&mut ctx_handle,
+			&chain,
+			finalized_block_number,
+			expected_ancestry_len,
+		);
 
-	async fn process_active_leaves_update(&mut self) {
-		let update = self.next_leaf();
-		self.ordering
-			.process_active_leaves_update(
-				self.ctx.sender(),
-				&ActiveLeavesUpdate::start_work(update),
-			)
-			.await
-			.unwrap();
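+		// Drive the provider initialization concurrently with the mocked
+		// overseer and take the provider's result.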
+		let ordering_provider =
+			join(OrderingProvider::new(ctx.sender(), leaf.clone()), overseer_fut)
+				.await
+				.0
+				.unwrap();
+
+		let test_state = Self { chain, ordering: ordering_provider, ctx };
+
+		(test_state, ctx_handle)
 	}
 }
 
-/// Simulate other subsystems:
-fn launch_virtual_overseer(ctx: &mut impl SubsystemContext, ctx_handle: VirtualOverseer) {
-	ctx.spawn(
-		"serve-active-leaves-update",
-		async move { virtual_overseer(ctx_handle).await }.boxed(),
-	)
-	.unwrap();
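+/// The block number that a new leaf appended to the mock chain would get.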
+fn next_block_number(chain: &[Hash]) -> BlockNumber {
+	chain.len() as u32
 }
 
-async fn virtual_overseer(mut ctx_handle: VirtualOverseer) {
-	let create_ev = |relay_parent: Hash| {
-		vec![CandidateEvent::CandidateIncluded(
-			make_candidate_receipt(relay_parent),
-			HeadData::default(),
-			CoreIndex::from(0),
-			GroupIndex::from(0),
-		)]
-	};
-
-	assert_matches!(
-		ctx_handle.recv().await,
-		AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-				_,
-				RuntimeApiRequest::CandidateEvents(
-					tx,
-					)
-				)) => {
-			tx.send(Ok(Vec::new())).unwrap();
-		}
-	);
-	assert_matches!(
-		ctx_handle.recv().await,
-		AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-				relay_parent,
-				RuntimeApiRequest::CandidateEvents(
-					tx,
-					)
-				)) => {
-			tx.send(Ok(create_ev(relay_parent))).unwrap();
-		}
-	);
-	assert_matches!(
-		ctx_handle.recv().await,
-		AllMessages::ChainApi(ChainApiMessage::BlockNumber(_relay_parent, tx)) => {
-			tx.send(Ok(Some(1))).unwrap();
-		}
-	);
-}
-
-/// Get a dummy `ActivatedLeaf` for a given block number.
-fn get_activated_leaf(n: BlockNumber) -> ActivatedLeaf {
-	ActivatedLeaf {
-		hash: get_block_number_hash(n),
-		number: n,
-		status: LeafStatus::Fresh,
-		span: Arc::new(jaeger::Span::Disabled),
-	}
+/// Extend the mock chain by one block and get the corresponding activated leaf.
+fn next_leaf(chain: &mut Vec<Hash>) -> ActivatedLeaf {
+	let next_block_number = next_block_number(chain);
+	let next_hash = get_block_number_hash(next_block_number);
+	chain.push(next_hash);
+	get_activated_leaf(next_block_number)
 }
 
-/// Get a dummy relay parent hash for dummy block number.
-fn get_block_number_hash(n: BlockNumber) -> Hash {
-	BlakeTwo256::hash(&n.encode())
+async fn process_active_leaves_update(
+	sender: &mut TestSubsystemSender,
+	ordering: &mut OrderingProvider,
+	update: ActivatedLeaf,
+) {
+	ordering
+		.process_active_leaves_update(sender, &ActiveLeavesUpdate::start_work(update))
+		.await
+		.unwrap();
 }
 
 fn make_candidate_receipt(relay_parent: Hash) -> CandidateReceipt {
@@ -162,22 +127,231 @@ fn make_candidate_receipt(relay_parent: Hash) -> CandidateReceipt {
 	candidate
 }
 
+/// Get a dummy `ActivatedLeaf` for a given block number.
+fn get_activated_leaf(n: BlockNumber) -> ActivatedLeaf {
+	ActivatedLeaf {
+		hash: get_block_number_hash(n),
+		number: n,
+		status: LeafStatus::Fresh,
+		span: Arc::new(jaeger::Span::Disabled),
+	}
+}
+
+/// Get a dummy relay parent hash for a dummy block number.
+fn get_block_number_hash(n: BlockNumber) -> Hash {
+	BlakeTwo256::hash(&n.encode())
+}
+
+/// Get a dummy event that corresponds to candidate inclusion for the given block number.
+fn get_candidate_included_events(block_number: BlockNumber) -> Vec<CandidateEvent> {
+	vec![CandidateEvent::CandidateIncluded(
+		make_candidate_receipt(get_block_number_hash(block_number)),
+		HeadData::default(),
+		CoreIndex::from(0),
+		GroupIndex::from(0),
+	)]
+}
+
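+/// Answers a `CandidateEvents` request with inclusion events for blocks known
+/// to the mock chain, and with no events otherwise.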
+async fn assert_candidate_events_request(virtual_overseer: &mut VirtualOverseer, chain: &[Hash]) {
+	assert_matches!(
+		overseer_recv(virtual_overseer).await,
+		AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+			hash,
+			RuntimeApiRequest::CandidateEvents(tx),
+		)) => {
+			let maybe_block_number = chain.iter().position(|h| *h == hash);
+			let response = maybe_block_number
+				.map(|num| get_candidate_included_events(num as u32))
+				.unwrap_or_default();
+			tx.send(Ok(response)).unwrap();
+		}
+	);
+}
+
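+/// Answers a `BlockNumber` request by looking the relay parent up in the mock chain.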
+async fn assert_block_number_request(virtual_overseer: &mut VirtualOverseer, chain: &[Hash]) {
+	assert_matches!(
+		overseer_recv(virtual_overseer).await,
+		AllMessages::ChainApi(ChainApiMessage::BlockNumber(relay_parent, tx)) => {
+			let maybe_block_number =
+				chain.iter().position(|hash| *hash == relay_parent).map(|number| number as u32);
+			tx.send(Ok(maybe_block_number)).unwrap();
+		}
+	);
+}
+
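+/// Answers a `FinalizedBlockNumber` request with the given block number.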
+async fn assert_finalized_block_number_request(
+	virtual_overseer: &mut VirtualOverseer,
+	response: BlockNumber,
+) {
+	assert_matches!(
+		overseer_recv(virtual_overseer).await,
+		AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(tx)) => {
+			tx.send(Ok(response)).unwrap();
+		}
+	);
+}
+
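+/// Answers an `Ancestors` request with up to `k` ancestors of the requested
+/// hash from the mock chain.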
+async fn assert_block_ancestors_request(virtual_overseer: &mut VirtualOverseer, chain: &[Hash]) {
+	assert_matches!(
+		overseer_recv(virtual_overseer).await,
+		AllMessages::ChainApi(ChainApiMessage::Ancestors { hash, k, response_channel }) => {
+			let maybe_block_position = chain.iter().position(|h| *h == hash);
+			let ancestors = maybe_block_position
+				.map(|idx| chain[..idx].iter().rev().take(k).copied().collect())
+				.unwrap_or_default();
+			response_channel.send(Ok(ancestors)).unwrap();
+		}
+	);
+}
+
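+/// Mocks the overseer side of a single active leaves update: serves the
+/// finalized block number, ancestors and candidate events requests issued by
+/// the provider.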
+async fn overseer_process_active_leaves_update(
+	virtual_overseer: &mut VirtualOverseer,
+	chain: &[Hash],
+	finalized_block: BlockNumber,
+	expected_ancestry_len: usize,
+) {
+	// Before walking through the ancestors, the provider requests the latest finalized block number.
+	assert_finalized_block_number_request(virtual_overseer, finalized_block).await;
+	// Expect one ancestors request per `ANCESTRY_CHUNK_SIZE` chunk of the ancestry.
+	for _ in (0..expected_ancestry_len).step_by(OrderingProvider::ANCESTRY_CHUNK_SIZE) {
+		assert_block_ancestors_request(virtual_overseer, chain).await;
+	}
+	// For the head and each ancestor, respond with the corresponding candidate inclusion events.
+	for _ in 0..expected_ancestry_len {
+		assert_candidate_events_request(virtual_overseer, chain).await;
+	}
+}
+
 #[test]
 fn ordering_provider_provides_ordering_when_initialized() {
-	let candidate = make_candidate_receipt(get_block_number_hash(2));
+	let candidate = make_candidate_receipt(get_block_number_hash(1));
 	futures::executor::block_on(async {
-		let mut state = TestState::new().await;
-		let r = state
-			.ordering
-			.candidate_comparator(state.ctx.sender(), &candidate)
-			.await
-			.unwrap();
+		let (state, mut virtual_overseer) = TestState::new().await;
+
+		let TestState { mut chain, mut ordering, mut ctx } = state;
+
+		let r = ordering.candidate_comparator(ctx.sender(), &candidate).await.unwrap();
 		assert_matches!(r, None);
+
 		// After next active leaves update we should have a comparator:
-		state.process_active_leaves_update().await;
-		let r = state.ordering.candidate_comparator(state.ctx.sender(), &candidate).await;
+		let next_update = next_leaf(&mut chain);
+
+		let finalized_block_number = 0;
+		let expected_ancestry_len = 1;
+		let overseer_fut = overseer_process_active_leaves_update(
+			&mut virtual_overseer,
+			&chain,
+			finalized_block_number,
+			expected_ancestry_len,
+		);
+		join(process_active_leaves_update(ctx.sender(), &mut ordering, next_update), overseer_fut)
+			.await;
+
+		let r = join(
+			ordering.candidate_comparator(ctx.sender(), &candidate),
+			assert_block_number_request(&mut virtual_overseer, &chain),
+		)
+		.await
+		.0;
 		assert_matches!(r, Ok(Some(r2)) => {
 			assert_eq!(r2.relay_parent_block_number, 1);
 		});
 	});
 }
+
+#[test]
+fn ordering_provider_requests_candidates_of_leaf_ancestors() {
+	futures::executor::block_on(async {
+		// How many blocks should we skip before sending a leaf update.
+		const BLOCKS_TO_SKIP: usize = 30;
+
+		let (state, mut virtual_overseer) = TestState::new().await;
+
+		let TestState { mut chain, mut ordering, mut ctx } = state;
+
+		let next_update = (0..BLOCKS_TO_SKIP).map(|_| next_leaf(&mut chain)).last().unwrap();
+
+		let finalized_block_number = 0;
+		let overseer_fut = overseer_process_active_leaves_update(
+			&mut virtual_overseer,
+			&chain,
+			finalized_block_number,
+			BLOCKS_TO_SKIP,
+		);
+		join(process_active_leaves_update(ctx.sender(), &mut ordering, next_update), overseer_fut)
+			.await;
+
+		let next_block_number = next_block_number(&chain);
+		for block_number in 1..next_block_number {
+			let candidate = make_candidate_receipt(get_block_number_hash(block_number));
+			let r = join(
+				ordering.candidate_comparator(ctx.sender(), &candidate),
+				assert_block_number_request(&mut virtual_overseer, &chain),
+			)
+			.await
+			.0;
+			assert_matches!(r, Ok(Some(r2)) => {
+				assert_eq!(r2.relay_parent_block_number, block_number);
+			});
+		}
+	});
+}
+
+#[test]
+fn ordering_provider_requests_candidates_of_non_cached_ancestors() {
+	futures::executor::block_on(async {
+		// How many blocks should we skip before sending a leaf update.
+		const BLOCKS_TO_SKIP: &[usize] = &[30, 15];
+
+		let (state, mut virtual_overseer) = TestState::new().await;
+
+		let TestState { mut chain, mut ordering, mut ctx } = state;
+
+		let next_update = (0..BLOCKS_TO_SKIP[0]).map(|_| next_leaf(&mut chain)).last().unwrap();
+
+		let finalized_block_number = 0;
+		let overseer_fut = overseer_process_active_leaves_update(
+			&mut virtual_overseer,
+			&chain,
+			finalized_block_number,
+			BLOCKS_TO_SKIP[0],
+		);
+		join(process_active_leaves_update(ctx.sender(), &mut ordering, next_update), overseer_fut)
+			.await;
+
+		// Send the second request and verify that we don't go past the cached block.
+		let next_update = (0..BLOCKS_TO_SKIP[1]).map(|_| next_leaf(&mut chain)).last().unwrap();
+		let overseer_fut = overseer_process_active_leaves_update(
+			&mut virtual_overseer,
+			&chain,
+			finalized_block_number,
+			BLOCKS_TO_SKIP[1],
+		);
+		join(process_active_leaves_update(ctx.sender(), &mut ordering, next_update), overseer_fut)
+			.await;
+	});
+}
+
+#[test]
+fn ordering_provider_requests_candidates_of_non_finalized_ancestors() {
+	futures::executor::block_on(async {
+		// How many blocks should we skip before sending a leaf update.
+		const BLOCKS_TO_SKIP: usize = 30;
+
+		let (state, mut virtual_overseer) = TestState::new().await;
+
+		let TestState { mut chain, mut ordering, mut ctx } = state;
+
+		let next_update = (0..BLOCKS_TO_SKIP).map(|_| next_leaf(&mut chain)).last().unwrap();
+
+		let finalized_block_number = 17;
+		let overseer_fut = overseer_process_active_leaves_update(
+			&mut virtual_overseer,
+			&chain,
+			finalized_block_number,
+			BLOCKS_TO_SKIP - finalized_block_number as usize, // Expect the provider not to walk past the finalized block.
+		);
+		join(process_active_leaves_update(ctx.sender(), &mut ordering, next_update), overseer_fut)
+			.await;
+	});
+}
diff --git a/polkadot/node/core/dispute-coordinator/src/real/tests.rs b/polkadot/node/core/dispute-coordinator/src/real/tests.rs
index 0303500cf50..62f81db2a88 100644
--- a/polkadot/node/core/dispute-coordinator/src/real/tests.rs
+++ b/polkadot/node/core/dispute-coordinator/src/real/tests.rs
@@ -34,9 +34,12 @@ use parity_scale_codec::Encode;
 
 use polkadot_node_primitives::SignedDisputeStatement;
 use polkadot_node_subsystem::{
-	messages::{DisputeCoordinatorMessage, DisputeDistributionMessage, ImportStatementsResult},
+	messages::{
+		ChainApiMessage, DisputeCoordinatorMessage, DisputeDistributionMessage,
+		ImportStatementsResult,
+	},
 	overseer::FromOverseer,
-	OverseerSignal,
+	ChainApiError, OverseerSignal,
 };
 use polkadot_node_subsystem_util::TimeoutExt;
 use sc_keystore::LocalKeystore;
@@ -236,6 +239,19 @@ impl TestState {
 			)
 		}
 
+		// Since the test harness sends an active leaves update for each block
+		// consecutively, walking back through the ancestors is not necessary.
+		// Sending an error to the subsystem forces it to skip this procedure;
+		// the ordering provider will then only request the candidates included
+		// in the leaf.
+		assert_matches!(
+			virtual_overseer.recv().await,
+			AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
+				tx
+			)) => {
+				tx.send(Err(ChainApiError::from(""))).unwrap();
+			}
+		);
+
 		assert_matches!(
 			virtual_overseer.recv().await,
 			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-- 
GitLab