diff --git a/prdoc/pr_5527.prdoc b/prdoc/pr_5527.prdoc
new file mode 100644
index 0000000000000000000000000000000000000000..38eb75affe44730319ed2d5b4f68292f59790639
--- /dev/null
+++ b/prdoc/pr_5527.prdoc
@@ -0,0 +1,17 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Report BestBlock events only for newBlock reports
+
+doc:
+  - audience: Node Dev
+    description: |
+      This PR ensures that the chainHead_v1_follow method of the RPC-v2 API is always
+      reporting a `BestBlock` event after a `NewBlock`.
+      There was a race condition in the chainHead follow logic which led to the `BestBlock`
+      event to be emitted without an associated `NewBlock` event.
+
+crates:
+  - name: sc-rpc-spec-v2
+    bump: minor
+
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
index 1d28d207124872e0a5c0ff9801775f5c1a5e3181..ebb72ed3d156b1ee39ab818865f5d9f4e34f8234 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
@@ -428,9 +428,12 @@ where
 	/// Generates new block events from the given finalized hashes.
 	///
 	/// It may be possible that the `Finalized` event fired before the `NewBlock`
-	/// event. In that case, for each finalized hash that was not reported yet
-	/// generate the `NewBlock` event. For the final finalized hash we must also
-	/// generate one `BestBlock` event.
+	/// event. Only in that case we generate:
+	/// - `NewBlock` event for all finalized hashes.
+	/// - `BestBlock` event for the last finalized hash.
+	///
+	/// This function returns an empty list if all finalized hashes were already reported
+	/// and are pinned.
 	fn generate_finalized_events(
 		&mut self,
 		finalized_block_hashes: &[Block::Hash],
@@ -454,34 +457,33 @@ where
 			}
 
 			// Generate `NewBlock` events for all blocks beside the last block in the list
-			if i + 1 != finalized_block_hashes.len() {
+			let is_last = i + 1 == finalized_block_hashes.len();
+			if !is_last {
 				// Generate only the `NewBlock` event for this block.
 				events.extend(self.generate_import_events(*hash, *parent, false));
-			} else {
+				continue;
+			}
+
+			if let Some(best_block_hash) = self.current_best_block {
+				let ancestor =
+					sp_blockchain::lowest_common_ancestor(&*self.client, *hash, best_block_hash)?;
+
 				// If we end up here and the `best_block` is a descendent of the finalized block
 				// (last block in the list), it means that there were skipped notifications.
-				// Otherwise `pin_block` would had returned `true`.
+				// Otherwise `pin_block` would had returned `false`.
 				//
 				// When the node falls out of sync and then syncs up to the tip of the chain, it can
 				// happen that we skip notifications. Then it is better to terminate the connection
 				// instead of trying to send notifications for all missed blocks.
-				if let Some(best_block_hash) = self.current_best_block {
-					let ancestor = sp_blockchain::lowest_common_ancestor(
-						&*self.client,
-						*hash,
-						best_block_hash,
-					)?;
-
-					if ancestor.hash == *hash {
-						return Err(SubscriptionManagementError::Custom(
-							"A descendent of the finalized block was already reported".into(),
-						))
-					}
+				if ancestor.hash == *hash {
+					return Err(SubscriptionManagementError::Custom(
+						"A descendent of the finalized block was already reported".into(),
+					))
 				}
-
-				// Let's generate the `NewBlock` and `NewBestBlock` events for the block.
-				events.extend(self.generate_import_events(*hash, *parent, true))
 			}
+
+			// Let's generate the `NewBlock` and `NewBestBlock` events for the block.
+			events.extend(self.generate_import_events(*hash, *parent, true))
 		}
 
 		Ok(events)
@@ -549,39 +551,32 @@ where
 		});
 
 		if let Some(current_best_block) = self.current_best_block {
-			// The best reported block is in the pruned list. Report a new best block.
+			// We need to generate a new best block if the best block is in the pruned list.
 			let is_in_pruned_list =
 				pruned_block_hashes.iter().any(|hash| *hash == current_best_block);
-			// The block is not the last finalized block.
-			//
-			// It can be either:
-			//  - a descendant of the last finalized block
-			//  - a block on a fork that will be pruned in the future.
-			//
-			// In those cases, we emit a new best block.
-			let is_not_last_finalized = current_best_block != last_finalized;
-
-			if is_in_pruned_list || is_not_last_finalized {
-				// We need to generate a best block event.
-				let best_block_hash = self.client.info().best_hash;
-
-				// Defensive check against state missmatch.
-				if best_block_hash == current_best_block {
-					// The client doest not have any new information about the best block.
-					// The information from `.info()` is updated from the DB as the last
-					// step of the finalization and it should be up to date.
-					// If the info is outdated, there is nothing the RPC can do for now.
-					debug!(
-						target: LOG_TARGET,
-						"[follow][id={:?}] Client does not contain different best block",
-						self.sub_id,
-					);
-				} else {
-					// The RPC needs to also submit a new best block changed before the
-					// finalized event.
-					self.current_best_block = Some(best_block_hash);
-					events
-						.push(FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash }));
+			if is_in_pruned_list {
+				self.current_best_block = Some(last_finalized);
+				events.push(FollowEvent::BestBlockChanged(BestBlockChanged {
+					best_block_hash: last_finalized,
+				}));
+			} else {
+				// The pruning logic ensures that when the finalized block is announced,
+				// all blocks on forks that have the common ancestor lower or equal
+				// to the finalized block are reported.
+				//
+				// However, we double check if the best block is a descendant of the last finalized
+				// block to ensure we don't miss any events.
+				let ancestor = sp_blockchain::lowest_common_ancestor(
+					&*self.client,
+					last_finalized,
+					current_best_block,
+				)?;
+				let is_descendant = ancestor.hash == last_finalized;
+				if !is_descendant {
+					self.current_best_block = Some(last_finalized);
+					events.push(FollowEvent::BestBlockChanged(BestBlockChanged {
+						best_block_hash: last_finalized,
+					}));
 				}
 			}
 		}
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/test_utils.rs b/substrate/client/rpc-spec-v2/src/chain_head/test_utils.rs
index ab5be1f24e5d85b1be51a0ad1b3d42c3dbcbf730..073ee34a79f3f9ef260c49f95d77ad1b2cd953e5 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/test_utils.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/test_utils.rs
@@ -69,7 +69,7 @@ impl<Client> ChainHeadMockClient<Client> {
 		}
 	}
 
-	pub async fn trigger_finality_stream(&self, header: Header) {
+	pub async fn trigger_finality_stream(&self, header: Header, stale_heads: Vec<Hash>) {
 		// Ensure the client called the `finality_notification_stream`.
 		while self.finality_sinks.lock().is_empty() {
 			tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
@@ -77,11 +77,8 @@ impl<Client> ChainHeadMockClient<Client> {
 
 		// Build the notification.
 		let (sink, _stream) = tracing_unbounded("test_sink", 100_000);
-		let summary = FinalizeSummary {
-			header: header.clone(),
-			finalized: vec![header.hash()],
-			stale_heads: vec![],
-		};
+		let summary =
+			FinalizeSummary { header: header.clone(), finalized: vec![header.hash()], stale_heads };
 		let notification = FinalityNotification::from_summary(summary, sink);
 
 		for sink in self.finality_sinks.lock().iter_mut() {
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
index 38f091471f8779f7dbf7deb2838dd84406ae79c3..a638a9c7ec54fda2bf95ac799499be77173bacde 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
@@ -2743,7 +2743,7 @@ async fn follow_finalized_before_new_block() {
 	// expect for the `chainHead` to generate `NewBlock`, `BestBlock` and `Finalized` events.
 
 	// Trigger the Finalized notification before the NewBlock one.
-	run_with_timeout(client_mock.trigger_finality_stream(block_1.header.clone())).await;
+	run_with_timeout(client_mock.trigger_finality_stream(block_1.header.clone(), vec![])).await;
 
 	// Initialized must always be reported first.
 	let finalized_hash = client.info().finalized_hash;
@@ -3833,3 +3833,222 @@ async fn follow_unique_pruned_blocks() {
 	});
 	assert_eq!(event, expected);
 }
+
+#[tokio::test]
+async fn follow_report_best_block_of_a_known_block() {
+	let builder = TestClientBuilder::new();
+	let backend = builder.backend();
+	let client = Arc::new(builder.build());
+
+	let client_mock = Arc::new(ChainHeadMockClient::new(client.clone()));
+
+	let api = ChainHead::new(
+		client_mock.clone(),
+		backend,
+		Arc::new(TaskExecutor::default()),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
+		},
+	)
+	.into_rpc();
+
+	let finalized_hash = client.info().finalized_hash;
+	let mut sub = api.subscribe_unbounded("chainHead_v1_follow", [false]).await.unwrap();
+	// Initialized must always be reported first.
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::Initialized(Initialized {
+		finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
+		finalized_block_runtime: None,
+		with_runtime: false,
+	});
+	assert_eq!(event, expected);
+
+	// Block tree:
+	//
+	// finalized -> block 1 -> block 2
+	//                         ^^^ best block reported
+	//
+	//           -> block 1 -> block 2_f -> block 3 (best)
+	//                          ^^^ finalized
+
+	let block_1 = BlockBuilderBuilder::new(&*client)
+		.on_parent_block(client.chain_info().genesis_hash)
+		.with_parent_block_number(0)
+		.build()
+		.unwrap()
+		.build()
+		.unwrap()
+		.block;
+	let block_1_hash = block_1.hash();
+	client.import(BlockOrigin::Own, block_1.clone()).await.unwrap();
+	let block_2_f = BlockBuilderBuilder::new(&*client)
+		.on_parent_block(block_1_hash)
+		.with_parent_block_number(1)
+		.build()
+		.unwrap()
+		.build()
+		.unwrap()
+		.block;
+	let block_2_f_hash = block_2_f.hash();
+	client.import(BlockOrigin::Own, block_2_f.clone()).await.unwrap();
+
+	// Import block 2 as best on the fork.
+	let mut block_builder = BlockBuilderBuilder::new(&*client)
+		.on_parent_block(block_1_hash)
+		.with_parent_block_number(1)
+		.build()
+		.unwrap();
+	// This push is required as otherwise block 3 has the same hash as block 2 and won't get
+	// imported
+	block_builder
+		.push_transfer(Transfer {
+			from: AccountKeyring::Alice.into(),
+			to: AccountKeyring::Ferdie.into(),
+			amount: 41,
+			nonce: 0,
+		})
+		.unwrap();
+	let block_2 = block_builder.build().unwrap().block;
+	let block_2_hash = block_2.header.hash();
+	client.import_as_best(BlockOrigin::Own, block_2.clone()).await.unwrap();
+
+	run_with_timeout(client_mock.trigger_import_stream(block_1.header.clone())).await;
+	run_with_timeout(client_mock.trigger_import_stream(block_2_f.header.clone())).await;
+	run_with_timeout(client_mock.trigger_import_stream(block_2.header.clone())).await;
+
+	// Check block 1.
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::NewBlock(NewBlock {
+		block_hash: format!("{:?}", block_1_hash),
+		parent_block_hash: format!("{:?}", finalized_hash),
+		new_runtime: None,
+		with_runtime: false,
+	});
+	assert_eq!(event, expected);
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::BestBlockChanged(BestBlockChanged {
+		best_block_hash: format!("{:?}", block_1_hash),
+	});
+	assert_eq!(event, expected);
+
+	// Check block 2.
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::NewBlock(NewBlock {
+		block_hash: format!("{:?}", block_2_f_hash),
+		parent_block_hash: format!("{:?}", block_1_hash),
+		new_runtime: None,
+		with_runtime: false,
+	});
+	assert_eq!(event, expected);
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::BestBlockChanged(BestBlockChanged {
+		best_block_hash: format!("{:?}", block_2_f_hash),
+	});
+	assert_eq!(event, expected);
+
+	// Check block 2, that we imported as custom best.
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::NewBlock(NewBlock {
+		block_hash: format!("{:?}", block_2_hash),
+		parent_block_hash: format!("{:?}", block_1_hash),
+		new_runtime: None,
+		with_runtime: false,
+	});
+	assert_eq!(event, expected);
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::BestBlockChanged(BestBlockChanged {
+		best_block_hash: format!("{:?}", block_2_hash),
+	});
+	assert_eq!(event, expected);
+
+	// Craft block 3 and import it later to simulate a race condition.
+	let block_3 = BlockBuilderBuilder::new(&*client)
+		.on_parent_block(block_2_f_hash)
+		.with_parent_block_number(2)
+		.build()
+		.unwrap()
+		.build()
+		.unwrap()
+		.block;
+	let block_3_hash = block_3.hash();
+
+	// Set best block info to block 3, that is not announced yet.
+	//
+	// This simulates the following edge-case:
+	// - The client imports a new block as best block.
+	// - The finality stream is triggered before the block is announced.
+	//
+	// This generated in the past a `BestBlock` event for the block that was not announced
+	// by `NewBlock` events.
+	//
+	// This happened because the chainHead was using the `client.info()` without verifying
+	// if the block was announced or not. This was fixed by using the latest finalized
+	// block instead as fallback. For more info see: https://github.com/paritytech/polkadot-sdk/issues/5512.
+	client_mock.set_best_block(block_3_hash, 3);
+
+	// Finalize the block 2 from the fork.
+	client.finalize_block(block_2_f_hash, None).unwrap();
+	run_with_timeout(
+		client_mock.trigger_finality_stream(block_2_f.header.clone(), vec![block_2_hash]),
+	)
+	.await;
+
+	// Block 2f is now the best block, not the block 3 that is not announced yet.
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::BestBlockChanged(BestBlockChanged {
+		best_block_hash: format!("{:?}", block_2_f_hash),
+	});
+	assert_eq!(event, expected);
+	// Block 2 must be reported as pruned, even if it was the previous best.
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::Finalized(Finalized {
+		finalized_block_hashes: vec![
+			// Note: the client mock is only reporting one block at a time.
+			// format!("{:?}", block_1_hash),
+			format!("{:?}", block_2_f_hash),
+		],
+		pruned_block_hashes: vec![format!("{:?}", block_2_hash)],
+	});
+	assert_eq!(event, expected);
+
+	// Block 3 is now imported as best.
+	client.import_as_best(BlockOrigin::Own, block_3.clone()).await.unwrap();
+	run_with_timeout(client_mock.trigger_import_stream(block_3.header.clone())).await;
+
+	// Check block 3.
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::NewBlock(NewBlock {
+		block_hash: format!("{:?}", block_3_hash),
+		parent_block_hash: format!("{:?}", block_2_f_hash),
+		new_runtime: None,
+		with_runtime: false,
+	});
+	assert_eq!(event, expected);
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::BestBlockChanged(BestBlockChanged {
+		best_block_hash: format!("{:?}", block_3_hash),
+	});
+	assert_eq!(event, expected);
+
+	// Pruned hash can be unpinned.
+	let sub_id = sub.subscription_id();
+	let sub_id = serde_json::to_string(&sub_id).unwrap();
+	let hash = format!("{:?}", block_2_hash);
+	let _res: () = api.call("chainHead_v1_unpin", rpc_params![&sub_id, &hash]).await.unwrap();
+
+	// Finalize the block 3.
+	client.finalize_block(block_3_hash, None).unwrap();
+	run_with_timeout(client_mock.trigger_finality_stream(block_3.header.clone(), vec![])).await;
+
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::Finalized(Finalized {
+		finalized_block_hashes: vec![format!("{:?}", block_3_hash)],
+		pruned_block_hashes: vec![],
+	});
+	assert_eq!(event, expected);
+}