From 0e2eb6d26c5c69f94174119020e3f3ba859dc35f Mon Sep 17 00:00:00 2001
From: Chris Sosnin <48099298+slumber@users.noreply.github.com>
Date: Wed, 26 Jan 2022 07:52:07 +0300
Subject: [PATCH] availability-distribution: look for leaf ancestors within the
 same session (#4596)

* availability-distribution: look for leaf ancestors

* Re-use subsystem-util

* Rework ancestry tasks scheduling

* Requester tests

* Improve readability for ancestors lookup
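
For context, a minimal standalone sketch of the session-bounded ancestry walk implemented
in `requester/mod.rs`. It uses simplified `Hash`/`SessionIndex` aliases and a hypothetical
`session_index_for_child_of` closure standing in for `RuntimeInfo::get_session_index` and
the Chain API; the real code is async and message-based, this only illustrates the logic:

    // Simplified sketch only: the real implementation queries RuntimeInfo / ChainApiMessage.
    type Hash = [u8; 32];
    type SessionIndex = u32;

    /// Given the ancestors of a leaf (parent first) and a lookup for the session index of
    /// a block's *child*, keep only the prefix of ancestors sharing the leaf's session.
    fn ancestors_in_same_session(
        mut ancestors: Vec<Hash>,
        session_index_for_child_of: impl Fn(&Hash) -> SessionIndex,
    ) -> Vec<Hash> {
        if ancestors.is_empty() {
            return ancestors
        }
        // The leaf's session index is obtained through its parent, the first ancestor.
        let leaf_session = session_index_for_child_of(&ancestors[0]);
        // The session of the i-th ancestor is queried through the (i + 1)-th one, its parent.
        let mut len_in_session = 0;
        for parent in ancestors.iter().skip(1) {
            if session_index_for_child_of(parent) == leaf_session {
                len_in_session += 1;
            } else {
                break
            }
        }
        // Drop everything that falls outside the leaf's session.
        ancestors.truncate(len_in_session);
        ancestors
    }

    fn main() {
        // Toy chain 0..=4: blocks 0 and 1 belong to session 1, blocks 2..=4 to session 2.
        // Ancestors of the leaf (block 4) are listed parent first: 3, 2, 1, 0.
        let ancestors: Vec<Hash> = [3u8, 2, 1, 0].iter().map(|&i| [i; 32]).collect();
        // Block `b + 1` is in session 1 only when `b == 0`.
        let in_session =
            ancestors_in_same_session(ancestors, |parent| if parent[0] == 0 { 1 } else { 2 });
        // Blocks 3 and 2 share the leaf's session; the walk stops at block 1.
        assert_eq!(in_session.len(), 2);
    }
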
---
 .../availability-distribution/src/error.rs    |   8 +-
 .../src/requester/fetch_task/mod.rs           |   2 +-
 .../src/requester/mod.rs                      | 136 ++++++-
 .../src/requester/tests.rs                    | 336 ++++++++++++++++++
 polkadot/node/subsystem-types/src/lib.rs      |   2 +-
 5 files changed, 465 insertions(+), 19 deletions(-)
 create mode 100644 polkadot/node/network/availability-distribution/src/requester/tests.rs

diff --git a/polkadot/node/network/availability-distribution/src/error.rs b/polkadot/node/network/availability-distribution/src/error.rs
index 764b2f64931..4ecec30ae57 100644
--- a/polkadot/node/network/availability-distribution/src/error.rs
+++ b/polkadot/node/network/availability-distribution/src/error.rs
@@ -24,7 +24,7 @@ use thiserror::Error;
 use futures::channel::oneshot;
 
 use polkadot_node_subsystem_util::runtime;
-use polkadot_subsystem::SubsystemError;
+use polkadot_subsystem::{ChainApiError, SubsystemError};
 
 use crate::LOG_TARGET;
 
@@ -63,6 +63,12 @@ pub enum Fatal {
 	/// Errors coming from runtime::Runtime.
 	#[error("Error while accessing runtime information: {0}")]
 	Runtime(#[from] runtime::Fatal),
+
+	#[error("Oneshot for receiving response from Chain API got cancelled")]
+	ChainApiSenderDropped(#[source] oneshot::Canceled),
+
+	#[error("Retrieving response from Chain API unexpectedly failed with error: {0}")]
+	ChainApi(#[from] ChainApiError),
 }
 
 /// Non-fatal errors of this subsystem.
diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs
index b3331b54a6d..a05ee0cd2d4 100644
--- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs
+++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs
@@ -64,7 +64,7 @@ pub struct FetchTask {
 	/// In other words, for which relay chain parents this candidate is considered live.
 	/// This is updated on every `ActiveLeavesUpdate` and enables us to know when we can safely
 	/// stop keeping track of that candidate/chunk.
-	live_in: HashSet<Hash>,
+	pub(crate) live_in: HashSet<Hash>,
 
 	/// We keep the task around until `live_in` becomes empty, to make
 	/// sure we won't re-fetch an already fetched candidate.
diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs
index 6812fa69e95..2f9a9069cd4 100644
--- a/polkadot/node/network/availability-distribution/src/requester/mod.rs
+++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs
@@ -27,7 +27,7 @@ use std::{
 };
 
 use futures::{
-	channel::mpsc,
+	channel::{mpsc, oneshot},
 	task::{Context, Poll},
 	Stream,
 };
@@ -35,10 +35,15 @@ use futures::{
 use polkadot_node_subsystem_util::runtime::{get_occupied_cores, RuntimeInfo};
 use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore};
 use polkadot_subsystem::{
-	messages::AllMessages, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
+	messages::{AllMessages, ChainApiMessage},
+	ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
 };
 
-use super::{Metrics, LOG_TARGET};
+use super::{Metrics, Result, LOG_TARGET};
+use crate::error::Fatal;
+
+#[cfg(test)]
+mod tests;
 
 /// Cache for session information.
 mod session_cache;
@@ -75,6 +80,9 @@ pub struct Requester {
 }
 
 impl Requester {
+	/// The number of ancestors of the leaf to consider along with it.
+	pub(crate) const LEAF_ANCESTRY_LEN_WITHIN_SESSION: usize = 3;
+
 	/// Create a new `Requester`.
 	///
 	/// You must feed it with `ActiveLeavesUpdate` via `update_fetching_heads` and make it progress
@@ -83,6 +91,7 @@ impl Requester {
 		let (tx, rx) = mpsc::channel(1);
 		Requester { fetches: HashMap::new(), session_cache: SessionCache::new(), tx, rx, metrics }
 	}
+
 	/// Update heads that need availability distribution.
 	///
 	/// For all active heads we will be fetching our chunks for availability distribution.
@@ -91,43 +100,72 @@ impl Requester {
 		ctx: &mut Context,
 		runtime: &mut RuntimeInfo,
 		update: ActiveLeavesUpdate,
-	) -> super::Result<()>
+	) -> Result<()>
 	where
 		Context: SubsystemContext,
 	{
 		tracing::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
 		let ActiveLeavesUpdate { activated, deactivated } = update;
-		// Stale leaves happen after a reversion - we don't want to re-run availability there.
-		let activated = activated.and_then(|h| match h.status {
-			LeafStatus::Stale => None,
-			LeafStatus::Fresh => Some(h),
-		});
 		// Order important! We need to handle activated, prior to deactivated, otherwise we might
 		// cancel still needed jobs.
-		self.start_requesting_chunks(ctx, runtime, activated.into_iter()).await?;
+		if let Some(activated) = activated {
+			// Stale leaves happen after a reversion - we don't want to re-run availability there.
+			if let LeafStatus::Fresh = activated.status {
+				self.start_requesting_chunks(ctx, runtime, activated).await?;
+			}
+		}
 		self.stop_requesting_chunks(deactivated.into_iter());
 		Ok(())
 	}
 
-	/// Start requesting chunks for newly imported heads.
+	/// Start requesting chunks for a newly imported head.
+	///
+	/// This will also request [`Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION`] leaf ancestors from
+	/// the same session and start requesting chunks for them too.
 	async fn start_requesting_chunks<Context>(
 		&mut self,
 		ctx: &mut Context,
 		runtime: &mut RuntimeInfo,
-		new_heads: impl Iterator<Item = ActivatedLeaf>,
-	) -> super::Result<()>
+		new_head: ActivatedLeaf,
+	) -> Result<()>
 	where
 		Context: SubsystemContext,
 	{
-		for ActivatedLeaf { hash: leaf, .. } in new_heads {
-			let cores = get_occupied_cores(ctx, leaf).await?;
+		let ActivatedLeaf { hash: leaf, .. } = new_head;
+		let ancestors_in_session = get_block_ancestors_in_same_session(
+			ctx,
+			runtime,
+			leaf,
+			Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION,
+		)
+		.await
+		.unwrap_or_else(|err| {
+			tracing::debug!(
+				target: LOG_TARGET,
+				leaf = ?leaf,
+				"Failed to fetch leaf ancestors in the same session due to an error: {}",
+				err
+			);
+			Vec::new()
+		});
+		// Also spawn or bump tasks for candidates in ancestry in the same session.
+		for hash in std::iter::once(leaf).chain(ancestors_in_session) {
+			let cores = get_occupied_cores(ctx, hash).await?;
 			tracing::trace!(
 				target: LOG_TARGET,
 				occupied_cores = ?cores,
 				"Query occupied core"
 			);
+			// Important:
+			// We mark the whole ancestry as live in the **leaf** hash, so we don't need to track
+			// any tasks separately.
+			//
+			// The next time the subsystem receives a leaf update, some of the spawned tasks
+			// will be bumped to be live in the fresh relay parent, while others might get
+			// dropped due to the current leaf being deactivated.
 			self.add_cores(ctx, runtime, leaf, cores).await?;
 		}
+
 		Ok(())
 	}
 
@@ -154,7 +192,7 @@ impl Requester {
 		runtime: &mut RuntimeInfo,
 		leaf: Hash,
 		cores: impl IntoIterator<Item = OccupiedCore>,
-	) -> super::Result<()>
+	) -> Result<()>
 	where
 		Context: SubsystemContext,
 	{
@@ -215,3 +253,69 @@ impl Stream for Requester {
 		}
 	}
 }
+
+/// Requests up to `limit` ancestor hashes of `head` that are within the same session.
+async fn get_block_ancestors_in_same_session<Context>(
+	ctx: &mut Context,
+	runtime: &mut RuntimeInfo,
+	head: Hash,
+	limit: usize,
+) -> Result<Vec<Hash>>
+where
+	Context: SubsystemContext,
+{
+	// The order is parent, grandparent, ...
+	//
+	// Request `limit + 1` ancestors, since the session index of the last element in the
+	// ancestry is obtained through its parent. That extra element always gets truncated,
+	// because `session_ancestry_len` can be incremented at most `ancestors.len() - 1` times.
+	let mut ancestors = get_block_ancestors(ctx, head, limit + 1).await?;
+	let mut ancestors_iter = ancestors.iter();
+
+	// `head` is the child of the first block in `ancestors`, request its session index.
+	let head_session_index = match ancestors_iter.next() {
+		Some(parent) => runtime.get_session_index(ctx.sender(), *parent).await?,
+		None => {
+			// No first element, i.e. empty.
+			return Ok(ancestors)
+		},
+	};
+
+	let mut session_ancestry_len = 0;
+	// The first parent is skipped.
+	for parent in ancestors_iter {
+		// `parent` is the i-th ancestor; request the session index of its child,
+		// i.e. the (i-1)-th element.
+		let session_index = runtime.get_session_index(ctx.sender(), *parent).await?;
+		if session_index == head_session_index {
+			session_ancestry_len += 1;
+		} else {
+			break
+		}
+	}
+
+	// Drop the rest.
+	ancestors.truncate(session_ancestry_len);
+
+	Ok(ancestors)
+}
+
+/// Requests up to `limit` ancestor hashes of `relay_parent` from the Chain API.
+async fn get_block_ancestors<Context>(
+	ctx: &mut Context,
+	relay_parent: Hash,
+	limit: usize,
+) -> Result<Vec<Hash>>
+where
+	Context: SubsystemContext,
+{
+	let (tx, rx) = oneshot::channel();
+	ctx.send_message(ChainApiMessage::Ancestors {
+		hash: relay_parent,
+		k: limit,
+		response_channel: tx,
+	})
+	.await;
+
+	let ancestors = rx.await.map_err(Fatal::ChainApiSenderDropped)?.map_err(Fatal::ChainApi)?;
+	Ok(ancestors)
+}
diff --git a/polkadot/node/network/availability-distribution/src/requester/tests.rs b/polkadot/node/network/availability-distribution/src/requester/tests.rs
new file mode 100644
index 00000000000..f44589ff9b8
--- /dev/null
+++ b/polkadot/node/network/availability-distribution/src/requester/tests.rs
@@ -0,0 +1,336 @@
+// Copyright 2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use std::{future::Future, sync::Arc};
+
+use futures::FutureExt;
+
+use polkadot_node_network_protocol::jaeger;
+use polkadot_node_primitives::{BlockData, ErasureChunk, PoV, SpawnNamed};
+use polkadot_node_subsystem_util::runtime::RuntimeInfo;
+use polkadot_primitives::v1::{
+	BlockNumber, CoreState, GroupIndex, Hash, Id, ScheduledCore, SessionIndex, SessionInfo,
+};
+use polkadot_subsystem::{
+	messages::{
+		AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
+		NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
+	},
+	ActivatedLeaf, ActiveLeavesUpdate, LeafStatus,
+};
+use polkadot_subsystem_testhelpers::{
+	make_subsystem_context, mock::make_ferdie_keystore, TestSubsystemContext,
+	TestSubsystemContextHandle,
+};
+
+use sp_core::testing::TaskExecutor;
+
+use crate::tests::mock::{get_valid_chunk_data, make_session_info, OccupiedCoreBuilder};
+
+use super::Requester;
+
+fn get_erasure_chunk() -> ErasureChunk {
+	let pov = PoV { block_data: BlockData(vec![45, 46, 47]) };
+	get_valid_chunk_data(pov).1
+}
+
+#[derive(Clone)]
+struct TestState {
+	/// Simulated relay chain heads. For each block except genesis
+	/// there exists a single corresponding candidate, handled in [`spawn_virtual_overseer`].
+	pub relay_chain: Vec<Hash>,
+	pub session_info: SessionInfo,
+	// Defines a way to compute a session index for the block with
+	// a given number. Returns 1 for all blocks by default.
+	pub session_index_for_block: fn(BlockNumber) -> SessionIndex,
+}
+
+impl TestState {
+	fn new() -> Self {
+		let relay_chain: Vec<_> = (0u8..10).map(Hash::repeat_byte).collect();
+		let session_info = make_session_info();
+		let session_index_for_block = |_| 1;
+		Self { relay_chain, session_info, session_index_for_block }
+	}
+}
+
+fn spawn_virtual_overseer(
+	pool: TaskExecutor,
+	test_state: TestState,
+	mut ctx_handle: TestSubsystemContextHandle<AvailabilityDistributionMessage>,
+) {
+	pool.spawn(
+		"virtual-overseer",
+		None,
+		async move {
+			loop {
+				let msg = ctx_handle.try_recv().await;
+				if msg.is_none() {
+					break
+				}
+				match msg.unwrap() {
+					AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(..)) => {},
+					AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryChunk(
+						..,
+						tx,
+					)) => {
+						let chunk = get_erasure_chunk();
+						tx.send(Some(chunk)).expect("Receiver is expected to be alive");
+					},
+					AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreChunk {
+						tx,
+						..
+					}) => {
+						// Silently accept it.
+						tx.send(Ok(())).expect("Receiver is expected to be alive");
+					},
+					AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, req)) => {
+						match req {
+							RuntimeApiRequest::SessionIndexForChild(tx) => {
+								let chain = &test_state.relay_chain;
+								let block_number = chain
+									.iter()
+									.position(|h| *h == hash)
+									.expect("Invalid session index request");
+								// Compute session index.
+								let session_index_for_block = test_state.session_index_for_block;
+
+								tx.send(Ok(session_index_for_block(block_number as u32 + 1)))
+									.expect("Receiver should still be alive");
+							},
+							RuntimeApiRequest::SessionInfo(_, tx) => {
+								tx.send(Ok(Some(test_state.session_info.clone())))
+									.expect("Receiver should be alive.");
+							},
+							RuntimeApiRequest::AvailabilityCores(tx) => {
+								let para_id = Id::from(1);
+								let maybe_block_position =
+									test_state.relay_chain.iter().position(|h| *h == hash);
+								let cores = match maybe_block_position {
+									Some(block_num) => {
+										let core = if block_num == 0 {
+											CoreState::Scheduled(ScheduledCore {
+												para_id,
+												collator: None,
+											})
+										} else {
+											CoreState::Occupied(
+												OccupiedCoreBuilder {
+													group_responsible: GroupIndex(1),
+													para_id,
+													relay_parent: hash,
+												}
+												.build()
+												.0,
+											)
+										};
+										vec![core]
+									},
+									None => Vec::new(),
+								};
+								tx.send(Ok(cores)).expect("Receiver should be alive.")
+							},
+							_ => {
+								panic!("Unexpected runtime request: {:?}", req);
+							},
+						}
+					},
+					AllMessages::ChainApi(ChainApiMessage::Ancestors {
+						hash,
+						k,
+						response_channel,
+					}) => {
+						let chain = &test_state.relay_chain;
+						let maybe_block_position = chain.iter().position(|h| *h == hash);
+						let ancestors = maybe_block_position
+							.map(|idx| chain[..idx].iter().rev().take(k).copied().collect())
+							.unwrap_or_default();
+						response_channel
+							.send(Ok(ancestors))
+							.expect("Receiver is expected to be alive");
+					},
+					msg => panic!("Unexpected overseer message: {:?}", msg),
+				}
+			}
+		}
+		.boxed(),
+	);
+}
+
+fn test_harness<T: Future<Output = ()>>(
+	test_state: TestState,
+	test_fx: impl FnOnce(TestSubsystemContext<AvailabilityDistributionMessage, TaskExecutor>) -> T,
+) {
+	let pool = TaskExecutor::new();
+	let (ctx, ctx_handle) = make_subsystem_context(pool.clone());
+
+	spawn_virtual_overseer(pool, test_state, ctx_handle);
+
+	futures::executor::block_on(test_fx(ctx));
+}
+
+#[test]
+fn check_ancestry_lookup_in_same_session() {
+	let test_state = TestState::new();
+	let mut requester = Requester::new(Default::default());
+	let keystore = make_ferdie_keystore();
+	let mut runtime = RuntimeInfo::new(Some(keystore));
+
+	test_harness(test_state.clone(), |mut ctx| async move {
+		let chain = &test_state.relay_chain;
+
+		let block_number = 1;
+		let update = ActiveLeavesUpdate {
+			activated: Some(ActivatedLeaf {
+				hash: chain[block_number],
+				number: block_number as u32,
+				status: LeafStatus::Fresh,
+				span: Arc::new(jaeger::Span::Disabled),
+			}),
+			deactivated: Vec::new().into(),
+		};
+
+		requester
+			.update_fetching_heads(&mut ctx, &mut runtime, update)
+			.await
+			.expect("Leaf processing failed");
+		let fetch_tasks = &requester.fetches;
+		assert_eq!(fetch_tasks.len(), 1);
+		let block_1_candidate =
+			*fetch_tasks.keys().next().expect("A task is checked to be present; qed");
+
+		let block_number = 2;
+		let update = ActiveLeavesUpdate {
+			activated: Some(ActivatedLeaf {
+				hash: chain[block_number],
+				number: block_number as u32,
+				status: LeafStatus::Fresh,
+				span: Arc::new(jaeger::Span::Disabled),
+			}),
+			deactivated: Vec::new().into(),
+		};
+
+		requester
+			.update_fetching_heads(&mut ctx, &mut runtime, update)
+			.await
+			.expect("Leaf processing failed");
+		let fetch_tasks = &requester.fetches;
+		assert_eq!(fetch_tasks.len(), 2);
+		let task = fetch_tasks.get(&block_1_candidate).expect("Leaf hasn't been deactivated yet");
+		// The task should be live in both blocks 1 and 2.
+		assert_eq!(task.live_in.len(), 2);
+		let block_2_candidate = *fetch_tasks
+			.keys()
+			.find(|hash| **hash != block_1_candidate)
+			.expect("Two tasks are present, the first one corresponds to block 1 candidate; qed");
+
+		// Deactivate both previous leaves, but keep the block 2 task alive
+		// as part of the new leaf's ancestry.
+		let block_number = 2 + Requester::LEAF_ANCESTRY_LEN_WITHIN_SESSION;
+		let update = ActiveLeavesUpdate {
+			activated: Some(ActivatedLeaf {
+				hash: test_state.relay_chain[block_number],
+				number: block_number as u32,
+				status: LeafStatus::Fresh,
+				span: Arc::new(jaeger::Span::Disabled),
+			}),
+			deactivated: vec![chain[1], chain[2]].into(),
+		};
+		requester
+			.update_fetching_heads(&mut ctx, &mut runtime, update)
+			.await
+			.expect("Leaf processing failed");
+		let fetch_tasks = &requester.fetches;
+		// The leaf plus K of its ancestors.
+		assert_eq!(fetch_tasks.len(), Requester::LEAF_ANCESTRY_LEN_WITHIN_SESSION + 1);
+
+		let block_2_task = fetch_tasks
+			.get(&block_2_candidate)
+			.expect("Expected to be live as a part of ancestry");
+		assert_eq!(block_2_task.live_in.len(), 1);
+	});
+}
+
+#[test]
+fn check_ancestry_lookup_in_different_sessions() {
+	let mut test_state = TestState::new();
+	let mut requester = Requester::new(Default::default());
+	let keystore = make_ferdie_keystore();
+	let mut runtime = RuntimeInfo::new(Some(keystore));
+
+	test_state.session_index_for_block = |block_number| match block_number {
+		0..=3 => 1,
+		_ => 2,
+	};
+
+	test_harness(test_state.clone(), |mut ctx| async move {
+		let chain = &test_state.relay_chain;
+
+		let block_number = 3;
+		let update = ActiveLeavesUpdate {
+			activated: Some(ActivatedLeaf {
+				hash: chain[block_number],
+				number: block_number as u32,
+				status: LeafStatus::Fresh,
+				span: Arc::new(jaeger::Span::Disabled),
+			}),
+			deactivated: Vec::new().into(),
+		};
+
+		requester
+			.update_fetching_heads(&mut ctx, &mut runtime, update)
+			.await
+			.expect("Leaf processing failed");
+		let fetch_tasks = &requester.fetches;
+		assert_eq!(fetch_tasks.len(), 3.min(Requester::LEAF_ANCESTRY_LEN_WITHIN_SESSION + 1));
+
+		let block_number = 4;
+		let update = ActiveLeavesUpdate {
+			activated: Some(ActivatedLeaf {
+				hash: chain[block_number],
+				number: block_number as u32,
+				status: LeafStatus::Fresh,
+				span: Arc::new(jaeger::Span::Disabled),
+			}),
+			deactivated: vec![chain[1], chain[2], chain[3]].into(),
+		};
+
+		requester
+			.update_fetching_heads(&mut ctx, &mut runtime, update)
+			.await
+			.expect("Leaf processing failed");
+		let fetch_tasks = &requester.fetches;
+		assert_eq!(fetch_tasks.len(), 1);
+
+		let block_number = 5;
+		let update = ActiveLeavesUpdate {
+			activated: Some(ActivatedLeaf {
+				hash: chain[block_number],
+				number: block_number as u32,
+				status: LeafStatus::Fresh,
+				span: Arc::new(jaeger::Span::Disabled),
+			}),
+			deactivated: vec![chain[4]].into(),
+		};
+
+		requester
+			.update_fetching_heads(&mut ctx, &mut runtime, update)
+			.await
+			.expect("Leaf processing failed");
+		let fetch_tasks = &requester.fetches;
+		assert_eq!(fetch_tasks.len(), 2.min(Requester::LEAF_ANCESTRY_LEN_WITHIN_SESSION + 1));
+	});
+}
diff --git a/polkadot/node/subsystem-types/src/lib.rs b/polkadot/node/subsystem-types/src/lib.rs
index dca7d56a98b..797e195a585 100644
--- a/polkadot/node/subsystem-types/src/lib.rs
+++ b/polkadot/node/subsystem-types/src/lib.rs
@@ -40,7 +40,7 @@ pub use polkadot_node_jaeger as jaeger;
 const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8;
 
 /// The status of an activated leaf.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq)]
 pub enum LeafStatus {
 	/// A leaf is fresh when it's the first time the leaf has been encountered.
 	/// Most leaves should be fresh.
-- 
GitLab