From bfbf7f5d6f5c491a820bb0a4fb9508ce52192a06 Mon Sep 17 00:00:00 2001
From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
Date: Wed, 17 Apr 2024 18:29:29 +0300
Subject: [PATCH] chainHead: Report unique hashes for pruned blocks (#3667)

This PR ensures that the reported pruned blocks are unique.

While at it, ensure that the best block event is properly generated when
the last best block is a fork that will be pruned in the future.

To achieve this, the chainHead keeps a LRU set of reported pruned blocks
to ensure the following are not reported twice:

```bash
	 finalized -> block 1 -> block 2 -> block 3

	                      -> block 2 -> block 4 -> block 5

	           -> block 1 -> block 2_f -> block 6 -> block 7 -> block 8
```

When block 7 is finalized the branch [block 2; block 3] is reported as
pruned.
When block 8 is finalized the branch [block 2; block 4; block 5] should
be reported as pruned, however block 2 was already reported as pruned at
the previous step.

This is a side-effect of the pruned blocks being reported at level N -
1. For example, if all pruned forks would be reported with the first
encounter (when block 6 is finalized we know that block 3 and block 5
are stale), we would not need the LRU cache.

cc @paritytech/subxt-team

Closes https://github.com/paritytech/polkadot-sdk/issues/3658

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
---
 Cargo.lock                                    |   1 +
 substrate/client/rpc-spec-v2/Cargo.toml       |   1 +
 .../rpc-spec-v2/src/chain_head/chain_head.rs  |   2 +-
 .../src/chain_head/chain_head_follow.rs       | 108 ++++-----
 .../rpc-spec-v2/src/chain_head/tests.rs       | 213 ++++++++++++++++++
 5 files changed, 273 insertions(+), 52 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index aeb8b2cdb19..7bf5215b6de 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -17465,6 +17465,7 @@ dependencies = [
  "sc-transaction-pool",
  "sc-transaction-pool-api",
  "sc-utils",
+ "schnellru",
  "serde",
  "serde_json",
  "sp-api",
diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml
index e2612d91454..f2fc7bee6e2 100644
--- a/substrate/client/rpc-spec-v2/Cargo.toml
+++ b/substrate/client/rpc-spec-v2/Cargo.toml
@@ -42,6 +42,7 @@ array-bytes = "6.1"
 log = { workspace = true, default-features = true }
 futures-util = { version = "0.3.30", default-features = false }
 rand = "0.8.5"
+schnellru = "0.2.1"
 
 [dev-dependencies]
 jsonrpsee = { version = "0.22", features = ["server", "ws-client"] }
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
index 86d9a726d7b..6779180a414 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
@@ -75,7 +75,7 @@ pub struct ChainHeadConfig {
 /// Maximum pinned blocks across all connections.
 /// This number is large enough to consider immediate blocks.
 /// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
-const MAX_PINNED_BLOCKS: usize = 512;
+pub(crate) const MAX_PINNED_BLOCKS: usize = 512;
 
 /// Any block of any subscription should not be pinned more than
 /// this constant. When a subscription contains a block older than this,
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
index 0d87a45c07e..a753896b24c 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
@@ -19,7 +19,7 @@
 //! Implementation of the `chainHead_follow` method.
 
 use crate::chain_head::{
-	chain_head::LOG_TARGET,
+	chain_head::{LOG_TARGET, MAX_PINNED_BLOCKS},
 	event::{
 		BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
 		RuntimeVersionEvent,
@@ -37,6 +37,7 @@ use sc_client_api::{
 	Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
 };
 use sc_rpc::utils::to_sub_message;
+use schnellru::{ByLength, LruMap};
 use sp_api::CallApiAt;
 use sp_blockchain::{
 	Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info,
@@ -68,7 +69,9 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
 	/// Subscription ID.
 	sub_id: String,
 	/// The best reported block by this subscription.
-	best_block_cache: Option<Block::Hash>,
+	current_best_block: Option<Block::Hash>,
+	/// LRU cache of pruned blocks.
+	pruned_blocks: LruMap<Block::Hash, ()>,
 	/// Stop all subscriptions if the distance between the leaves and the current finalized
 	/// block is larger than this value.
 	max_lagging_distance: usize,
@@ -90,7 +93,10 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
 			sub_handle,
 			with_runtime,
 			sub_id,
-			best_block_cache: None,
+			current_best_block: None,
+			pruned_blocks: LruMap::new(ByLength::new(
+				MAX_PINNED_BLOCKS.try_into().unwrap_or(u32::MAX),
+			)),
 			max_lagging_distance,
 		}
 	}
@@ -303,18 +309,20 @@ where
 
 	/// Generate the initial events reported by the RPC `follow` method.
 	///
-	/// Returns the initial events that should be reported directly, together with pruned
-	/// block hashes that should be ignored by the `Finalized` event.
+	/// Returns the initial events that should be reported directly.
 	fn generate_init_events(
 		&mut self,
 		startup_point: &StartupPoint<Block>,
-	) -> Result<(Vec<FollowEvent<Block::Hash>>, HashSet<Block::Hash>), SubscriptionManagementError>
-	{
+	) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
 		let init = self.get_init_blocks_with_forks(startup_point.finalized_hash)?;
 
 		// The initialized event is the first one sent.
 		let initial_blocks = init.finalized_block_descendants;
 		let finalized_block_hashes = init.finalized_block_hashes;
+		// These are the pruned blocks that we should not report again.
+		for pruned in init.pruned_forks {
+			self.pruned_blocks.insert(pruned, ());
+		}
 
 		let finalized_block_hash = startup_point.finalized_hash;
 		let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None);
@@ -345,11 +353,11 @@ where
 		let best_block_hash = startup_point.best_hash;
 		if best_block_hash != finalized_block_hash {
 			let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
-			self.best_block_cache = Some(best_block_hash);
+			self.current_best_block = Some(best_block_hash);
 			finalized_block_descendants.push(best_block);
 		};
 
-		Ok((finalized_block_descendants, init.pruned_forks))
+		Ok(finalized_block_descendants)
 	}
 
 	/// Generate the "NewBlock" event and potentially the "BestBlockChanged" event for the
@@ -377,19 +385,19 @@ where
 		let best_block_event =
 			FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_hash });
 
-		match self.best_block_cache {
+		match self.current_best_block {
 			Some(block_cache) => {
 				// The RPC layer has not reported this block as best before.
 				// Note: This handles the race with the finalized branch.
 				if block_cache != block_hash {
-					self.best_block_cache = Some(block_hash);
+					self.current_best_block = Some(block_hash);
 					vec![new_block, best_block_event]
 				} else {
 					vec![new_block]
 				}
 			},
 			None => {
-				self.best_block_cache = Some(block_hash);
+				self.current_best_block = Some(block_hash);
 				vec![new_block, best_block_event]
 			},
 		}
@@ -458,7 +466,7 @@ where
 				// When the node falls out of sync and then syncs up to the tip of the chain, it can
 				// happen that we skip notifications. Then it is better to terminate the connection
 				// instead of trying to send notifications for all missed blocks.
-				if let Some(best_block_hash) = self.best_block_cache {
+				if let Some(best_block_hash) = self.current_best_block {
 					let ancestor = sp_blockchain::lowest_common_ancestor(
 						&*self.client,
 						*hash,
@@ -481,13 +489,10 @@ where
 	}
 
 	/// Get all pruned block hashes from the provided stale heads.
-	///
-	/// The result does not include hashes from `to_ignore`.
 	fn get_pruned_hashes(
-		&self,
+		&mut self,
 		stale_heads: &[Block::Hash],
 		last_finalized: Block::Hash,
-		to_ignore: &mut HashSet<Block::Hash>,
 	) -> Result<Vec<Block::Hash>, SubscriptionManagementError> {
 		let blockchain = self.backend.blockchain();
 		let mut pruned = Vec::new();
@@ -497,11 +502,13 @@ where
 
 			// Collect only blocks that are not part of the canonical chain.
 			pruned.extend(tree_route.enacted().iter().filter_map(|block| {
-				if !to_ignore.remove(&block.hash) {
-					Some(block.hash)
-				} else {
-					None
+				if self.pruned_blocks.get(&block.hash).is_some() {
+					// The block was already reported as pruned.
+					return None
 				}
+
+				self.pruned_blocks.insert(block.hash, ());
+				Some(block.hash)
 			}))
 		}
 
@@ -515,7 +522,6 @@ where
 	fn handle_finalized_blocks(
 		&mut self,
 		notification: FinalityNotification<Block>,
-		to_ignore: &mut HashSet<Block::Hash>,
 		startup_point: &StartupPoint<Block>,
 	) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError> {
 		let last_finalized = notification.hash;
@@ -536,25 +542,32 @@ where
 		// Report all pruned blocks from the notification that are not
 		// part of the fork we need to ignore.
 		let pruned_block_hashes =
-			self.get_pruned_hashes(&notification.stale_heads, last_finalized, to_ignore)?;
+			self.get_pruned_hashes(&notification.stale_heads, last_finalized)?;
 
 		let finalized_event = FollowEvent::Finalized(Finalized {
 			finalized_block_hashes,
 			pruned_block_hashes: pruned_block_hashes.clone(),
 		});
 
-		match self.best_block_cache {
-			Some(block_cache) => {
-				// If the best block wasn't pruned, we are done here.
-				if !pruned_block_hashes.iter().any(|hash| *hash == block_cache) {
-					events.push(finalized_event);
-					return Ok(events)
-				}
-
-				// The best block is reported as pruned. Therefore, we need to signal a new
-				// best block event before submitting the finalized event.
+		if let Some(current_best_block) = self.current_best_block {
+			// The best reported block is in the pruned list. Report a new best block.
+			let is_in_pruned_list =
+				pruned_block_hashes.iter().any(|hash| *hash == current_best_block);
+			// The block is not the last finalized block.
+			//
+			// It can be either:
+			//  - a descendant of the last finalized block
+			//  - a block on a fork that will be pruned in the future.
+			//
+			// In those cases, we emit a new best block.
+			let is_not_last_finalized = current_best_block != last_finalized;
+
+			if is_in_pruned_list || is_not_last_finalized {
+				// We need to generate a best block event.
 				let best_block_hash = self.client.info().best_hash;
-				if best_block_hash == block_cache {
+
+				// Defensive check against state missmatch.
+				if best_block_hash == current_best_block {
 					// The client doest not have any new information about the best block.
 					// The information from `.info()` is updated from the DB as the last
 					// step of the finalization and it should be up to date.
@@ -564,23 +577,18 @@ where
 						"[follow][id={:?}] Client does not contain different best block",
 						self.sub_id,
 					);
-					events.push(finalized_event);
-					Ok(events)
 				} else {
 					// The RPC needs to also submit a new best block changed before the
 					// finalized event.
-					self.best_block_cache = Some(best_block_hash);
-					let best_block_event =
-						FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash });
-					events.extend([best_block_event, finalized_event]);
-					Ok(events)
+					self.current_best_block = Some(best_block_hash);
+					events
+						.push(FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash }));
 				}
-			},
-			None => {
-				events.push(finalized_event);
-				Ok(events)
-			},
+			}
 		}
+
+		events.push(finalized_event);
+		Ok(events)
 	}
 
 	/// Submit the events from the provided stream to the RPC client
@@ -589,7 +597,6 @@ where
 		&mut self,
 		startup_point: &StartupPoint<Block>,
 		mut stream: EventStream,
-		mut to_ignore: HashSet<Block::Hash>,
 		sink: SubscriptionSink,
 		rx_stop: oneshot::Receiver<()>,
 	) -> Result<(), SubscriptionManagementError>
@@ -612,7 +619,7 @@ where
 				NotificationType::NewBlock(notification) =>
 					self.handle_import_blocks(notification, &startup_point),
 				NotificationType::Finalized(notification) =>
-					self.handle_finalized_blocks(notification, &mut to_ignore, &startup_point),
+					self.handle_finalized_blocks(notification, &startup_point),
 				NotificationType::MethodResponse(notification) => Ok(vec![notification]),
 			};
 
@@ -682,7 +689,7 @@ where
 			.map(|response| NotificationType::MethodResponse(response));
 
 		let startup_point = StartupPoint::from(self.client.info());
-		let (initial_events, pruned_forks) = match self.generate_init_events(&startup_point) {
+		let initial_events = match self.generate_init_events(&startup_point) {
 			Ok(blocks) => blocks,
 			Err(err) => {
 				debug!(
@@ -702,7 +709,6 @@ where
 		let merged = tokio_stream::StreamExt::merge(merged, stream_responses);
 		let stream = stream::once(futures::future::ready(initial)).chain(merged);
 
-		self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, sub_data.rx_stop)
-			.await
+		self.submit_events(&startup_point, stream.boxed(), sink, sub_data.rx_stop).await
 	}
 }
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
index c2bff7c50d5..14f664858a0 100644
--- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
+++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs
@@ -187,6 +187,62 @@ async fn setup_api() -> (
 	(client, api, sub, sub_id, block)
 }
 
+async fn import_block(
+	mut client: Arc<Client<Backend>>,
+	parent_hash: <Block as BlockT>::Hash,
+	parent_number: u64,
+) -> Block {
+	let block = BlockBuilderBuilder::new(&*client)
+		.on_parent_block(parent_hash)
+		.with_parent_block_number(parent_number)
+		.build()
+		.unwrap()
+		.build()
+		.unwrap()
+		.block;
+	client.import(BlockOrigin::Own, block.clone()).await.unwrap();
+	block
+}
+
+async fn import_best_block_with_tx(
+	mut client: Arc<Client<Backend>>,
+	parent_hash: <Block as BlockT>::Hash,
+	parent_number: u64,
+	tx: Transfer,
+) -> Block {
+	let mut block_builder = BlockBuilderBuilder::new(&*client)
+		.on_parent_block(parent_hash)
+		.with_parent_block_number(parent_number)
+		.build()
+		.unwrap();
+	block_builder.push_transfer(tx).unwrap();
+	let block = block_builder.build().unwrap().block;
+	client.import_as_best(BlockOrigin::Own, block.clone()).await.unwrap();
+	block
+}
+
+/// Check the subscription produces a new block and a best block event.
+///
+/// The macro is used instead of a fn to preserve the lines of code in case of panics.
+macro_rules! check_new_and_best_block_events {
+	($sub:expr, $block_hash:expr, $parent_hash:expr) => {
+		let event: FollowEvent<String> = get_next_event($sub).await;
+		let expected = FollowEvent::NewBlock(NewBlock {
+			block_hash: format!("{:?}", $block_hash),
+			parent_block_hash: format!("{:?}", $parent_hash),
+			new_runtime: None,
+			with_runtime: false,
+		});
+		assert_eq!(event, expected);
+
+		let event: FollowEvent<String> = get_next_event($sub).await;
+		let expected = FollowEvent::BestBlockChanged(BestBlockChanged {
+			best_block_hash: format!("{:?}", $block_hash),
+		});
+		assert_eq!(event, expected);
+	};
+}
+
 #[tokio::test]
 async fn follow_subscription_produces_blocks() {
 	let builder = TestClientBuilder::new();
@@ -3644,3 +3700,160 @@ async fn chain_head_limit_reached() {
 	// Initialized must always be reported first.
 	let _event: FollowEvent<String> = get_next_event(&mut sub).await;
 }
+
+#[tokio::test]
+async fn follow_unique_pruned_blocks() {
+	let builder = TestClientBuilder::new();
+	let backend = builder.backend();
+	let client = Arc::new(builder.build());
+
+	let api = ChainHead::new(
+		client.clone(),
+		backend,
+		Arc::new(TaskExecutor::default()),
+		ChainHeadConfig {
+			global_max_pinned_blocks: MAX_PINNED_BLOCKS,
+			subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
+			subscription_max_ongoing_operations: MAX_OPERATIONS,
+			operation_max_storage_items: MAX_PAGINATION_LIMIT,
+			max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
+			max_lagging_distance: MAX_LAGGING_DISTANCE,
+		},
+	)
+	.into_rpc();
+
+	let finalized_hash = client.info().finalized_hash;
+	let mut sub = api.subscribe_unbounded("chainHead_unstable_follow", [false]).await.unwrap();
+
+	// Initialized must always be reported first.
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::Initialized(Initialized {
+		finalized_block_hashes: vec![format!("{:?}", finalized_hash)],
+		finalized_block_runtime: None,
+		with_runtime: false,
+	});
+	assert_eq!(event, expected);
+
+	// Block tree:
+	//
+	// finalized -> block 1 -> block 2 -> block 3
+	//
+	//                      -> block 2 -> block 4 -> block 5
+	//
+	//           -> block 1 -> block 2_f -> block 6
+	//                                    ^^^ finalized
+	//                                                 -> block 7
+	//                                                    ^^^ finalized
+	//                                                            -> block 8
+	//                                                               ^^^ finalized
+	// The chainHead will see block 5 as the best block. However, the
+	// client will finalize the block 6, which is on another fork.
+	//
+	// When the block 6 is finalized, block 2 block 3 block 4 and block 5 are placed on an invalid
+	// fork. However, pruning of blocks happens on level N - 1.
+	// Therefore, no pruned blocks are reported yet.
+	//
+	// When the block 7 is finalized, block 3 is detected as stale. At this step, block 2 and 3
+	// are reported as pruned.
+	//
+	// When the block 8 is finalized, block 5 block 4 and block 2 are detected as stale. However,
+	// only blocks 5 and 4 are reported as pruned. This is because the block 2 was previously
+	// reported.
+
+	// Initial setup steps:
+	let block_1_hash =
+		import_block(client.clone(), client.chain_info().genesis_hash, 0).await.hash();
+	let block_2_f_hash = import_block(client.clone(), block_1_hash, 1).await.hash();
+	let block_6_hash = import_block(client.clone(), block_2_f_hash, 2).await.hash();
+	// Import block 2 as best on the fork.
+	let mut tx_alice_ferdie = Transfer {
+		from: AccountKeyring::Alice.into(),
+		to: AccountKeyring::Ferdie.into(),
+		amount: 41,
+		nonce: 0,
+	};
+	let block_2_hash =
+		import_best_block_with_tx(client.clone(), block_1_hash, 1, tx_alice_ferdie.clone())
+			.await
+			.hash();
+
+	let block_3_hash = import_block(client.clone(), block_2_hash, 2).await.hash();
+	// Fork block 4.
+	tx_alice_ferdie.nonce = 1;
+	let block_4_hash = import_best_block_with_tx(client.clone(), block_2_hash, 2, tx_alice_ferdie)
+		.await
+		.hash();
+	let block_5_hash = import_block(client.clone(), block_4_hash, 3).await.hash();
+
+	// Check expected events generated by the setup.
+	{
+		// Check block 1 -> block 2f -> block 6.
+		check_new_and_best_block_events!(&mut sub, block_1_hash, finalized_hash);
+		check_new_and_best_block_events!(&mut sub, block_2_f_hash, block_1_hash);
+		check_new_and_best_block_events!(&mut sub, block_6_hash, block_2_f_hash);
+
+		// Check (block 1 ->) block 2 -> block 3.
+		check_new_and_best_block_events!(&mut sub, block_2_hash, block_1_hash);
+		check_new_and_best_block_events!(&mut sub, block_3_hash, block_2_hash);
+
+		// Check (block 1 -> block 2 ->) block 4 -> block 5.
+		check_new_and_best_block_events!(&mut sub, block_4_hash, block_2_hash);
+		check_new_and_best_block_events!(&mut sub, block_5_hash, block_4_hash);
+	}
+
+	// Finalize the block 6 from the fork.
+	client.finalize_block(block_6_hash, None).unwrap();
+
+	// Expect to report the best block changed before the finalized event.
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::BestBlockChanged(BestBlockChanged {
+		best_block_hash: format!("{:?}", block_6_hash),
+	});
+	assert_eq!(event, expected);
+
+	// Block 2 must be reported as pruned, even if it was the previous best.
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::Finalized(Finalized {
+		finalized_block_hashes: vec![
+			format!("{:?}", block_1_hash),
+			format!("{:?}", block_2_f_hash),
+			format!("{:?}", block_6_hash),
+		],
+		pruned_block_hashes: vec![],
+	});
+	assert_eq!(event, expected);
+
+	// Pruned hash can be unpinned.
+	let sub_id = sub.subscription_id();
+	let sub_id = serde_json::to_string(&sub_id).unwrap();
+	let hash = format!("{:?}", block_2_hash);
+	let _res: () = api.call("chainHead_unstable_unpin", rpc_params![&sub_id, &hash]).await.unwrap();
+
+	// Import block 7 and check it.
+	let block_7_hash = import_block(client.clone(), block_6_hash, 3).await.hash();
+	check_new_and_best_block_events!(&mut sub, block_7_hash, block_6_hash);
+
+	// Finalize the block 7.
+	client.finalize_block(block_7_hash, None).unwrap();
+
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::Finalized(Finalized {
+		finalized_block_hashes: vec![format!("{:?}", block_7_hash)],
+		pruned_block_hashes: vec![format!("{:?}", block_2_hash), format!("{:?}", block_3_hash)],
+	});
+	assert_eq!(event, expected);
+
+	// Check block 8.
+	let block_8_hash = import_block(client.clone(), block_7_hash, 4).await.hash();
+	check_new_and_best_block_events!(&mut sub, block_8_hash, block_7_hash);
+
+	// Finalize the block 8.
+	client.finalize_block(block_8_hash, None).unwrap();
+
+	let event: FollowEvent<String> = get_next_event(&mut sub).await;
+	let expected = FollowEvent::Finalized(Finalized {
+		finalized_block_hashes: vec![format!("{:?}", block_8_hash)],
+		pruned_block_hashes: vec![format!("{:?}", block_4_hash), format!("{:?}", block_5_hash)],
+	});
+	assert_eq!(event, expected);
+}
-- 
GitLab