From 2b4b33d01f22b1f4037a404597357f398e21224f Mon Sep 17 00:00:00 2001
From: Rahul Subramaniyam <78006270+rahulksnv@users.noreply.github.com>
Date: Tue, 10 Oct 2023 02:46:23 -0700
Subject: [PATCH] Check for parent of first ready block being on chain (#1812)

When retrieving the ready blocks, verify that the parent of the first
ready block is on chain. If the parent is not on chain, we are
downloading from a fork. In this case, keep downloading until we have a
parent on chain (common ancestor).
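
In ready_blocks(), this amounts to a gate of roughly the following shape
(a simplified sketch of the change in this patch; field and helper names
are the ones introduced below):

  // Only hand blocks to the import queue once the parent of the first
  // ready block is already on chain or queued for import.
  let start_block = self.best_queued_number + One::one();
  let parent_on_chain =
    self.blocks.first_ready_block_header(start_block).map_or(false, |hdr| {
      matches!(
        self.block_status(hdr.parent_hash()).unwrap_or(BlockStatus::Unknown),
        BlockStatus::InChainWithState | BlockStatus::InChainPruned | BlockStatus::Queued
      )
    });
  if !parent_on_chain {
    // Still downloading a fork without a known common ancestor.
    return vec![]
  }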

Resolves https://github.com/paritytech/polkadot-sdk/issues/493.

---------

Co-authored-by: Aaro Altonen <48052676+altonen@users.noreply.github.com>
---
 substrate/client/network/sync/src/blocks.rs |  25 ++
 substrate/client/network/sync/src/lib.rs    | 397 +++++++++++++++++++-
 2 files changed, 421 insertions(+), 1 deletion(-)

diff --git a/substrate/client/network/sync/src/blocks.rs b/substrate/client/network/sync/src/blocks.rs
index 240c1ca1f8b..cad50fef3e3 100644
--- a/substrate/client/network/sync/src/blocks.rs
+++ b/substrate/client/network/sync/src/blocks.rs
@@ -212,6 +212,31 @@ impl<B: BlockT> BlockCollection<B> {
 		ready
 	}
 
+	/// Returns the header of the first block that is ready for importing.
+	/// `from` is the maximum block number for the start of the range that we are interested in.
+	/// The function returns `None` if the first ready block is higher than `from`.
+	/// The logic is structured to be consistent with `ready_blocks()`.
+	pub fn first_ready_block_header(&self, from: NumberFor<B>) -> Option<B::Header> {
+		let mut prev = from;
+		for (&start, range_data) in &self.blocks {
+			if start > prev {
+				break
+			}
+
+			match range_data {
+				BlockRangeState::Complete(blocks) => {
+					let len = (blocks.len() as u32).into();
+					prev = start + len;
+					if let Some(BlockData { block, .. }) = blocks.first() {
+						return block.header.clone()
+					}
+				},
+				_ => continue,
+			}
+		}
+		None
+	}
+
 	pub fn clear_queued(&mut self, hash: &B::Hash) {
 		if let Some((from, to)) = self.queued_blocks.remove(hash) {
 			let mut block_num = from;
diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs
index 10eaa245051..a291da4a90d 100644
--- a/substrate/client/network/sync/src/lib.rs
+++ b/substrate/client/network/sync/src/lib.rs
@@ -1405,8 +1405,27 @@ where
 
 	/// Get the set of downloaded blocks that are ready to be queued for import.
 	fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
+		let start_block = self.best_queued_number + One::one();
+
+		// Verify that the parent of the first available block is in the chain.
+		// If not, we are downloading from a fork. In this case, wait until
+		// the start block has a parent on chain.
+		let parent_on_chain =
+			self.blocks.first_ready_block_header(start_block).map_or(false, |hdr| {
+				std::matches!(
+					self.block_status(hdr.parent_hash()).unwrap_or(BlockStatus::Unknown),
+					BlockStatus::InChainWithState |
+						BlockStatus::InChainPruned |
+						BlockStatus::Queued
+				)
+			});
+
+		if !parent_on_chain {
+			return vec![]
+		}
+
 		self.blocks
-			.ready_blocks(self.best_queued_number + One::one())
+			.ready_blocks(start_block)
 			.into_iter()
 			.map(|block_data| {
 				let justifications = block_data
@@ -3364,4 +3383,380 @@ mod test {
 		pending_responses.remove(&peers[1]);
 		assert_eq!(pending_responses.len(), 0);
 	}
+
+	#[test]
+	fn syncs_fork_with_partial_response_extends_tip() {
+		sp_tracing::try_init_simple();
+
+		// Set up: the two chains share the first 15 blocks before
+		// diverging. The other (canonical) chain is longer.
+		let max_blocks_per_request = 64;
+		let common_ancestor = 15;
+		let non_canonical_chain_length = common_ancestor + 3;
+		let canonical_chain_length = common_ancestor + max_blocks_per_request + 10;
+
+		let (_chain_sync_network_provider, chain_sync_network_handle) =
+			NetworkServiceProvider::new();
+		let mut client = Arc::new(TestClientBuilder::new().build());
+
+		// Blocks on the non-canonical chain.
+		let non_canonical_blocks = (0..non_canonical_chain_length)
+			.map(|_| build_block(&mut client, None, false))
+			.collect::<Vec<_>>();
+
+		// Blocks on the canonical chain.
+		let canonical_blocks = {
+			let mut client = Arc::new(TestClientBuilder::new().build());
+			let common_blocks = non_canonical_blocks[..common_ancestor as usize]
+				.into_iter()
+				.inspect(|b| block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap())
+				.cloned()
+				.collect::<Vec<_>>();
+
+			common_blocks
+				.into_iter()
+				.chain(
+					(0..(canonical_chain_length - common_ancestor as u32))
+						.map(|_| build_block(&mut client, None, true)),
+				)
+				.collect::<Vec<_>>()
+		};
+
+		let mut sync = ChainSync::new(
+			SyncMode::Full,
+			client.clone(),
+			ProtocolName::from("test-block-announce-protocol"),
+			1,
+			max_blocks_per_request,
+			None,
+			chain_sync_network_handle,
+		)
+		.unwrap();
+
+		// Connect the node we will sync from
+		let peer_id = PeerId::random();
+		let canonical_tip = canonical_blocks.last().unwrap().clone();
+		let mut request = sync
+			.new_peer(peer_id, canonical_tip.hash(), *canonical_tip.header().number())
+			.unwrap()
+			.unwrap();
+		assert_eq!(FromBlock::Number(client.info().best_number), request.from);
+		assert_eq!(Some(1), request.max);
+
+		// Do the ancestor search
+		loop {
+			let block =
+				&canonical_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1];
+			let response = create_block_response(vec![block.clone()]);
+
+			let on_block_data = sync.on_block_data(&peer_id, Some(request), response).unwrap();
+			request = if let OnBlockData::Request(_peer, request) = on_block_data {
+				request
+			} else {
+				// We found the ancestor
+				break
+			};
+
+			log::trace!(target: LOG_TARGET, "Request: {request:?}");
+		}
+
+		// The response for the 64 blocks is returned in two parts:
+		// part 1: last 61 blocks [19..79], part 2: first 3 blocks [16..18].
+		// Even though the first part extends the current chain ending at 18,
+		// it should not result in an import yet: the parent of block 19 is not on chain.
+		let resp_1_from = common_ancestor as u64 + max_blocks_per_request as u64;
+		let resp_2_from = common_ancestor as u64 + 3;
+
+		// No import expected.
+		let request = get_block_request(
+			&mut sync,
+			FromBlock::Number(resp_1_from),
+			max_blocks_per_request as u32,
+			&peer_id,
+		);
+
+		let from = unwrap_from_block_number(request.from.clone());
+		let mut resp_blocks = canonical_blocks[18..from as usize].to_vec();
+		resp_blocks.reverse();
+		let response = create_block_response(resp_blocks.clone());
+		let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
+		assert!(matches!(
+			res,
+			OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty()
+		),);
+
+		// Gap filled, expect max_blocks_per_request blocks to be imported now.
+		let request = get_block_request(&mut sync, FromBlock::Number(resp_2_from), 3, &peer_id);
+		let mut resp_blocks = canonical_blocks[common_ancestor as usize..18].to_vec();
+		resp_blocks.reverse();
+		let response = create_block_response(resp_blocks.clone());
+		let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
+		let to_import: Vec<_> = match &res {
+			OnBlockData::Import(ImportBlocksAction { origin: _, blocks }) => {
+				assert_eq!(blocks.len(), sync.max_blocks_per_request as usize);
+				blocks
+					.iter()
+					.map(|b| {
+						let num = *b.header.as_ref().unwrap().number() as usize;
+						canonical_blocks[num - 1].clone()
+					})
+					.collect()
+			},
+			_ => {
+				panic!("Unexpected response: {res:?}");
+			},
+		};
+
+		let _ = sync.on_blocks_processed(
+			max_blocks_per_request as usize,
+			resp_blocks.len(),
+			resp_blocks
+				.iter()
+				.rev()
+				.map(|b| {
+					(
+						Ok(BlockImportStatus::ImportedUnknown(
+							*b.header().number(),
+							Default::default(),
+							Some(peer_id),
+						)),
+						b.hash(),
+					)
+				})
+				.collect(),
+		);
+		to_import.into_iter().for_each(|b| {
+			assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_))));
+			block_on(client.import(BlockOrigin::Own, b)).unwrap();
+		});
+		let expected_number = common_ancestor as u32 + max_blocks_per_request as u32;
+		assert_eq!(sync.best_queued_number as u32, expected_number);
+		assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash());
+		// Sync rest of the chain.
+		let request =
+			get_block_request(&mut sync, FromBlock::Hash(canonical_tip.hash()), 10_u32, &peer_id);
+		let mut resp_blocks = canonical_blocks
+			[(canonical_chain_length - 10) as usize..canonical_chain_length as usize]
+			.to_vec();
+		resp_blocks.reverse();
+		let response = create_block_response(resp_blocks.clone());
+		let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
+		assert!(matches!(
+			res,
+			OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 10 as usize
+		),);
+		let _ = sync.on_blocks_processed(
+			max_blocks_per_request as usize,
+			resp_blocks.len(),
+			resp_blocks
+				.iter()
+				.rev()
+				.map(|b| {
+					(
+						Ok(BlockImportStatus::ImportedUnknown(
+							*b.header().number(),
+							Default::default(),
+							Some(peer_id),
+						)),
+						b.hash(),
+					)
+				})
+				.collect(),
+		);
+		resp_blocks.into_iter().rev().for_each(|b| {
+			assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_))));
+			block_on(client.import(BlockOrigin::Own, b)).unwrap();
+		});
+		let expected_number = canonical_chain_length as u32;
+		assert_eq!(sync.best_queued_number as u32, expected_number);
+		assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash());
+	}
+
+	#[test]
+	fn syncs_fork_with_partial_response_does_not_extend_tip() {
+		sp_tracing::try_init_simple();
+
+		// Set up: the two chains share the first 15 blocks before
+		// diverging. The other (canonical) chain is longer.
+		let max_blocks_per_request = 64;
+		let common_ancestor = 15;
+		let non_canonical_chain_length = common_ancestor + 3;
+		let canonical_chain_length = common_ancestor + max_blocks_per_request + 10;
+
+		let (_chain_sync_network_provider, chain_sync_network_handle) =
+			NetworkServiceProvider::new();
+		let mut client = Arc::new(TestClientBuilder::new().build());
+
+		// Blocks on the non-canonical chain.
+		let non_canonical_blocks = (0..non_canonical_chain_length)
+			.map(|_| build_block(&mut client, None, false))
+			.collect::<Vec<_>>();
+
+		// Blocks on the canonical chain.
+		let canonical_blocks = {
+			let mut client = Arc::new(TestClientBuilder::new().build());
+			let common_blocks = non_canonical_blocks[..common_ancestor as usize]
+				.into_iter()
+				.inspect(|b| block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap())
+				.cloned()
+				.collect::<Vec<_>>();
+
+			common_blocks
+				.into_iter()
+				.chain(
+					(0..(canonical_chain_length - common_ancestor as u32))
+						.map(|_| build_block(&mut client, None, true)),
+				)
+				.collect::<Vec<_>>()
+		};
+
+		let mut sync = ChainSync::new(
+			SyncMode::Full,
+			client.clone(),
+			ProtocolName::from("test-block-announce-protocol"),
+			1,
+			max_blocks_per_request,
+			None,
+			chain_sync_network_handle,
+		)
+		.unwrap();
+
+		// Connect the node we will sync from
+		let peer_id = PeerId::random();
+		let canonical_tip = canonical_blocks.last().unwrap().clone();
+		let mut request = sync
+			.new_peer(peer_id, canonical_tip.hash(), *canonical_tip.header().number())
+			.unwrap()
+			.unwrap();
+		assert_eq!(FromBlock::Number(client.info().best_number), request.from);
+		assert_eq!(Some(1), request.max);
+
+		// Do the ancestor search
+		loop {
+			let block =
+				&canonical_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1];
+			let response = create_block_response(vec![block.clone()]);
+
+			let on_block_data = sync.on_block_data(&peer_id, Some(request), response).unwrap();
+			request = if let OnBlockData::Request(_peer, request) = on_block_data {
+				request
+			} else {
+				// We found the ancestor
+				break
+			};
+
+			log::trace!(target: LOG_TARGET, "Request: {request:?}");
+		}
+
+		// The response for the 64 blocks is returned in two parts:
+		// part 1: last 62 blocks [18..79], part 2: first 2 blocks [16..17].
+		// The first part does not extend the current chain: the local tip at
+		// number 18 belongs to the fork, so no import is expected yet.
+		let resp_1_from = common_ancestor as u64 + max_blocks_per_request as u64;
+		let resp_2_from = common_ancestor as u64 + 2;
+
+		// No import expected.
+		let request = get_block_request(
+			&mut sync,
+			FromBlock::Number(resp_1_from),
+			max_blocks_per_request as u32,
+			&peer_id,
+		);
+
+		let from = unwrap_from_block_number(request.from.clone());
+		let mut resp_blocks = canonical_blocks[17..from as usize].to_vec();
+		resp_blocks.reverse();
+		let response = create_block_response(resp_blocks.clone());
+		let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
+		assert!(matches!(
+			res,
+			OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty()
+		),);
+
+		// Gap filled, expect max_blocks_per_request blocks to be imported now.
+		let request = get_block_request(&mut sync, FromBlock::Number(resp_2_from), 2, &peer_id);
+		let mut resp_blocks = canonical_blocks[common_ancestor as usize..17].to_vec();
+		resp_blocks.reverse();
+		let response = create_block_response(resp_blocks.clone());
+		let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
+		let to_import: Vec<_> = match &res {
+			OnBlockData::Import(ImportBlocksAction { origin: _, blocks }) => {
+				assert_eq!(blocks.len(), sync.max_blocks_per_request as usize);
+				blocks
+					.iter()
+					.map(|b| {
+						let num = *b.header.as_ref().unwrap().number() as usize;
+						canonical_blocks[num - 1].clone()
+					})
+					.collect()
+			},
+			_ => {
+				panic!("Unexpected response: {res:?}");
+			},
+		};
+
+		let _ = sync.on_blocks_processed(
+			max_blocks_per_request as usize,
+			resp_blocks.len(),
+			resp_blocks
+				.iter()
+				.rev()
+				.map(|b| {
+					(
+						Ok(BlockImportStatus::ImportedUnknown(
+							*b.header().number(),
+							Default::default(),
+							Some(peer_id),
+						)),
+						b.hash(),
+					)
+				})
+				.collect(),
+		);
+		to_import.into_iter().for_each(|b| {
+			assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_))));
+			block_on(client.import(BlockOrigin::Own, b)).unwrap();
+		});
+		let expected_number = common_ancestor as u32 + max_blocks_per_request as u32;
+		assert_eq!(sync.best_queued_number as u32, expected_number);
+		assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash());
+		// Sync rest of the chain.
+		let request =
+			get_block_request(&mut sync, FromBlock::Hash(canonical_tip.hash()), 10_u32, &peer_id);
+		let mut resp_blocks = canonical_blocks
+			[(canonical_chain_length - 10) as usize..canonical_chain_length as usize]
+			.to_vec();
+		resp_blocks.reverse();
+		let response = create_block_response(resp_blocks.clone());
+		let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
+		assert!(matches!(
+			res,
+			OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 10 as usize
+		),);
+		let _ = sync.on_blocks_processed(
+			max_blocks_per_request as usize,
+			resp_blocks.len(),
+			resp_blocks
+				.iter()
+				.rev()
+				.map(|b| {
+					(
+						Ok(BlockImportStatus::ImportedUnknown(
+							*b.header().number(),
+							Default::default(),
+							Some(peer_id),
+						)),
+						b.hash(),
+					)
+				})
+				.collect(),
+		);
+		resp_blocks.into_iter().rev().for_each(|b| {
+			assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_))));
+			block_on(client.import(BlockOrigin::Own, b)).unwrap();
+		});
+		let expected_number = canonical_chain_length as u32;
+		assert_eq!(sync.best_queued_number as u32, expected_number);
+		assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash());
+	}
 }
-- 
GitLab