Newer
Older
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! # Remote Externalities
//!
//! An equivalent of `sp_io::TestExternalities` that can load its state from a remote substrate
//! based chain, or a local state snapshot file.
use codec::{Decode, Encode};
use futures::{channel::mpsc, stream::StreamExt};
use log::*;
use serde::de::DeserializeOwned;
storage::{
well_known_keys::{is_default_child_storage_key, DEFAULT_CHILD_STORAGE_KEY_PREFIX},
ChildInfo, ChildType, PrefixedStorageKey, StorageData, StorageKey,
},
pub use sp_io::TestExternalities;
use sp_runtime::{traits::Block as BlockT, StateVersion};
num::NonZeroUsize,
ops::{Deref, DerefMut},
sync::Arc,
use substrate_rpc_client::{
rpc_params, ws_client, BatchRequestBuilder, ChainApi, ClientT, StateApi, WsClient,
};
type KeyValue = (StorageKey, StorageData);
type TopKeyValues = Vec<KeyValue>;
type ChildKeyValues = Vec<(ChildInfo, Vec<KeyValue>)>;
const LOG_TARGET: &str = "remote-ext";
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
const DEFAULT_WS_ENDPOINT: &str = "wss://rpc.polkadot.io:443";
const DEFAULT_VALUE_DOWNLOAD_BATCH: usize = 4096;
// NOTE: increasing this value does not seem to impact speed all that much.
const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
/// The snapshot that we store on disk.
#[derive(Decode, Encode)]
struct Snapshot<B: BlockT> {
state_version: StateVersion,
block_hash: B::Hash,
top: TopKeyValues,
child: ChildKeyValues,
}
/// An externalities that acts exactly the same as [`sp_io::TestExternalities`] but has a few extra
/// bits and pieces to it, and can be loaded remotely.
pub struct RemoteExternalities<B: BlockT> {
/// The inner externalities.
pub inner_ext: TestExternalities,
/// The block hash it which we created this externality env.
pub block_hash: B::Hash,
}
impl<B: BlockT> Deref for RemoteExternalities<B> {
type Target = TestExternalities;
fn deref(&self) -> &Self::Target {
&self.inner_ext
}
}
impl<B: BlockT> DerefMut for RemoteExternalities<B> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner_ext
}
}
/// The execution mode.
#[derive(Clone)]
/// Online. Potentially writes to a snapshot file.
Online(OnlineConfig<B>),
/// Offline. Uses a state snapshot file and needs not any client config.
/// Prefer using a snapshot file if it exists, else use a remote server.
OfflineOrElseOnline(OfflineConfig, OnlineConfig<B>),
impl<B: BlockT> Default for Mode<B> {
fn default() -> Self {
Mode::Online(OnlineConfig::default())
}
}
/// Configuration of the offline execution.
/// A state snapshot config must be present.
#[derive(Clone)]
pub struct OfflineConfig {
/// The configuration of the state snapshot file to use. It must be present.
pub state_snapshot: SnapshotConfig,
/// Description of the transport protocol (for online execution).
#[derive(Debug, Clone)]
pub enum Transport {
/// Use the `URI` to open a new WebSocket connection.
Uri(String),
/// Use existing WebSocket connection.
RemoteClient(Arc<WsClient>),
impl Transport {
fn as_client(&self) -> Option<&WsClient> {
match self {
Self::RemoteClient(client) => Some(client),
_ => None,
}
}
fn as_client_cloned(&self) -> Option<Arc<WsClient>> {
match self {
Self::RemoteClient(client) => Some(client.clone()),
_ => None,
}
}
// Open a new WebSocket connection if it's not connected.
async fn map_uri(&mut self) -> Result<(), &'static str> {
if let Self::Uri(uri) = self {
log::debug!(target: LOG_TARGET, "initializing remote client to {:?}", uri);
Niklas Adolfsson
committed
let ws_client = ws_client(uri).await.map_err(|e| {
log::error!(target: LOG_TARGET, "error: {:?}", e);
"failed to build ws client"
})?;
*self = Self::RemoteClient(Arc::new(ws_client))
}
Ok(())
}
}
impl From<String> for Transport {
fn from(uri: String) -> Self {
Transport::Uri(uri)
}
}
impl From<Arc<WsClient>> for Transport {
fn from(client: Arc<WsClient>) -> Self {
Transport::RemoteClient(client)
/// Configuration of the online execution.
///
/// A state snapshot config may be present and will be written to in that case.
pub struct OnlineConfig<B: BlockT> {
/// The block hash at which to get the runtime state. Will be latest finalized head if not
/// provided.
pub at: Option<B::Hash>,
/// An optional state snapshot file to WRITE to, not for reading. Not written if set to `None`.
pub state_snapshot: Option<SnapshotConfig>,
/// The pallets to scrape. These values are hashed and added to `hashed_prefix`.
Kian Paimani
committed
pub pallets: Vec<String>,
/// Transport config.
pub transport: Transport,
/// Lookout for child-keys, and scrape them as well if set to true.
pub child_trie: bool,
/// Storage entry key prefixes to be injected into the externalities. The *hashed* prefix must
/// be given.
pub hashed_prefixes: Vec<Vec<u8>>,
/// Storage entry keys to be injected into the externalities. The *hashed* key must be given.
pub hashed_keys: Vec<Vec<u8>>,
impl<B: BlockT> OnlineConfig<B> {
/// Return rpc (ws) client reference.
fn rpc_client(&self) -> &WsClient {
.as_client()
.expect("ws client must have been initialized by now; qed.")
/// Return a cloned rpc (ws) client, suitable for being moved to threads.
fn rpc_client_cloned(&self) -> Arc<WsClient> {
self.transport
.as_client_cloned()
.expect("ws client must have been initialized by now; qed.")
}
fn at_expected(&self) -> B::Hash {
self.at.expect("block at must be initialized; qed")
}
impl<B: BlockT> Default for OnlineConfig<B> {
transport: Transport::from(DEFAULT_WS_ENDPOINT.to_owned()),
child_trie: true,
at: None,
state_snapshot: None,
pallets: Default::default(),
hashed_keys: Default::default(),
hashed_prefixes: Default::default(),
impl<B: BlockT> From<String> for OnlineConfig<B> {
fn from(t: String) -> Self {
Self { transport: t.into(), ..Default::default() }
/// Configuration of the state snapshot.
/// The path to the snapshot file.
pub path: PathBuf,
impl SnapshotConfig {
pub fn new<P: Into<PathBuf>>(path: P) -> Self {
Self { path: path.into() }
impl From<String> for SnapshotConfig {
fn from(s: String) -> Self {
Self::new(s)
}
}
impl Default for SnapshotConfig {
fn default() -> Self {
Self { path: Path::new("SNAPSHOT").into() }
}
}
/// Builder for remote-externalities.
/// Custom key-pairs to be injected into the final externalities. The *hashed* keys and values
/// must be given.
hashed_key_values: Vec<KeyValue>,
/// The keys that will be excluded from the final externality. The *hashed* key must be given.
hashed_blacklist: Vec<Vec<u8>>,
/// Connectivity mode, online or offline.
/// If provided, overwrite the state version with this. Otherwise, the state_version of the
/// remote node is used. All cache files also store their state version.
///
/// Overwrite only with care.
overwrite_state_version: Option<StateVersion>,
// NOTE: ideally we would use `DefaultNoBound` here, but not worth bringing in frame-support for
// that.
Niklas Adolfsson
committed
impl<B: BlockT> Default for Builder<B> {
Self {
mode: Default::default(),
hashed_key_values: Default::default(),
hashed_blacklist: Default::default(),
}
}
// Mode methods
Niklas Adolfsson
committed
impl<B: BlockT> Builder<B> {
fn as_online(&self) -> &OnlineConfig<B> {
Mode::Online(config) => config,
Mode::OfflineOrElseOnline(_, config) => config,
_ => panic!("Unexpected mode: Online"),
}
}
fn as_online_mut(&mut self) -> &mut OnlineConfig<B> {
match &mut self.mode {
Mode::Online(config) => config,
Mode::OfflineOrElseOnline(_, config) => config,
_ => panic!("Unexpected mode: Online"),
}
}
}
// RPC methods
Niklas Adolfsson
committed
impl<B: BlockT> Builder<B>
where
B::Hash: DeserializeOwned,
B::Header: DeserializeOwned,
{
/// Get the number of threads to use.
fn threads() -> NonZeroUsize {
thread::available_parallelism()
.unwrap_or(NonZeroUsize::new(4usize).expect("4 is non-zero; qed"))
}
async fn rpc_get_storage(
&self,
key: StorageKey,
maybe_at: Option<B::Hash>,
) -> Result<Option<StorageData>, &'static str> {
trace!(target: LOG_TARGET, "rpc: get_storage");
self.as_online().rpc_client().storage(key, maybe_at).await.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc get_storage failed."
})
/// Get the latest finalized head.
async fn rpc_get_head(&self) -> Result<B::Hash, &'static str> {
trace!(target: LOG_TARGET, "rpc: finalized_head");
Niklas Adolfsson
committed
// sadly this pretty much unreadable...
ChainApi::<(), _, B::Header, ()>::finalized_head(self.as_online().rpc_client())
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc finalized_head failed."
})
/// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods.
async fn rpc_get_keys_paged(
&self,
prefix: StorageKey,
) -> Result<Vec<StorageKey>, &'static str> {
let mut last_key: Option<StorageKey> = None;
let mut all_keys: Vec<StorageKey> = vec![];
let keys = loop {
let page = self
.as_online()
.rpc_client()
.storage_keys_paged(
Some(prefix.clone()),
DEFAULT_KEY_DOWNLOAD_PAGE,
last_key.clone(),
Some(at),
)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc get_keys failed"
})?;
let page_len = page.len();
all_keys.extend(page);
if page_len < DEFAULT_KEY_DOWNLOAD_PAGE as usize {
log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
} else {
let new_last_key =
all_keys.last().expect("all_keys is populated; has .last(); qed");
"new total = {}, full page received: {}",
all_keys.len(),
HexDisplay::from(new_last_key)
);
last_key = Some(new_last_key.clone());
}
};
Ok(keys)
}
/// Synonym of `getPairs` that uses paged queries to first get the keys, and then
/// map them to values one by one.
/// This can work with public nodes. But, expect it to be darn slow.
pub(crate) async fn rpc_get_pairs_paged(
&self,
prefix: StorageKey,
pending_ext: &mut TestExternalities,
) -> Result<Vec<KeyValue>, &'static str> {
let keys = self.rpc_get_keys_paged(prefix.clone(), at).await?;
if keys.is_empty() {
return Ok(Default::default())
}
let client = self.as_online().rpc_client_cloned();
let threads = Self::threads().get();
let thread_chunk_size = (keys.len() + threads - 1) / threads;
log::info!(
target: LOG_TARGET,
"Querying a total of {} keys from prefix {:?}, splitting among {} threads, {} keys per thread",
keys.len(),
HexDisplay::from(&prefix),
threads,
thread_chunk_size,
);
let mut handles = Vec::new();
let keys_chunked: Vec<Vec<StorageKey>> =
keys.chunks(thread_chunk_size).map(|s| s.into()).collect::<Vec<_>>();
enum Message {
/// This thread completed the assigned work.
Terminated,
/// The thread produced the following batch response.
Batch(Vec<(Vec<u8>, Vec<u8>)>),
/// A request from the batch failed.
BatchFailed(String),
}
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
let (tx, mut rx) = mpsc::unbounded::<Message>();
for thread_keys in keys_chunked {
let thread_client = client.clone();
let thread_sender = tx.clone();
let handle = std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut thread_key_values = Vec::with_capacity(thread_keys.len());
for chunk_keys in thread_keys.chunks(DEFAULT_VALUE_DOWNLOAD_BATCH) {
let mut batch = BatchRequestBuilder::new();
for key in chunk_keys.iter() {
batch
.insert("state_getStorage", rpc_params![key, at])
.map_err(|_| "Invalid batch params")
.unwrap();
}
let batch_response = rt
.block_on(thread_client.batch_request::<Option<StorageData>>(batch))
.map_err(|e| {
log::error!(
target: LOG_TARGET,
"failed to execute batch: {:?}. Error: {:?}",
chunk_keys.iter().map(HexDisplay::from).collect::<Vec<_>>(),
e
);
"batch failed."
})
.unwrap();
// Check if we got responses for all submitted requests.
assert_eq!(chunk_keys.len(), batch_response.len());
let mut batch_kv = Vec::with_capacity(chunk_keys.len());
for (key, maybe_value) in chunk_keys.into_iter().zip(batch_response) {
match maybe_value {
Ok(Some(data)) => {
thread_key_values.push((key.clone(), data.clone()));
batch_kv.push((key.clone().0, data.0));
},
Ok(None) => {
log::warn!(
target: LOG_TARGET,
"key {:?} had none corresponding value.",
&key
);
let data = StorageData(vec![]);
thread_key_values.push((key.clone(), data.clone()));
batch_kv.push((key.clone().0, data.0));
},
Err(e) => {
let reason = format!("key {:?} failed: {:?}", &key, e);
log::error!(target: LOG_TARGET, "Reason: {}", reason);
// Signal failures to the main thread, stop aggregating (key, value)
// pairs and return immediately an error.
thread_sender.unbounded_send(Message::BatchFailed(reason)).unwrap();
return Default::default()
},
};
if thread_key_values.len() % (thread_keys.len() / 10).max(1) == 0 {
let ratio: f64 =
thread_key_values.len() as f64 / thread_keys.len() as f64;
log::debug!(
target: LOG_TARGET,
"[thread = {:?}] progress = {:.2} [{} / {}]",
std::thread::current().id(),
ratio,
thread_key_values.len(),
thread_keys.len(),
);
}
}
// Send this batch to the main thread to start inserting.
thread_sender.unbounded_send(Message::Batch(batch_kv)).unwrap();
}
thread_sender.unbounded_send(Message::Terminated).unwrap();
thread_key_values
});
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
// first, wait until all threads send a `Terminated` message, in the meantime populate
// `pending_ext`.
let mut terminated = 0usize;
let mut batch_failed = false;
loop {
match rx.next().await.unwrap() {
Message::Batch(kv) => {
for (k, v) in kv {
// skip writing the child root data.
if is_default_child_storage_key(k.as_ref()) {
continue
}
pending_ext.insert(k, v);
}
},
Message::BatchFailed(error) => {
log::error!(target: LOG_TARGET, "Batch processing failed: {:?}", error);
batch_failed = true;
break
},
Message::Terminated => {
terminated += 1;
if terminated == handles.len() {
break
}
},
// Ensure all threads finished execution before returning.
let keys_and_values =
handles.into_iter().flat_map(|h| h.join().unwrap()).collect::<Vec<_>>();
if batch_failed {
return Err("Batch failed.")
/// Get the values corresponding to `child_keys` at the given `prefixed_top_key`.
pub(crate) async fn rpc_child_get_storage_paged(
prefixed_top_key: &StorageKey,
child_keys: Vec<StorageKey>,
at: B::Hash,
) -> Result<Vec<KeyValue>, &'static str> {
let mut child_kv_inner = vec![];
let mut batch_success = true;
for batch_child_key in child_keys.chunks(DEFAULT_VALUE_DOWNLOAD_BATCH) {
let mut batch_request = BatchRequestBuilder::new();
for key in batch_child_key {
batch_request
.insert(
"childstate_getStorage",
rpc_params![
PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
key,
at
],
)
.map_err(|_| "Invalid batch params")?;
}
let batch_response =
client.batch_request::<Option<StorageData>>(batch_request).await.map_err(|e| {
log::error!(
target: LOG_TARGET,
"failed to execute batch: {:?}. Error: {:?}",
batch_child_key,
e
);
"batch failed."
})?;
assert_eq!(batch_child_key.len(), batch_response.len());
for (key, maybe_value) in batch_child_key.iter().zip(batch_response) {
match maybe_value {
Ok(Some(v)) => {
child_kv_inner.push((key.clone(), v));
},
Ok(None) => {
log::warn!(
target: LOG_TARGET,
"key {:?} had none corresponding value.",
&key
);
child_kv_inner.push((key.clone(), StorageData(vec![])));
},
Err(e) => {
log::error!(target: LOG_TARGET, "key {:?} failed: {:?}", &key, e);
batch_success = false;
},
};
}
}
if batch_success {
Ok(child_kv_inner)
} else {
Err("batch failed.")
}
}
pub(crate) async fn rpc_child_get_keys(
prefixed_top_key: &StorageKey,
child_prefix: StorageKey,
at: B::Hash,
) -> Result<Vec<StorageKey>, &'static str> {
Niklas Adolfsson
committed
// This is deprecated and will generate a warning which causes the CI to fail.
#[allow(warnings)]
let child_keys = substrate_rpc_client::ChildStateApi::storage_keys(
Niklas Adolfsson
committed
PrefixedStorageKey::new(prefixed_top_key.as_ref().to_vec()),
child_prefix,
Some(at),
)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc child_get_keys failed."
})?;
debug!(
target: LOG_TARGET,
"[thread = {:?}] scraped {} child-keys of the child-bearing top key: {}",
std::thread::current().id(),
child_keys.len(),
HexDisplay::from(prefixed_top_key)
);
Ok(child_keys)
}
impl<B: BlockT + DeserializeOwned> Builder<B>
Niklas Adolfsson
committed
where
B::Hash: DeserializeOwned,
B::Header: DeserializeOwned,
{
/// Load all of the child keys from the remote config, given the already scraped list of top key
/// pairs.
///
/// `top_kv` need not be only child-bearing top keys. It should be all of the top keys that are
/// included thus far.
///
/// This function concurrently populates `pending_ext`. the return value is only for writing to
/// cache, we can also optimize further.
async fn load_child_remote(
&self,
top_kv: &[KeyValue],
pending_ext: &mut TestExternalities,
) -> Result<ChildKeyValues, &'static str> {
let child_roots = top_kv
.into_iter()
.filter_map(|(k, _)| is_default_child_storage_key(k.as_ref()).then(|| k.clone()))
.collect::<Vec<_>>();
if child_roots.is_empty() {
return Ok(Default::default())
}
// div-ceil simulation.
let threads = Self::threads().get();
let child_roots_per_thread = (child_roots.len() + threads - 1) / threads;
info!(
target: LOG_TARGET,
"👩👦 scraping child-tree data from {} top keys, split among {} threads, {} top keys per thread",
child_roots.len(),
threads,
child_roots_per_thread,
);
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
// NOTE: the threading done here is the simpler, yet slightly un-elegant because we are
// splitting child root among threads, and it is very common for these root to have vastly
// different child tries underneath them, causing some threads to finish way faster than
// others. Certainly still better than single thread though.
let mut handles = vec![];
let client = self.as_online().rpc_client_cloned();
let at = self.as_online().at_expected();
enum Message {
Terminated,
Batch((ChildInfo, Vec<(Vec<u8>, Vec<u8>)>)),
}
let (tx, mut rx) = mpsc::unbounded::<Message>();
for thread_child_roots in child_roots
.chunks(child_roots_per_thread)
.map(|x| x.into())
.collect::<Vec<Vec<_>>>()
{
let thread_client = client.clone();
let thread_sender = tx.clone();
let handle = thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut thread_child_kv = vec![];
for prefixed_top_key in thread_child_roots {
let child_keys = rt.block_on(Self::rpc_child_get_keys(
&thread_client,
&prefixed_top_key,
StorageKey(vec![]),
at,
))?;
let child_kv_inner = rt.block_on(Self::rpc_child_get_storage_paged(
&thread_client,
&prefixed_top_key,
child_keys,
at,
))?;
let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
let un_prefixed = match ChildType::from_prefixed_key(&prefixed_top_key) {
Some((ChildType::ParentKeyId, storage_key)) => storage_key,
None => {
log::error!(target: LOG_TARGET, "invalid key: {:?}", prefixed_top_key);
return Err("Invalid child key")
},
};
thread_sender
.unbounded_send(Message::Batch((
ChildInfo::new_default(un_prefixed),
child_kv_inner
.iter()
.cloned()
.map(|(k, v)| (k.0, v.0))
.collect::<Vec<_>>(),
)))
.unwrap();
thread_child_kv.push((ChildInfo::new_default(un_prefixed), child_kv_inner));
}
thread_sender.unbounded_send(Message::Terminated).unwrap();
Ok(thread_child_kv)
});
handles.push(handle);
}
// first, wait until all threads send a `Terminated` message, in the meantime populate
// `pending_ext`.
let mut terminated = 0usize;
loop {
match rx.next().await.unwrap() {
Message::Batch((info, kvs)) =>
for (k, v) in kvs {
pending_ext.insert_child(info.clone(), k, v);
},
Message::Terminated => {
terminated += 1;
if terminated == handles.len() {
break
}
},
}
}
let child_kv = handles
.into_iter()
.flat_map(|h| h.join().unwrap())
.flatten()
.collect::<Vec<_>>();
Ok(child_kv)
}
/// Build `Self` from a network node denoted by `uri`.
///
/// This function concurrently populates `pending_ext`. the return value is only for writing to
/// cache, we can also optimize further.
async fn load_top_remote(
&self,
pending_ext: &mut TestExternalities,
) -> Result<TopKeyValues, &'static str> {
let at = self
.as_online()
.at
.expect("online config must be initialized by this point; qed.");
log::info!(target: LOG_TARGET, "scraping key-pairs from remote at block height {:?}", at);
let mut keys_and_values = Vec::new();
for prefix in &config.hashed_prefixes {
let now = std::time::Instant::now();
let additional_key_values =
self.rpc_get_pairs_paged(StorageKey(prefix.to_vec()), at, pending_ext).await?;
let elapsed = now.elapsed();
"adding data for hashed prefix: {:?}, took {:?}s",
HexDisplay::from(prefix),
elapsed.as_secs()
keys_and_values.extend(additional_key_values);
}
for key in &config.hashed_keys {
let key = StorageKey(key.to_vec());
log::info!(
target: LOG_TARGET,
"adding data for hashed key: {:?}",
HexDisplay::from(&key)
);
match self.rpc_get_storage(key.clone(), Some(at)).await? {
Some(value) => {
pending_ext.insert(key.clone().0, value.clone().0);
keys_and_values.push((key, value));
},
None => {
log::warn!(
target: LOG_TARGET,
"no data found for hashed key: {:?}",
HexDisplay::from(&key)
);
},
}
Ok(keys_and_values)
/// The entry point of execution, if `mode` is online.
///
/// initializes the remote client in `transport`, and sets the `at` field, if not specified.
async fn init_remote_client(&mut self) -> Result<(), &'static str> {
// First, initialize the ws client.
self.as_online_mut().transport.map_uri().await?;
// Then, if `at` is not set, set it.
if self.as_online().at.is_none() {
let at = self.rpc_get_head().await?;
log::info!(
target: LOG_TARGET,
"since no at is provided, setting it to latest finalized head, {:?}",
at
);
self.as_online_mut().at = Some(at);
}
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
// Then, a few transformation that we want to perform in the online config:
let online_config = self.as_online_mut();
online_config
.pallets
.iter()
.for_each(|p| online_config.hashed_prefixes.push(twox_128(p.as_bytes()).to_vec()));
if online_config.child_trie {
online_config.hashed_prefixes.push(DEFAULT_CHILD_STORAGE_KEY_PREFIX.to_vec());
}
// Finally, if by now, we have put any limitations on prefixes that we are interested in, we
// download everything.
if online_config
.hashed_prefixes
.iter()
.filter(|p| *p != DEFAULT_CHILD_STORAGE_KEY_PREFIX)
.count() == 0
{
log::info!(
target: LOG_TARGET,
"since no prefix is filtered, the data for all pallets will be downloaded"
);
online_config.hashed_prefixes.push(vec![]);
}
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
/// Load the data from a remote server. The main code path is calling into `load_top_remote` and
/// `load_child_remote`.
///
/// Must be called after `init_remote_client`.
async fn load_remote_and_maybe_save(&mut self) -> Result<TestExternalities, &'static str> {
let state_version =
StateApi::<B::Hash>::runtime_version(self.as_online().rpc_client(), None)
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error = {:?}", e);
"rpc runtime_version failed."
})
.map(|v| v.state_version())?;
let mut pending_ext = TestExternalities::new_with_code_and_state(
Default::default(),
Default::default(),
self.overwrite_state_version.unwrap_or(state_version),
);
let top_kv = self.load_top_remote(&mut pending_ext).await?;
let child_kv = self.load_child_remote(&top_kv, &mut pending_ext).await?;
if let Some(path) = self.as_online().state_snapshot.clone().map(|c| c.path) {
let snapshot = Snapshot::<B> {
state_version,
top: top_kv,
child: child_kv,
block_hash: self
.as_online()
.at
.expect("set to `Some` in `init_remote_client`; must be called before; qed"),
};
let encoded = snapshot.encode();
log::info!(
target: LOG_TARGET,
"writing snapshot of {} bytes to {:?}",
encoded.len(),
path
);
std::fs::write(path, encoded).map_err(|_| "fs::write failed")?;
}
Ok(pending_ext)
}
fn load_snapshot(&mut self, path: PathBuf) -> Result<Snapshot<B>, &'static str> {
info!(target: LOG_TARGET, "loading data from snapshot {:?}", path);
let bytes = fs::read(path).map_err(|_| "fs::read failed.")?;
Decode::decode(&mut &*bytes).map_err(|_| "decode failed")
}
async fn do_load_remote(&mut self) -> Result<RemoteExternalities<B>, &'static str> {
self.init_remote_client().await?;
let block_hash = self.as_online().at_expected();
let inner_ext = self.load_remote_and_maybe_save().await?;
Ok(RemoteExternalities { block_hash, inner_ext })
}
fn do_load_offline(
&mut self,
config: OfflineConfig,
) -> Result<RemoteExternalities<B>, &'static str> {
let Snapshot { block_hash, top, child, state_version } =
self.load_snapshot(config.state_snapshot.path.clone())?;
let mut inner_ext = TestExternalities::new_with_code_and_state(
Default::default(),
Default::default(),
self.overwrite_state_version.unwrap_or(state_version),
);
info!(target: LOG_TARGET, "injecting a total of {} top keys", top.len());
for (k, v) in top {
// skip writing the child root data.
if is_default_child_storage_key(k.as_ref()) {
continue
}
inner_ext.insert(k.0, v.0);
}
info!(
target: LOG_TARGET,
"injecting a total of {} child keys",
child.iter().flat_map(|(_, kv)| kv).count()
);
for (info, key_values) in child {
for (k, v) in key_values {
inner_ext.insert_child(info.clone(), k.0, v.0);
}
}
Ok(RemoteExternalities { inner_ext, block_hash })
}
pub(crate) async fn pre_build(mut self) -> Result<RemoteExternalities<B>, &'static str> {
let mut ext = match self.mode.clone() {
Mode::Offline(config) => self.do_load_offline(config)?,
Mode::Online(_) => self.do_load_remote().await?,
Mode::OfflineOrElseOnline(offline_config, _) => {
match self.do_load_offline(offline_config) {
Ok(x) => x,
Err(_) => self.do_load_remote().await?,
// inject manual key values.
if !self.hashed_key_values.is_empty() {
target: LOG_TARGET,
"extending externalities with {} manually injected key-values",
self.hashed_key_values.len()
);
for (k, v) in self.hashed_key_values {