Newer
Older
// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::{Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID};
use crate::{SpawnTaskHandle, start_rpc_servers, build_network_future, TransactionPoolAdapter};
use crate::status_sinks;
use crate::config::{Configuration, DatabaseConfig};
use client_api::{
self,
BlockchainEvents,
backend::RemoteBackend, light::RemoteBlockchain,
use client::Client;
use chain_spec::{RuntimeGenesis, Extension};
use consensus_common::import_queue::ImportQueue;
use futures::{prelude::*, sync::mpsc};
use futures03::{
FutureExt as _, TryFutureExt as _,
StreamExt as _, TryStreamExt as _,
};
use log::{info, warn, error};
use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo, DhtEvent};
use network::{config::BoxFinalityProofRequestBuilder, specialization::NetworkSpecialization};
use parking_lot::{Mutex, RwLock};
use primitives::{Blake2Hasher, H256, Hasher};
use sr_api::ConstructRuntimeApi;
use sr_primitives::traits::{
Block as BlockT, ProvideRuntimeApi, NumberFor, Header, SaturatedConversion,
use substrate_executor::{NativeExecutor, NativeExecutionDispatch};
use std::{
io::{Read, Write, Seek},
marker::PhantomData, sync::Arc, time::SystemTime
use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
use tel::{telemetry, SUBSTRATE_INFO};
use txpool_api::{TransactionPool, TransactionPoolMaintainer};
Benjamin Kampmann
committed
use sp_blockchain;
/// Aggregator for the components required to build a service.
///
/// # Usage
///
/// Call [`ServiceBuilder::new_full`] or [`ServiceBuilder::new_light`], then call the various
/// `with_` methods to add the required components that you built yourself:
///
/// - [`with_select_chain`](ServiceBuilder::with_select_chain)
/// - [`with_import_queue`](ServiceBuilder::with_import_queue)
/// - [`with_network_protocol`](ServiceBuilder::with_network_protocol)
/// - [`with_finality_proof_provider`](ServiceBuilder::with_finality_proof_provider)
/// - [`with_transaction_pool`](ServiceBuilder::with_transaction_pool)
///
/// After this is done, call [`build`](ServiceBuilder::build) to construct the service.
///
/// The order in which the `with_*` methods are called doesn't matter, as the correct binding of
/// generics is done when you call `build`.
///
pub struct ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
keystore: Arc<RwLock<Keystore>>,
fetcher: Option<TFchr>,
select_chain: Option<TSc>,
finality_proof_request_builder: Option<TFprb>,
finality_proof_provider: Option<TFpp>,
network_protocol: TNetP,
transaction_pool: Arc<TExPool>,
rpc_extensions: TRpc,
remote_backend: Option<Arc<dyn RemoteBlockchain<TBl>>>,
dht_event_tx: Option<mpsc::Sender<DhtEvent>>,
marker: PhantomData<(TBl, TRtApi)>,
}
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/// Full client type.
type TFullClient<TBl, TRtApi, TExecDisp> = Client<
TFullBackend<TBl>,
TFullCallExecutor<TBl, TExecDisp>,
TBl,
TRtApi,
>;
/// Full client backend type.
type TFullBackend<TBl> = client_db::Backend<TBl>;
/// Full client call executor type.
type TFullCallExecutor<TBl, TExecDisp> = client::LocalCallExecutor<
client_db::Backend<TBl>,
NativeExecutor<TExecDisp>,
>;
/// Light client type.
type TLightClient<TBl, TRtApi, TExecDisp> = Client<
TLightBackend<TBl>,
TLightCallExecutor<TBl, TExecDisp>,
TBl,
TRtApi,
>;
/// Light client backend type.
type TLightBackend<TBl> = client::light::backend::Backend<
client_db::light::LightStorage<TBl>,
Blake2Hasher,
>;
/// Light call executor type.
Svyatoslav Nikolsky
committed
type TLightCallExecutor<TBl, TExecDisp> = client::light::call_executor::GenesisCallExecutor<
client::light::backend::Backend<
client_db::light::LightStorage<TBl>,
Blake2Hasher
>,
client::LocalCallExecutor<
client::light::backend::Backend<
client_db::light::LightStorage<TBl>,
Blake2Hasher
>,
NativeExecutor<TExecDisp>
>,
>;
impl<TCfg, TGen, TCSExt> ServiceBuilder<(), (), TCfg, TGen, TCSExt, (), (), (), (), (), (), (), (), (), ()>
where TGen: RuntimeGenesis, TCSExt: Extension {
/// Start the service builder with a configuration.
pub fn new_full<TBl: BlockT<Hash=H256>, TRtApi, TExecDisp: NativeExecutionDispatch>(
) -> Result<ServiceBuilder<
TBl,
TRtApi,
TCfg,
TGen,
TFullClient<TBl, TRtApi, TExecDisp>,
Arc<OnDemand<TBl>>,
(),
(),
BoxFinalityProofRequestBuilder<TBl>,
Bastian Köcher
committed
Arc<dyn FinalityProofProvider<TBl>>,
Benjamin Kampmann
committed
let keystore = Keystore::open(
config.keystore_path.clone().ok_or("No basepath configured")?,
config.keystore_password.clone()
)?;
let executor = NativeExecutor::<TExecDisp>::new(
config.wasm_method,
config.default_heap_pages,
);
let fork_blocks = config.chain_spec
.extensions()
.get::<client::ForkBlocks<TBl>>()
.cloned()
.unwrap_or_default();
let (client, backend) = {
let db_config = client_db::DatabaseSettings {
state_cache_size: config.state_cache_size,
state_cache_child_ratio:
config.state_cache_child_ratio.map(|v| (v, 100)),
pruning: config.pruning.clone(),
source: match &config.database {
DatabaseConfig::Path { path, cache_size } =>
client_db::DatabaseSettingsSrc::Path {
path: path.clone(),
cache_size: cache_size.clone().map(|u| u as usize),
},
DatabaseConfig::Custom(db) =>
client_db::DatabaseSettingsSrc::Custom(db.clone()),
},
};
let extensions = client_api::execution_extensions::ExecutionExtensions::new(
config.execution_strategies.clone(),
Some(keystore.clone()),
);
client_db::new_client(
db_config,
executor,
&config.chain_spec,
fork_blocks,
)?
};
Ok(ServiceBuilder {
config,
client,
keystore,
fetcher: None,
select_chain: None,
import_queue: (),
finality_proof_request_builder: None,
finality_proof_provider: None,
network_protocol: (),
transaction_pool: Arc::new(()),
rpc_extensions: Default::default(),
dht_event_tx: None,
marker: PhantomData,
})
}
/// Start the service builder with a configuration.
pub fn new_light<TBl: BlockT<Hash=H256>, TRtApi, TExecDisp: NativeExecutionDispatch + 'static>(
) -> Result<ServiceBuilder<
TBl,
TRtApi,
TCfg,
TGen,
TLightClient<TBl, TRtApi, TExecDisp>,
Arc<OnDemand<TBl>>,
(),
(),
BoxFinalityProofRequestBuilder<TBl>,
Bastian Köcher
committed
Arc<dyn FinalityProofProvider<TBl>>,
Benjamin Kampmann
committed
let keystore = Keystore::open(
config.keystore_path.clone().ok_or("No basepath configured")?,
config.keystore_password.clone()
)?;
let executor = NativeExecutor::<TExecDisp>::new(
config.wasm_method,
config.default_heap_pages,
);
let db_storage = {
let db_settings = client_db::DatabaseSettings {
state_cache_size: config.state_cache_size,
state_cache_child_ratio:
config.state_cache_child_ratio.map(|v| (v, 100)),
pruning: config.pruning.clone(),
source: match &config.database {
DatabaseConfig::Path { path, cache_size } =>
client_db::DatabaseSettingsSrc::Path {
path: path.clone(),
cache_size: cache_size.clone().map(|u| u as usize),
},
DatabaseConfig::Custom(db) =>
client_db::DatabaseSettingsSrc::Custom(db.clone()),
},
};
client_db::light::LightStorage::new(db_settings)?
};
let light_blockchain = client::light::new_light_blockchain(db_storage);
let fetch_checker = Arc::new(client::light::new_fetch_checker(light_blockchain.clone(), executor.clone()));
let fetcher = Arc::new(network::OnDemand::new(fetch_checker));
Svyatoslav Nikolsky
committed
let backend = client::light::new_light_backend(light_blockchain);
let remote_blockchain = backend.remote_blockchain();
let client = Arc::new(client::light::new_light(
backend.clone(),
&config.chain_spec,
executor,
)?);
fetcher: Some(fetcher.clone()),
select_chain: None,
import_queue: (),
finality_proof_request_builder: None,
finality_proof_provider: None,
network_protocol: (),
transaction_pool: Arc::new(()),
rpc_extensions: Default::default(),
remote_backend: Some(remote_blockchain),
dht_event_tx: None,
marker: PhantomData,
})
}
}
impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, Backend>
ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
/// Returns a reference to the client that was stored in this builder.
pub fn client(&self) -> &Arc<TCl> {
&self.client
}
/// Returns a reference to the backend that was used in this builder.
pub fn backend(&self) -> &Arc<Backend> {
&self.backend
}
/// Returns a reference to the select-chain that was stored in this builder.
pub fn select_chain(&self) -> Option<&TSc> {
self.select_chain.as_ref()
}
/// Defines which head-of-chain strategy to use.
pub fn with_opt_select_chain<USc>(
select_chain_builder: impl FnOnce(
&Configuration<TCfg, TGen, TCSExt>, &Arc<Backend>
) -> Result<Option<USc>, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, USc, TImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, Backend>, Error> {
let select_chain = select_chain_builder(&self.config, &self.backend)?;
Ok(ServiceBuilder {
config: self.config,
client: self.client,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Defines which head-of-chain strategy to use.
pub fn with_select_chain<USc>(
self,
builder: impl FnOnce(&Configuration<TCfg, TGen, TCSExt>, &Arc<Backend>) -> Result<USc, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, USc, TImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, Backend>, Error> {
self.with_opt_select_chain(|cfg, b| builder(cfg, b).map(Option::Some))
}
/// Defines which import queue to use.
pub fn with_import_queue<UImpQu>(
builder: impl FnOnce(&Configuration<TCfg, TGen, TCSExt>, Arc<TCl>, Option<TSc>, Arc<TExPool>)
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, UImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, Backend>, Error>
where TSc: Clone {
let import_queue = builder(
self.client.clone(),
self.select_chain.clone(),
self.transaction_pool.clone()
)?;
Ok(ServiceBuilder {
config: self.config,
client: self.client,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Defines which network specialization protocol to use.
pub fn with_network_protocol<UNetP>(
self,
network_protocol_builder: impl FnOnce(&Configuration<TCfg, TGen, TCSExt>) -> Result<UNetP, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
UNetP, TExPool, TRpc, Backend>, Error> {
let network_protocol = network_protocol_builder(&self.config)?;
Ok(ServiceBuilder {
config: self.config,
client: self.client,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Defines which strategy to use for providing finality proofs.
pub fn with_opt_finality_proof_provider(
self,
builder: impl FnOnce(Arc<TCl>, Arc<Backend>) -> Result<Option<Arc<dyn FinalityProofProvider<TBl>>>, Error>
) -> Result<ServiceBuilder<
TBl,
TRtApi,
TCfg,
TGen,
TCl,
TFchr,
TSc,
TImpQu,
TFprb,
Arc<dyn FinalityProofProvider<TBl>>,
let finality_proof_provider = builder(self.client.clone(), self.backend.clone())?;
Ok(ServiceBuilder {
config: self.config,
client: self.client,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Defines which strategy to use for providing finality proofs.
pub fn with_finality_proof_provider(
self,
build: impl FnOnce(Arc<TCl>, Arc<Backend>) -> Result<Arc<dyn FinalityProofProvider<TBl>>, Error>
) -> Result<ServiceBuilder<
TBl,
TRtApi,
TCfg,
TGen,
TCl,
TFchr,
TSc,
TImpQu,
TFprb,
Arc<dyn FinalityProofProvider<TBl>>,
self.with_opt_finality_proof_provider(|client, backend| build(client, backend).map(Option::Some))
}
/// Defines which import queue to use.
pub fn with_import_queue_and_opt_fprb<UImpQu, UFprb>(
Svyatoslav Nikolsky
committed
builder: impl FnOnce(
Svyatoslav Nikolsky
committed
Arc<TCl>,
Arc<Backend>,
Option<TFchr>,
Option<TSc>,
Arc<TExPool>,
) -> Result<(UImpQu, Option<UFprb>), Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, UImpQu, UFprb, TFpp,
TNetP, TExPool, TRpc, Backend>, Error>
Svyatoslav Nikolsky
committed
where TSc: Clone, TFchr: Clone {
Svyatoslav Nikolsky
committed
self.fetcher.clone(),
self.select_chain.clone(),
self.transaction_pool.clone()
)?;
Ok(ServiceBuilder {
config: self.config,
client: self.client,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue,
finality_proof_request_builder: fprb,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Defines which import queue to use.
pub fn with_import_queue_and_fprb<UImpQu, UFprb>(
self,
Svyatoslav Nikolsky
committed
builder: impl FnOnce(
Svyatoslav Nikolsky
committed
Arc<TCl>,
Arc<Backend>,
Option<TFchr>,
Option<TSc>,
Arc<TExPool>,
) -> Result<(UImpQu, UFprb), Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, UImpQu, UFprb, TFpp,
TNetP, TExPool, TRpc, Backend>, Error>
Svyatoslav Nikolsky
committed
where TSc: Clone, TFchr: Clone {
self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx|
builder(cfg, cl, b, f, sc, tx)
.map(|(q, f)| (q, Some(f)))
)
}
/// Defines which transaction pool to use.
pub fn with_transaction_pool<UExPool>(
self,
transaction_pool_builder: impl FnOnce(
txpool::txpool::Options,
Arc<TCl>,
Option<TFchr>,
) -> Result<UExPool, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
TNetP, UExPool, TRpc, Backend>, Error>
where TSc: Clone, TFchr: Clone {
let transaction_pool = transaction_pool_builder(
self.config.transaction_pool.clone(),
self.client.clone(),
self.fetcher.clone(),
)?;
Ok(ServiceBuilder {
config: self.config,
client: self.client,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: Arc::new(transaction_pool),
rpc_extensions: self.rpc_extensions,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Defines the RPC extensions to use.
pub fn with_rpc_extensions<URpc>(
self,
rpc_ext_builder: impl FnOnce(
Arc<TCl>,
Arc<TExPool>,
Arc<Backend>,
Option<TFchr>,
Option<Arc<dyn RemoteBlockchain<TBl>>>,
) -> Result<URpc, Error>,
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
TNetP, TExPool, URpc, Backend>, Error>
where TSc: Clone, TFchr: Clone {
let rpc_extensions = rpc_ext_builder(
self.client.clone(),
self.transaction_pool.clone(),
self.backend.clone(),
self.fetcher.clone(),
self.remote_backend.clone(),
)?;
Ok(ServiceBuilder {
config: self.config,
client: self.client,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions,
dht_event_tx: self.dht_event_tx,
/// Adds a dht event sender to builder to be used by the network to send dht events to the authority discovery
/// module.
pub fn with_dht_event_tx(
self,
dht_event_tx: mpsc::Sender<DhtEvent>,
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, Backend>, Error> {
Ok(ServiceBuilder {
config: self.config,
client: self.client,
backend: self.backend,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
remote_backend: self.remote_backend,
dht_event_tx: Some(dht_event_tx),
marker: self.marker,
})
/// Implemented on `ServiceBuilder`. Allows running block commands, such as import/export/validate
pub trait ServiceBuilderCommand {
/// Block type this API operates on.
type Block: BlockT;
/// Starts the process of importing blocks.
fn import_blocks(
self,
input: impl Read + Seek + Send + 'static,
) -> Box<dyn Future<Item = (), Error = Error> + Send>;
/// Performs the blocks export.
fn export_blocks(
self,
output: impl Write + 'static,
from: NumberFor<Self::Block>,
to: Option<NumberFor<Self::Block>>,
json: bool
) -> Box<dyn Future<Item = (), Error = Error>>;
/// Performs a revert of `blocks` blocks.
fn revert_chain(
&self,
blocks: NumberFor<Self::Block>
) -> Result<(), Error>;
/// Re-validate known block.
fn check_block(
block: BlockId<Self::Block>
) -> Box<dyn Future<Item = (), Error = Error> + Send>;
impl<TBl, TRtApi, TCfg, TGen, TCSExt, TBackend, TExec, TSc, TImpQu, TNetP, TExPool, TRpc>
ServiceBuilder<
TBl,
TRtApi,
TCfg,
TGen,
Client<TBackend, TExec, TBl, TRtApi>,
Arc<OnDemand<TBl>>,
TSc,
TImpQu,
BoxFinalityProofRequestBuilder<TBl>,
Arc<dyn FinalityProofProvider<TBl>>,
> where
Client<TBackend, TExec, TBl, TRtApi>: ProvideRuntimeApi,
<Client<TBackend, TExec, TBl, TRtApi> as ProvideRuntimeApi>::Api:
sr_api::Metadata<TBl> +
txpool_runtime_api::TaggedTransactionQueue<TBl> +
session::SessionKeys<TBl> +
Benjamin Kampmann
committed
sr_api::ApiExt<TBl, Error = sp_blockchain::Error>,
TBl: BlockT<Hash = <Blake2Hasher as Hasher>::Out>,
TRtApi: ConstructRuntimeApi<TBl, Client<TBackend, TExec, TBl, TRtApi>> + 'static + Send + Sync,
TGen: RuntimeGenesis,
TCSExt: Extension,
TBackend: 'static + client_api::backend::Backend<TBl, Blake2Hasher> + Send,
TExec: 'static + client::CallExecutor<TBl, Blake2Hasher> + Send + Sync + Clone,
TSc: Clone,
TImpQu: 'static + ImportQueue<TBl>,
TNetP: NetworkSpecialization<TBl>,
TExPool: 'static
+ TransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>
+ TransactionPoolMaintainer<Block=TBl, Hash = <TBl as BlockT>::Hash>,
TRpc: rpc::RpcExtension<rpc::Metadata> + Clone,
{
/// Builds the service.
TBl,
Client<TBackend, TExec, TBl, TRtApi>,
TSc,
NetworkStatus<TBl>,
NetworkService<TBl, TNetP, <TBl as BlockT>::Hash>,
offchain::OffchainWorkers<
Client<TBackend, TExec, TBl, TRtApi>,
TBackend::OffchainStorage,
TBl
>,
>, Error> {
let ServiceBuilder {
marker: _,
mut config,
keystore,
select_chain,
import_queue,
finality_proof_request_builder,
finality_proof_provider,
network_protocol,
transaction_pool,
dht_event_tx,
} = self;
session::generate_initial_session_keys(
client.clone(),
&BlockId::Hash(client.info().chain.best_hash),
config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
let (signal, exit) = exit_future::signal();
// List of asynchronous tasks to spawn. We collect them, then spawn them all at once.
let (to_spawn_tx, to_spawn_rx) =
mpsc::unbounded::<Box<dyn Future<Item = (), Error = ()> + Send>>();
// A side-channel for essential tasks to communicate shutdown.
let (essential_failed_tx, essential_failed_rx) = mpsc::unbounded();
let import_queue = Box::new(import_queue);
let chain_info = client.info().chain;
let version = config.full_version();
info!("Highest known block at #{}", chain_info.best_number);
telemetry!(
SUBSTRATE_INFO;
"node.start";
"height" => chain_info.best_number.saturated_into::<u64>(),
"best" => ?chain_info.best_hash
);
// make transaction pool available for off-chain runtime calls.
client.execution_extensions()
.register_transaction_pool(Arc::downgrade(&transaction_pool) as _);
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
imports_external_transactions: !config.roles.is_light(),
pool: transaction_pool.clone(),
client: client.clone(),
executor: Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }),
});
let protocol_id = {
let protocol_id_full = match config.chain_spec.protocol_id() {
Some(pid) => pid,
None => {
warn!("Using default protocol ID {:?} because none is configured in the \
chain specs", DEFAULT_PROTOCOL_ID
);
DEFAULT_PROTOCOL_ID
}
}.as_bytes();
network::config::ProtocolId::from(protocol_id_full)
};
let block_announce_validator =
Box::new(consensus_common::block_validation::DefaultBlockAnnounceValidator::new(client.clone()));
let network_params = network::config::Params {
roles: config.roles,
network_config: config.network.clone(),
chain: client.clone(),
finality_proof_provider,
finality_proof_request_builder,
on_demand: on_demand.clone(),
transaction_pool: transaction_pool_adapter.clone() as _,
import_queue,
protocol_id,
specialization: network_protocol,
block_announce_validator,
};
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network_mut = network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new()));
let offchain_storage = backend.offchain_storage();
let offchain_workers = match (config.offchain_worker, offchain_storage) {
(true, Some(db)) => {
Some(Arc::new(offchain::OffchainWorkers::new(client.clone(), db)))
},
(true, None) => {
log::warn!("Offchain workers disabled, due to lack of offchain storage support in backend.");
None
_ => None,
};
{
// block notifications
let txpool = Arc::downgrade(&transaction_pool);
let offchain = offchain_workers.as_ref().map(Arc::downgrade);
let to_spawn_tx_ = to_spawn_tx.clone();
let network_state_info: Arc<dyn NetworkStateInfo + Send + Sync> = network.clone();
let is_validator = config.roles.is_authority();
let events = client.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.for_each(move |notification| {
let number = *notification.header.number();
let txpool = txpool.upgrade();
if let Some(txpool) = txpool.as_ref() {
let future = txpool.maintain(
&BlockId::hash(notification.hash),
¬ification.retracted,
).map(|_| Ok(())).compat();
let _ = to_spawn_tx_.unbounded_send(Box::new(future));
}
let offchain = offchain.as_ref().and_then(|o| o.upgrade());
if let Some(offchain) = offchain {
let future = offchain.on_block_imported(&number, network_state_info.clone(), is_validator)
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
.map(|()| Ok(()));
let _ = to_spawn_tx_.unbounded_send(Box::new(Compat::new(future)));
}
Ok(())
})
.select(exit.clone())
.then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(events));
}
{
// extrinsic notifications
let network = Arc::downgrade(&network);
let transaction_pool_ = transaction_pool.clone();
let events = transaction_pool.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.for_each(move |_| {
if let Some(network) = network.upgrade() {
network.trigger_repropagate();
}
let status = transaction_pool_.status();
telemetry!(SUBSTRATE_INFO; "txpool.import";
"ready" => status.ready,
"future" => status.future
);
Ok(())
})
.select(exit.clone())
.then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(events));
}
// Periodically notify the telemetry.
let transaction_pool_ = transaction_pool.clone();
let client_ = client.clone();
let mut sys = System::new();
let self_pid = get_current_pid().ok();
let (state_tx, state_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>();
network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx);
let tel_task = state_rx.for_each(move |(net_status, _)| {
let info = client_.info();
let best_number = info.chain.best_number.saturated_into::<u64>();
let best_hash = info.chain.best_hash;
let num_peers = net_status.num_connected_peers;
let txpool_status = transaction_pool_.status();
let finalized_number: u64 = info.chain.finalized_number.saturated_into::<u64>();
let bandwidth_download = net_status.average_download_per_sec;
let bandwidth_upload = net_status.average_upload_per_sec;
let used_state_cache_size = match info.used_state_cache_size {
Some(size) => size,
None => 0,
};
// get cpu usage and memory usage of this process
let (cpu_usage, memory) = if let Some(self_pid) = self_pid {
if sys.refresh_process(self_pid) {
let proc = sys.get_process(self_pid)
.expect("Above refresh_process succeeds, this should be Some(), qed");
(proc.cpu_usage(), proc.memory())
} else { (0.0, 0) }
} else { (0.0, 0) };
telemetry!(
SUBSTRATE_INFO;
"system.interval";
"peers" => num_peers,
"height" => best_number,
"best" => ?best_hash,
"txcount" => txpool_status.ready,
"cpu" => cpu_usage,
"memory" => memory,
"finalized_height" => finalized_number,
"finalized_hash" => ?info.chain.finalized_hash,
"bandwidth_download" => bandwidth_download,
"bandwidth_upload" => bandwidth_upload,
"used_state_cache_size" => used_state_cache_size,
);
"peers".to_owned() => num_peers,
"height".to_owned() => best_number,
"txcount".to_owned() => txpool_status.ready,
"cpu".to_owned() => cpu_usage,
"memory".to_owned() => memory,
"finalized_height".to_owned() => finalized_number,
"bandwidth_download".to_owned() => bandwidth_download,
"bandwidth_upload".to_owned() => bandwidth_upload,
"used_state_cache_size".to_owned() => used_state_cache_size
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
Ok(())
}).select(exit.clone()).then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task));
// Periodically send the network state to the telemetry.
let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>();
network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx);
let tel_task_2 = netstat_rx.for_each(move |(_, network_state)| {
telemetry!(
SUBSTRATE_INFO;
"system.network_state";
"state" => network_state,
);
Ok(())
}).select(exit.clone()).then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task_2));
// RPC
let (system_rpc_tx, system_rpc_rx) = futures03::channel::mpsc::unbounded();
let gen_handler = || {
use rpc::{chain, state, author, system};
let system_info = rpc::system::SystemInfo {
chain_name: config.chain_spec.name().into(),
impl_name: config.impl_name.into(),
impl_version: config.impl_version.into(),
properties: config.chain_spec.properties().clone(),
};
let subscriptions = rpc::Subscriptions::new(Arc::new(SpawnTaskHandle {
sender: to_spawn_tx.clone(),
on_exit: exit.clone()