From 00787a10e9d5d6718ad050df7ab01015f6e829ab Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Tue, 25 Jul 2023 18:43:04 +0300 Subject: [PATCH] chainHead_storage: Iterate over keys (#14628) * chainHead: Iterate over key,values and key,hashes Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * chainHead/tests: Multi query with iteration over keys Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * chainHead/events: Fix typo in StorageQuery Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * chainHead: Take 10 from key iterator Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: parity-processbot <> --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 6 +- .../src/chain_head/chain_head_storage.rs | 147 +++++++++++++----- .../rpc-spec-v2/src/chain_head/event.rs | 12 +- .../rpc-spec-v2/src/chain_head/tests.rs | 119 ++++++++++++-- 4 files changed, 217 insertions(+), 67 deletions(-) diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index bb6a6bcbdfe..7f34bde6886 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -300,9 +300,7 @@ where let items = items .into_iter() .map(|query| { - if query.queue_type != StorageQueryType::Value && - query.queue_type != StorageQueryType::Hash - { + if query.query_type == StorageQueryType::ClosestDescendantMerkleValue { // Note: remove this once all types are implemented. let _ = sink.reject(ChainHeadRpcError::InvalidParam( "Storage query type not supported".into(), @@ -312,7 +310,7 @@ where Ok(StorageQuery { key: StorageKey(parse_hex_param(&mut sink, query.key)?), - queue_type: query.queue_type, + query_type: query.query_type, }) }) .collect::<Result<Vec<_>, _>>()?; diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs index 9b5bf2a1180..310e4890187 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs @@ -33,6 +33,18 @@ use super::{ hex_string, ErrorEvent, }; +/// The maximum number of items the `chainHead_storage` can return +/// before paginations is required. +const MAX_ITER_ITEMS: usize = 10; + +/// The query type of an interation. +enum IterQueryType { + /// Iterating over (key, value) pairs. + Value, + /// Iterating over (key, hash) pairs. + Hash, +} + /// Generates the events of the `chainHead_storage` method. pub struct ChainHeadStorage<Client, Block, BE> { /// Substrate client. @@ -58,7 +70,10 @@ fn is_key_queryable(key: &[u8]) -> bool { } /// The result of making a query call. -type QueryResult = Result<StorageResult<String>, ChainHeadStorageEvent<String>>; +type QueryResult = Result<Option<StorageResult<String>>, ChainHeadStorageEvent<String>>; + +/// The result of iterating over keys. +type QueryIterResult = Result<Vec<StorageResult<String>>, ChainHeadStorageEvent<String>>; impl<Client, Block, BE> ChainHeadStorage<Client, Block, BE> where @@ -72,7 +87,7 @@ where hash: Block::Hash, key: &StorageKey, child_key: Option<&ChildInfo>, - ) -> Option<QueryResult> { + ) -> QueryResult { let result = if let Some(child_key) = child_key { self.client.child_storage(hash, child_key, key) } else { @@ -81,17 +96,15 @@ where result .map(|opt| { - opt.map(|storage_data| { - QueryResult::Ok(StorageResult::<String> { - key: hex_string(&key.0), - result: StorageResultType::Value(hex_string(&storage_data.0)), - }) - }) + QueryResult::Ok(opt.map(|storage_data| StorageResult::<String> { + key: hex_string(&key.0), + result: StorageResultType::Value(hex_string(&storage_data.0)), + })) }) .unwrap_or_else(|err| { - Some(QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent { + QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent { error: err.to_string(), - }))) + })) }) } @@ -101,7 +114,7 @@ where hash: Block::Hash, key: &StorageKey, child_key: Option<&ChildInfo>, - ) -> Option<QueryResult> { + ) -> QueryResult { let result = if let Some(child_key) = child_key { self.client.child_storage_hash(hash, child_key, key) } else { @@ -110,36 +123,49 @@ where result .map(|opt| { - opt.map(|storage_data| { - QueryResult::Ok(StorageResult::<String> { - key: hex_string(&key.0), - result: StorageResultType::Hash(hex_string(&storage_data.as_ref())), - }) - }) + QueryResult::Ok(opt.map(|storage_data| StorageResult::<String> { + key: hex_string(&key.0), + result: StorageResultType::Hash(hex_string(&storage_data.as_ref())), + })) }) .unwrap_or_else(|err| { - Some(QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent { + QueryResult::Err(ChainHeadStorageEvent::<String>::Error(ErrorEvent { error: err.to_string(), - }))) + })) }) } - /// Make the storage query. - fn query_storage( + /// Handle iterating over (key, value) or (key, hash) pairs. + fn query_storage_iter( &self, hash: Block::Hash, - query: &StorageQuery<StorageKey>, + key: &StorageKey, child_key: Option<&ChildInfo>, - ) -> Option<QueryResult> { - if !is_key_queryable(&query.key.0) { - return None + ty: IterQueryType, + ) -> QueryIterResult { + let keys_iter = if let Some(child_key) = child_key { + self.client.child_storage_keys(hash, child_key.to_owned(), Some(key), None) + } else { + self.client.storage_keys(hash, Some(key), None) } - - match query.queue_type { - StorageQueryType::Value => self.query_storage_value(hash, &query.key, child_key), - StorageQueryType::Hash => self.query_storage_hash(hash, &query.key, child_key), - _ => None, + .map_err(|err| { + ChainHeadStorageEvent::<String>::Error(ErrorEvent { error: err.to_string() }) + })?; + + let mut ret = Vec::with_capacity(MAX_ITER_ITEMS); + let mut keys_iter = keys_iter.take(MAX_ITER_ITEMS); + while let Some(key) = keys_iter.next() { + let result = match ty { + IterQueryType::Value => self.query_storage_value(hash, &key, child_key), + IterQueryType::Hash => self.query_storage_hash(hash, &key, child_key), + }?; + + if let Some(result) = result { + ret.push(result); + } } + + QueryIterResult::Ok(ret) } /// Generate the block events for the `chainHead_storage` method. @@ -159,19 +185,56 @@ where let mut storage_results = Vec::with_capacity(items.len()); for item in items { - let Some(result) = self.query_storage(hash, &item, child_key.as_ref()) else { - continue - }; - - match result { - QueryResult::Ok(storage_result) => storage_results.push(storage_result), - QueryResult::Err(event) => { - let _ = sink.send(&event); - // If an error is encountered for any of the query items - // do not produce any other events. - return - }, + if !is_key_queryable(&item.key.0) { + continue } + + match item.query_type { + StorageQueryType::Value => { + match self.query_storage_value(hash, &item.key, child_key.as_ref()) { + Ok(Some(value)) => storage_results.push(value), + Ok(None) => continue, + Err(err) => { + let _ = sink.send(&err); + return + }, + } + }, + StorageQueryType::Hash => + match self.query_storage_hash(hash, &item.key, child_key.as_ref()) { + Ok(Some(value)) => storage_results.push(value), + Ok(None) => continue, + Err(err) => { + let _ = sink.send(&err); + return + }, + }, + StorageQueryType::DescendantsValues => match self.query_storage_iter( + hash, + &item.key, + child_key.as_ref(), + IterQueryType::Value, + ) { + Ok(values) => storage_results.extend(values), + Err(err) => { + let _ = sink.send(&err); + return + }, + }, + StorageQueryType::DescendantsHashes => match self.query_storage_iter( + hash, + &item.key, + child_key.as_ref(), + IterQueryType::Hash, + ) { + Ok(values) => storage_results.extend(values), + Err(err) => { + let _ = sink.send(&err); + return + }, + }, + _ => continue, + }; } if !storage_results.is_empty() { diff --git a/substrate/client/rpc-spec-v2/src/chain_head/event.rs b/substrate/client/rpc-spec-v2/src/chain_head/event.rs index a141eee195e..0199edee423 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/event.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/event.rs @@ -249,7 +249,7 @@ pub struct StorageQuery<Key> { pub key: Key, /// The type of the storage query. #[serde(rename = "type")] - pub queue_type: StorageQueryType, + pub query_type: StorageQueryType, } /// The type of the storage query. @@ -558,7 +558,7 @@ mod tests { #[test] fn chain_head_storage_query() { // Item with Value. - let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::Value }; + let item = StorageQuery { key: "0x1", query_type: StorageQueryType::Value }; // Encode let ser = serde_json::to_string(&item).unwrap(); let exp = r#"{"key":"0x1","type":"value"}"#; @@ -568,7 +568,7 @@ mod tests { assert_eq!(dec, item); // Item with Hash. - let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::Hash }; + let item = StorageQuery { key: "0x1", query_type: StorageQueryType::Hash }; // Encode let ser = serde_json::to_string(&item).unwrap(); let exp = r#"{"key":"0x1","type":"hash"}"#; @@ -578,7 +578,7 @@ mod tests { assert_eq!(dec, item); // Item with DescendantsValues. - let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::DescendantsValues }; + let item = StorageQuery { key: "0x1", query_type: StorageQueryType::DescendantsValues }; // Encode let ser = serde_json::to_string(&item).unwrap(); let exp = r#"{"key":"0x1","type":"descendants-values"}"#; @@ -588,7 +588,7 @@ mod tests { assert_eq!(dec, item); // Item with DescendantsHashes. - let item = StorageQuery { key: "0x1", queue_type: StorageQueryType::DescendantsHashes }; + let item = StorageQuery { key: "0x1", query_type: StorageQueryType::DescendantsHashes }; // Encode let ser = serde_json::to_string(&item).unwrap(); let exp = r#"{"key":"0x1","type":"descendants-hashes"}"#; @@ -599,7 +599,7 @@ mod tests { // Item with Merkle. let item = - StorageQuery { key: "0x1", queue_type: StorageQueryType::ClosestDescendantMerkleValue }; + StorageQuery { key: "0x1", query_type: StorageQueryType::ClosestDescendantMerkleValue }; // Encode let ser = serde_json::to_string(&item).unwrap(); let exp = r#"{"key":"0x1","type":"closest-descendant-merkle-value"}"#; diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs index 4a5030891d3..5608a474c71 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -527,7 +527,7 @@ async fn get_storage_hash() { rpc_params![ "invalid_sub_id", &invalid_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }] ], ) .await @@ -542,7 +542,7 @@ async fn get_storage_hash() { rpc_params![ &sub_id, &invalid_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }] ], ) .await @@ -558,7 +558,7 @@ async fn get_storage_hash() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }] ], ) .await @@ -592,7 +592,7 @@ async fn get_storage_hash() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }] ], ) .await @@ -606,14 +606,13 @@ async fn get_storage_hash() { let child_info = hex_string(&CHILD_STORAGE_KEY); let genesis_hash = format!("{:?}", client.genesis_hash()); let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE)); - println!("Expe: {:?}", expected_hash); let mut sub = api .subscribe( "chainHead_unstable_storage", rpc_params![ &sub_id, &genesis_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Hash }], + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Hash }], &child_info ], ) @@ -625,6 +624,96 @@ async fn get_storage_hash() { assert_matches!(event, ChainHeadStorageEvent::Done); } +#[tokio::test] +async fn get_storage_multi_query_iter() { + let (mut client, api, mut block_sub, sub_id, _) = setup_api().await; + let key = hex_string(&KEY); + + // Import a new block with storage changes. + let mut builder = client.new_block(Default::default()).unwrap(); + builder.push_storage_change(KEY.to_vec(), Some(VALUE.to_vec())).unwrap(); + let block = builder.build().unwrap().block; + let block_hash = format!("{:?}", block.header.hash()); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::<FollowEvent<String>>(&mut block_sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::<FollowEvent<String>>(&mut block_sub).await, + FollowEvent::BestBlockChanged(_) + ); + + // Valid call with storage at the key. + let expected_hash = format!("{:?}", Blake2Hasher::hash(&VALUE)); + let expected_value = hex_string(&VALUE); + let mut sub = api + .subscribe( + "chainHead_unstable_storage", + rpc_params![ + &sub_id, + &block_hash, + vec![ + StorageQuery { + key: key.clone(), + query_type: StorageQueryType::DescendantsHashes + }, + StorageQuery { + key: key.clone(), + query_type: StorageQueryType::DescendantsValues + } + ] + ], + ) + .await + .unwrap(); + let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadStorageEvent::<String>::Items(res) if res.items.len() == 2 && + res.items[0].key == key && + res.items[1].key == key && + res.items[0].result == StorageResultType::Hash(expected_hash) && + res.items[1].result == StorageResultType::Value(expected_value)); + let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadStorageEvent::Done); + + // Child value set in `setup_api`. + let child_info = hex_string(&CHILD_STORAGE_KEY); + let genesis_hash = format!("{:?}", client.genesis_hash()); + let expected_hash = format!("{:?}", Blake2Hasher::hash(&CHILD_VALUE)); + let expected_value = hex_string(&CHILD_VALUE); + let mut sub = api + .subscribe( + "chainHead_unstable_storage", + rpc_params![ + &sub_id, + &genesis_hash, + vec![ + StorageQuery { + key: key.clone(), + query_type: StorageQueryType::DescendantsHashes + }, + StorageQuery { + key: key.clone(), + query_type: StorageQueryType::DescendantsValues + } + ], + &child_info + ], + ) + .await + .unwrap(); + let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadStorageEvent::<String>::Items(res) if res.items.len() == 2 && + res.items[0].key == key && + res.items[1].key == key && + res.items[0].result == StorageResultType::Hash(expected_hash) && + res.items[1].result == StorageResultType::Value(expected_value)); + let event: ChainHeadStorageEvent<String> = get_next_event(&mut sub).await; + assert_matches!(event, ChainHeadStorageEvent::Done); +} + #[tokio::test] async fn get_storage_value() { let (mut client, api, mut block_sub, sub_id, block) = setup_api().await; @@ -639,7 +728,7 @@ async fn get_storage_value() { rpc_params![ "invalid_sub_id", &invalid_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }] ], ) .await @@ -654,7 +743,7 @@ async fn get_storage_value() { rpc_params![ &sub_id, &invalid_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }] ], ) .await @@ -670,7 +759,7 @@ async fn get_storage_value() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }] ], ) .await @@ -704,7 +793,7 @@ async fn get_storage_value() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }] + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }] ], ) .await @@ -724,7 +813,7 @@ async fn get_storage_value() { rpc_params![ &sub_id, &genesis_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }], + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }], &child_info ], ) @@ -752,7 +841,7 @@ async fn get_storage_wrong_key() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key: prefixed_key, queue_type: StorageQueryType::Value }] + vec![StorageQuery { key: prefixed_key, query_type: StorageQueryType::Value }] ], ) .await @@ -770,7 +859,7 @@ async fn get_storage_wrong_key() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key: prefixed_key, queue_type: StorageQueryType::Value }] + vec![StorageQuery { key: prefixed_key, query_type: StorageQueryType::Value }] ], ) .await @@ -788,7 +877,7 @@ async fn get_storage_wrong_key() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key: key.clone(), queue_type: StorageQueryType::Value }], + vec![StorageQuery { key: key.clone(), query_type: StorageQueryType::Value }], &prefixed_key ], ) @@ -807,7 +896,7 @@ async fn get_storage_wrong_key() { rpc_params![ &sub_id, &block_hash, - vec![StorageQuery { key, queue_type: StorageQueryType::Value }], + vec![StorageQuery { key, query_type: StorageQueryType::Value }], &prefixed_key ], ) -- GitLab