From 7ec692d11c5b0d749ad89a71df835dae4d3756ed Mon Sep 17 00:00:00 2001
From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
Date: Wed, 14 Feb 2024 16:41:07 +0200
Subject: [PATCH] chainHead: Error on duplicate unpin hashes (#3313)

This PR addresses an issue where calling chainHead_unpin with duplicate
hashes could lead to unintended side effects.

This backports:
https://github.com/paritytech/json-rpc-interface-spec/pull/135

While at it, have added a test to check that the global reference count
is decremented only once on unpin.

cc @paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Davide Galassi <davxy@datawok.net>
---
 .../rpc-spec-v2/src/chain_head/chain_head.rs  |   2 +
 .../rpc-spec-v2/src/chain_head/error.rs       |   7 ++
 .../src/chain_head/subscription/error.rs      |   6 +-
 .../src/chain_head/subscription/inner.rs      |  88 ++++++++++++++-
 .../rpc-spec-v2/src/chain_head/tests.rs       | 102 ++++++++++++++++++
 5 files changed, 203 insertions(+), 2 deletions(-)

diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
index 0e207addcae..bcca1dc18e2 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
@@ -424,6 +424,8 @@ where
 				// Block is not part of the subscription.
 				Err(ChainHeadRpcError::InvalidBlock)
 			},
+			Err(SubscriptionManagementError::DuplicateHashes) =>
+				Err(ChainHeadRpcError::InvalidDuplicateHashes),
 			Err(_) => Err(ChainHeadRpcError::InvalidBlock),
 		}
 	}
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/error.rs b/substrate/client/rpc-spec-v2/src/chain_head/error.rs
index bf290edb29e..8c50e445aa0 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/error.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/error.rs
@@ -32,6 +32,9 @@ pub enum Error {
 	/// Wait-for-continue event not generated.
 	#[error("Wait for continue event was not generated for the subscription")]
 	InvalidContinue,
+	/// Received duplicate hashes for the `chainHead_unpin` method.
+	#[error("Received duplicate hashes for the `chainHead_unpin` method")]
+	InvalidDuplicateHashes,
 	/// Invalid parameter provided to the RPC method.
 	#[error("Invalid parameter: {0}")]
 	InvalidParam(String),
@@ -49,6 +52,8 @@ pub mod rpc_spec_v2 {
 	pub const INVALID_RUNTIME_CALL: i32 = -32802;
 	/// Wait-for-continue event not generated.
 	pub const INVALID_CONTINUE: i32 = -32803;
+	/// Received duplicate hashes for the `chainHead_unpin` method.
+	pub const INVALID_DUPLICATE_HASHES: i32 = -32804;
 }
 
 /// General purpose errors, as defined in
@@ -71,6 +76,8 @@ impl From<Error> for ErrorObject<'static> {
 				ErrorObject::owned(rpc_spec_v2::INVALID_RUNTIME_CALL, msg, None::<()>),
 			Error::InvalidContinue =>
 				ErrorObject::owned(rpc_spec_v2::INVALID_CONTINUE, msg, None::<()>),
+			Error::InvalidDuplicateHashes =>
+				ErrorObject::owned(rpc_spec_v2::INVALID_DUPLICATE_HASHES, msg, None::<()>),
 			Error::InvalidParam(_) =>
 				ErrorObject::owned(json_rpc_spec::INVALID_PARAM_ERROR, msg, None::<()>),
 			Error::InternalError(_) =>
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs
index 38e8fd7384f..2c22e51ca4d 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/error.rs
@@ -38,6 +38,9 @@ pub enum SubscriptionManagementError {
 	/// The specified subscription ID is not present.
 	#[error("Subscription is absent")]
 	SubscriptionAbsent,
+	/// The unpin method was called with duplicate hashes.
+	#[error("Duplicate hashes")]
+	DuplicateHashes,
 	/// Custom error.
 	#[error("Subscription error {0}")]
 	Custom(String),
@@ -52,7 +55,8 @@ impl PartialEq for SubscriptionManagementError {
 			(Self::Blockchain(_), Self::Blockchain(_)) |
 			(Self::BlockHashAbsent, Self::BlockHashAbsent) |
 			(Self::BlockHeaderAbsent, Self::BlockHeaderAbsent) |
-			(Self::SubscriptionAbsent, Self::SubscriptionAbsent) => true,
+			(Self::SubscriptionAbsent, Self::SubscriptionAbsent) |
+			(Self::DuplicateHashes, Self::DuplicateHashes) => true,
 			(Self::Custom(lhs), Self::Custom(rhs)) => lhs == rhs,
 			_ => false,
 		}
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
index 2b250f3dc2c..d2879679501 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
@@ -22,7 +22,7 @@ use sc_client_api::Backend;
 use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
 use sp_runtime::traits::Block as BlockT;
 use std::{
-	collections::{hash_map::Entry, HashMap},
+	collections::{hash_map::Entry, HashMap, HashSet},
 	sync::{atomic::AtomicBool, Arc},
 	time::{Duration, Instant},
 };
@@ -750,11 +750,27 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
 		}
 	}
 
+	/// Ensure the provided hashes are unique.
+	fn ensure_hash_uniqueness(
+		hashes: impl IntoIterator<Item = Block::Hash> + Clone,
+	) -> Result<(), SubscriptionManagementError> {
+		let mut set = HashSet::new();
+		hashes.into_iter().try_for_each(|hash| {
+			if !set.insert(hash) {
+				Err(SubscriptionManagementError::DuplicateHashes)
+			} else {
+				Ok(())
+			}
+		})
+	}
+
 	pub fn unpin_blocks(
 		&mut self,
 		sub_id: &str,
 		hashes: impl IntoIterator<Item = Block::Hash> + Clone,
 	) -> Result<(), SubscriptionManagementError> {
+		Self::ensure_hash_uniqueness(hashes.clone())?;
+
 		let Some(sub) = self.subs.get_mut(sub_id) else {
 			return Err(SubscriptionManagementError::SubscriptionAbsent)
 		};
@@ -985,6 +1001,76 @@ mod tests {
 		assert!(block_state.is_none());
 	}
 
+	#[test]
+	fn unpin_duplicate_hashes() {
+		let (backend, mut client) = init_backend();
+		let block = BlockBuilderBuilder::new(&*client)
+			.on_parent_block(client.chain_info().genesis_hash)
+			.with_parent_block_number(0)
+			.build()
+			.unwrap()
+			.build()
+			.unwrap()
+			.block;
+		let hash_1 = block.header.hash();
+		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
+		let block = BlockBuilderBuilder::new(&*client)
+			.on_parent_block(hash_1)
+			.with_parent_block_number(1)
+			.build()
+			.unwrap()
+			.build()
+			.unwrap()
+			.block;
+		let hash_2 = block.header.hash();
+		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
+		let block = BlockBuilderBuilder::new(&*client)
+			.on_parent_block(hash_2)
+			.with_parent_block_number(2)
+			.build()
+			.unwrap()
+			.build()
+			.unwrap()
+			.block;
+		let hash_3 = block.header.hash();
+		futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
+
+		let mut subs =
+			SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
+		let id_1 = "abc".to_string();
+		let id_2 = "abcd".to_string();
+
+		// Pin all blocks for the first subscription.
+		let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
+		assert_eq!(subs.pin_block(&id_1, hash_1).unwrap(), true);
+		assert_eq!(subs.pin_block(&id_1, hash_2).unwrap(), true);
+		assert_eq!(subs.pin_block(&id_1, hash_3).unwrap(), true);
+
+		// Pin only block 2 for the second subscription.
+		let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
+		assert_eq!(subs.pin_block(&id_2, hash_2).unwrap(), true);
+
+		// Check reference count.
+		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
+		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
+		assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
+
+		// Unpin the same block twice.
+		let err = subs.unpin_blocks(&id_1, vec![hash_1, hash_1, hash_2, hash_2]).unwrap_err();
+		assert_eq!(err, SubscriptionManagementError::DuplicateHashes);
+
+		// Check reference count must be unaltered.
+		assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1);
+		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 2);
+		assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
+
+		// Unpin the blocks correctly.
+		subs.unpin_blocks(&id_1, vec![hash_1, hash_2]).unwrap();
+		assert_eq!(subs.global_blocks.get(&hash_1), None);
+		assert_eq!(*subs.global_blocks.get(&hash_2).unwrap(), 1);
+		assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
+	}
+
 	#[test]
 	fn subscription_lock_block() {
 		let builder = TestClientBuilder::new();
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
index 955a361e3ea..ccb928262e4 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
@@ -1617,6 +1617,108 @@ async fn follow_with_unpin() {
 	assert!(sub.next::<FollowEvent<String>>().await.is_none());
 }
 
+#[tokio::test]
+async fn unpin_duplicate_hashes() {
+	let builder = TestClientBuilder::new();
+	let backend = builder.backend();
+	let mut client = Arc::new(builder.build());
+
+	let api = ChainHead::new(
+		client.clone(),
+		backend,
+		Arc::new(TaskExecutor::default()),
+		ChainHeadConfig {
+			global_max_pinned_blocks: 3,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+		},
+	)
+	.into_rpc();
+
+	let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();
+	let sub_id = sub.subscription_id();
+	let sub_id = serde_json::to_string(&sub_id).unwrap();
+
+	let block = BlockBuilderBuilder::new(&*client)
+		.on_parent_block(client.chain_info().genesis_hash)
+		.with_parent_block_number(0)
+		.build()
+		.unwrap()
+		.build()
+		.unwrap()
+		.block;
+	let block_hash = format!("{:?}", block.header.hash());
+	client.import(BlockOrigin::Own, block.clone()).await.unwrap();
+
+	// Ensure the imported block is propagated and pinned for this subscription.
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub).await,
+		FollowEvent::Initialized(_)
+	);
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub).await,
+		FollowEvent::NewBlock(_)
+	);
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub).await,
+		FollowEvent::BestBlockChanged(_)
+	);
+
+	// Try to unpin duplicate hashes.
+	let err = api
+		.call::<_, serde_json::Value>(
+			"chainHead_unstable_unpin",
+			rpc_params![&sub_id, vec![&block_hash, &block_hash]],
+		)
+		.await
+		.unwrap_err();
+	assert_matches!(err,
+		Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_DUPLICATE_HASHES && err.message() == "Received duplicate hashes for the `chainHead_unpin` method"
+	);
+
+	// Block tree:
+	//   finalized_block -> block -> block2
+	let block2 = BlockBuilderBuilder::new(&*client)
+		.on_parent_block(block.hash())
+		.with_parent_block_number(1)
+		.build()
+		.unwrap()
+		.build()
+		.unwrap()
+		.block;
+	let block_hash_2 = format!("{:?}", block2.header.hash());
+	client.import(BlockOrigin::Own, block2.clone()).await.unwrap();
+
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub).await,
+		FollowEvent::NewBlock(_)
+	);
+
+	assert_matches!(
+		get_next_event::<FollowEvent<String>>(&mut sub).await,
+		FollowEvent::BestBlockChanged(_)
+	);
+
+	// Try to unpin duplicate hashes.
+	let err = api
+		.call::<_, serde_json::Value>(
+			"chainHead_unstable_unpin",
+			rpc_params![&sub_id, vec![&block_hash, &block_hash_2, &block_hash]],
+		)
+		.await
+		.unwrap_err();
+	assert_matches!(err,
+		Error::Call(err) if err.code() == super::error::rpc_spec_v2::INVALID_DUPLICATE_HASHES && err.message() == "Received duplicate hashes for the `chainHead_unpin` method"
+	);
+
+	// Can unpin blocks.
+	let _res: () = api
+		.call("chainHead_unstable_unpin", rpc_params![&sub_id, vec![&block_hash, &block_hash_2]])
+		.await
+		.unwrap();
+}
+
 #[tokio::test]
 async fn follow_with_multiple_unpin_hashes() {
 	let builder = TestClientBuilder::new();
-- 
GitLab