Newer
Older
// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::{Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID};
use crate::{SpawnTaskHandle, start_rpc_servers, build_network_future, TransactionPoolAdapter};
use crate::status_sinks;
use crate::config::{Configuration, DatabaseConfig};
self,
BlockchainEvents,
backend::RemoteBackend, light::RemoteBlockchain,
use sc_client::Client;
use sc_chain_spec::{RuntimeGenesis, Extension};
use sp_consensus::import_queue::ImportQueue;
use futures03::{
FutureExt as _, TryFutureExt as _,
StreamExt as _, TryStreamExt as _,
};
use sc_keystore::{Store as Keystore};
use log::{info, warn, error};
use sc_network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo, DhtEvent};
use sc_network::{config::BoxFinalityProofRequestBuilder, specialization::NetworkSpecialization};
use sp_core::{Blake2Hasher, H256, Hasher};
use sc_rpc;
use sp_api::ConstructRuntimeApi;
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{
Block as BlockT, ProvideRuntimeApi, NumberFor, Header, SaturatedConversion,
use sc_executor::{NativeExecutor, NativeExecutionDispatch};
use std::{
io::{Read, Write, Seek},
marker::PhantomData, sync::Arc, time::SystemTime
use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_transaction_pool::{TransactionPool, TransactionPoolMaintainer};
Benjamin Kampmann
committed
use sp_blockchain;
/// Aggregator for the components required to build a service.
///
/// # Usage
///
/// Call [`ServiceBuilder::new_full`] or [`ServiceBuilder::new_light`], then call the various
/// `with_` methods to add the required components that you built yourself:
///
/// - [`with_select_chain`](ServiceBuilder::with_select_chain)
/// - [`with_import_queue`](ServiceBuilder::with_import_queue)
/// - [`with_network_protocol`](ServiceBuilder::with_network_protocol)
/// - [`with_finality_proof_provider`](ServiceBuilder::with_finality_proof_provider)
/// - [`with_transaction_pool`](ServiceBuilder::with_transaction_pool)
///
/// After this is done, call [`build`](ServiceBuilder::build) to construct the service.
///
/// The order in which the `with_*` methods are called doesn't matter, as the correct binding of
/// generics is done when you call `build`.
///
pub struct ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
keystore: Arc<RwLock<Keystore>>,
fetcher: Option<TFchr>,
select_chain: Option<TSc>,
finality_proof_request_builder: Option<TFprb>,
finality_proof_provider: Option<TFpp>,
network_protocol: TNetP,
transaction_pool: Arc<TExPool>,
rpc_extensions: TRpc,
remote_backend: Option<Arc<dyn RemoteBlockchain<TBl>>>,
dht_event_tx: Option<mpsc::Sender<DhtEvent>>,
marker: PhantomData<(TBl, TRtApi)>,
}
pub type TFullClient<TBl, TRtApi, TExecDisp> = Client<
TFullBackend<TBl>,
TFullCallExecutor<TBl, TExecDisp>,
TBl,
TRtApi,
>;
/// Full client backend type.
pub type TFullBackend<TBl> = sc_client_db::Backend<TBl>;
/// Full client call executor type.
pub type TFullCallExecutor<TBl, TExecDisp> = sc_client::LocalCallExecutor<
sc_client_db::Backend<TBl>,
NativeExecutor<TExecDisp>,
>;
/// Light client type.
pub type TLightClient<TBl, TRtApi, TExecDisp> = Client<
TLightBackend<TBl>,
TLightCallExecutor<TBl, TExecDisp>,
TBl,
TRtApi,
>;
/// Light client backend type.
pub type TLightBackend<TBl> = sc_client::light::backend::Backend<
sc_client_db::light::LightStorage<TBl>,
Blake2Hasher,
>;
/// Light call executor type.
pub type TLightCallExecutor<TBl, TExecDisp> = sc_client::light::call_executor::GenesisCallExecutor<
sc_client::light::backend::Backend<
sc_client_db::light::LightStorage<TBl>,
sc_client::LocalCallExecutor<
sc_client::light::backend::Backend<
sc_client_db::light::LightStorage<TBl>,
Blake2Hasher
>,
NativeExecutor<TExecDisp>
>,
>;
impl<TCfg, TGen, TCSExt> ServiceBuilder<(), (), TCfg, TGen, TCSExt, (), (), (), (), (), (), (), (), (), ()>
where TGen: RuntimeGenesis, TCSExt: Extension {
/// Start the service builder with a configuration.
pub fn new_full<TBl: BlockT<Hash=H256>, TRtApi, TExecDisp: NativeExecutionDispatch>(
) -> Result<ServiceBuilder<
TBl,
TRtApi,
TCfg,
TGen,
TFullClient<TBl, TRtApi, TExecDisp>,
Arc<OnDemand<TBl>>,
(),
(),
BoxFinalityProofRequestBuilder<TBl>,
Bastian Köcher
committed
Arc<dyn FinalityProofProvider<TBl>>,
Benjamin Kampmann
committed
let keystore = Keystore::open(
config.keystore_path.clone().ok_or("No basepath configured")?,
config.keystore_password.clone()
)?;
let executor = NativeExecutor::<TExecDisp>::new(
config.wasm_method,
config.default_heap_pages,
);
let fork_blocks = config.chain_spec
.extensions()
.get::<sc_client::ForkBlocks<TBl>>()
let (client, backend) = {
let db_config = sc_client_db::DatabaseSettings {
state_cache_size: config.state_cache_size,
state_cache_child_ratio:
config.state_cache_child_ratio.map(|v| (v, 100)),
pruning: config.pruning.clone(),
source: match &config.database {
DatabaseConfig::Path { path, cache_size } =>
sc_client_db::DatabaseSettingsSrc::Path {
path: path.clone(),
cache_size: cache_size.clone().map(|u| u as usize),
},
DatabaseConfig::Custom(db) =>
sc_client_db::DatabaseSettingsSrc::Custom(db.clone()),
},
};
let extensions = sc_client_api::execution_extensions::ExecutionExtensions::new(
config.execution_strategies.clone(),
Some(keystore.clone()),
);
sc_client_db::new_client(
db_config,
executor,
&config.chain_spec,
fork_blocks,
)?
};
Ok(ServiceBuilder {
config,
client,
keystore,
fetcher: None,
select_chain: None,
import_queue: (),
finality_proof_request_builder: None,
finality_proof_provider: None,
network_protocol: (),
transaction_pool: Arc::new(()),
rpc_extensions: Default::default(),
dht_event_tx: None,
marker: PhantomData,
})
}
/// Start the service builder with a configuration.
pub fn new_light<TBl: BlockT<Hash=H256>, TRtApi, TExecDisp: NativeExecutionDispatch + 'static>(
) -> Result<ServiceBuilder<
TBl,
TRtApi,
TCfg,
TGen,
TLightClient<TBl, TRtApi, TExecDisp>,
Arc<OnDemand<TBl>>,
(),
(),
BoxFinalityProofRequestBuilder<TBl>,
Bastian Köcher
committed
Arc<dyn FinalityProofProvider<TBl>>,
Benjamin Kampmann
committed
let keystore = Keystore::open(
config.keystore_path.clone().ok_or("No basepath configured")?,
config.keystore_password.clone()
)?;
let executor = NativeExecutor::<TExecDisp>::new(
config.wasm_method,
config.default_heap_pages,
);
let db_storage = {
let db_settings = sc_client_db::DatabaseSettings {
state_cache_size: config.state_cache_size,
state_cache_child_ratio:
config.state_cache_child_ratio.map(|v| (v, 100)),
pruning: config.pruning.clone(),
source: match &config.database {
DatabaseConfig::Path { path, cache_size } =>
sc_client_db::DatabaseSettingsSrc::Path {
path: path.clone(),
cache_size: cache_size.clone().map(|u| u as usize),
},
DatabaseConfig::Custom(db) =>
sc_client_db::DatabaseSettingsSrc::Custom(db.clone()),
},
};
sc_client_db::light::LightStorage::new(db_settings)?
let light_blockchain = sc_client::light::new_light_blockchain(db_storage);
let fetch_checker = Arc::new(sc_client::light::new_fetch_checker(light_blockchain.clone(), executor.clone()));
let fetcher = Arc::new(sc_network::OnDemand::new(fetch_checker));
let backend = sc_client::light::new_light_backend(light_blockchain);
let remote_blockchain = backend.remote_blockchain();
let client = Arc::new(sc_client::light::new_light(
backend.clone(),
&config.chain_spec,
executor,
)?);
fetcher: Some(fetcher.clone()),
select_chain: None,
import_queue: (),
finality_proof_request_builder: None,
finality_proof_provider: None,
network_protocol: (),
transaction_pool: Arc::new(()),
rpc_extensions: Default::default(),
remote_backend: Some(remote_blockchain),
dht_event_tx: None,
marker: PhantomData,
})
}
}
impl<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, Backend>
ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
/// Returns a reference to the client that was stored in this builder.
pub fn client(&self) -> &Arc<TCl> {
&self.client
}
/// Returns a reference to the backend that was used in this builder.
pub fn backend(&self) -> &Arc<Backend> {
&self.backend
}
/// Returns a reference to the select-chain that was stored in this builder.
pub fn select_chain(&self) -> Option<&TSc> {
self.select_chain.as_ref()
}
/// Defines which head-of-chain strategy to use.
pub fn with_opt_select_chain<USc>(
select_chain_builder: impl FnOnce(
&Configuration<TCfg, TGen, TCSExt>, &Arc<Backend>
) -> Result<Option<USc>, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, USc, TImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, Backend>, Error> {
let select_chain = select_chain_builder(&self.config, &self.backend)?;
Ok(ServiceBuilder {
config: self.config,
client: self.client,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Defines which head-of-chain strategy to use.
pub fn with_select_chain<USc>(
self,
builder: impl FnOnce(&Configuration<TCfg, TGen, TCSExt>, &Arc<Backend>) -> Result<USc, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, USc, TImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, Backend>, Error> {
self.with_opt_select_chain(|cfg, b| builder(cfg, b).map(Option::Some))
}
/// Defines which import queue to use.
pub fn with_import_queue<UImpQu>(
builder: impl FnOnce(&Configuration<TCfg, TGen, TCSExt>, Arc<TCl>, Option<TSc>, Arc<TExPool>)
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, UImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, Backend>, Error>
where TSc: Clone {
let import_queue = builder(
self.client.clone(),
self.select_chain.clone(),
self.transaction_pool.clone()
)?;
Ok(ServiceBuilder {
config: self.config,
client: self.client,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Defines which network specialization protocol to use.
pub fn with_network_protocol<UNetP>(
self,
network_protocol_builder: impl FnOnce(&Configuration<TCfg, TGen, TCSExt>) -> Result<UNetP, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
UNetP, TExPool, TRpc, Backend>, Error> {
let network_protocol = network_protocol_builder(&self.config)?;
Ok(ServiceBuilder {
config: self.config,
client: self.client,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Defines which strategy to use for providing finality proofs.
pub fn with_opt_finality_proof_provider(
self,
builder: impl FnOnce(Arc<TCl>, Arc<Backend>) -> Result<Option<Arc<dyn FinalityProofProvider<TBl>>>, Error>
) -> Result<ServiceBuilder<
TBl,
TRtApi,
TCfg,
TGen,
TCl,
TFchr,
TSc,
TImpQu,
TFprb,
Arc<dyn FinalityProofProvider<TBl>>,
let finality_proof_provider = builder(self.client.clone(), self.backend.clone())?;
Ok(ServiceBuilder {
config: self.config,
client: self.client,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Defines which strategy to use for providing finality proofs.
pub fn with_finality_proof_provider(
self,
build: impl FnOnce(Arc<TCl>, Arc<Backend>) -> Result<Arc<dyn FinalityProofProvider<TBl>>, Error>
) -> Result<ServiceBuilder<
TBl,
TRtApi,
TCfg,
TGen,
TCl,
TFchr,
TSc,
TImpQu,
TFprb,
Arc<dyn FinalityProofProvider<TBl>>,
self.with_opt_finality_proof_provider(|client, backend| build(client, backend).map(Option::Some))
}
/// Defines which import queue to use.
pub fn with_import_queue_and_opt_fprb<UImpQu, UFprb>(
Svyatoslav Nikolsky
committed
builder: impl FnOnce(
Svyatoslav Nikolsky
committed
Arc<TCl>,
Arc<Backend>,
Option<TFchr>,
Option<TSc>,
Arc<TExPool>,
) -> Result<(UImpQu, Option<UFprb>), Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, UImpQu, UFprb, TFpp,
TNetP, TExPool, TRpc, Backend>, Error>
Svyatoslav Nikolsky
committed
where TSc: Clone, TFchr: Clone {
Svyatoslav Nikolsky
committed
self.fetcher.clone(),
self.select_chain.clone(),
self.transaction_pool.clone()
)?;
Ok(ServiceBuilder {
config: self.config,
client: self.client,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue,
finality_proof_request_builder: fprb,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Defines which import queue to use.
pub fn with_import_queue_and_fprb<UImpQu, UFprb>(
self,
Svyatoslav Nikolsky
committed
builder: impl FnOnce(
Svyatoslav Nikolsky
committed
Arc<TCl>,
Arc<Backend>,
Option<TFchr>,
Option<TSc>,
Arc<TExPool>,
) -> Result<(UImpQu, UFprb), Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, UImpQu, UFprb, TFpp,
TNetP, TExPool, TRpc, Backend>, Error>
Svyatoslav Nikolsky
committed
where TSc: Clone, TFchr: Clone {
self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx|
builder(cfg, cl, b, f, sc, tx)
.map(|(q, f)| (q, Some(f)))
)
}
/// Defines which transaction pool to use.
pub fn with_transaction_pool<UExPool>(
self,
transaction_pool_builder: impl FnOnce(
sc_transaction_pool::txpool::Options,
Arc<TCl>,
Option<TFchr>,
) -> Result<UExPool, Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
TNetP, UExPool, TRpc, Backend>, Error>
where TSc: Clone, TFchr: Clone {
let transaction_pool = transaction_pool_builder(
self.config.transaction_pool.clone(),
self.client.clone(),
self.fetcher.clone(),
)?;
Ok(ServiceBuilder {
config: self.config,
client: self.client,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: Arc::new(transaction_pool),
rpc_extensions: self.rpc_extensions,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Defines the RPC extensions to use.
pub fn with_rpc_extensions<URpc>(
self,
rpc_ext_builder: impl FnOnce(
Arc<TCl>,
Arc<TExPool>,
Arc<Backend>,
Option<TFchr>,
Option<Arc<dyn RemoteBlockchain<TBl>>>,
) -> Result<URpc, Error>,
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
TNetP, TExPool, URpc, Backend>, Error>
where TSc: Clone, TFchr: Clone {
let rpc_extensions = rpc_ext_builder(
self.client.clone(),
self.transaction_pool.clone(),
self.backend.clone(),
self.fetcher.clone(),
self.remote_backend.clone(),
)?;
Ok(ServiceBuilder {
config: self.config,
client: self.client,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions,
dht_event_tx: self.dht_event_tx,
/// Adds a dht event sender to builder to be used by the network to send dht events to the authority discovery
/// module.
pub fn with_dht_event_tx(
self,
dht_event_tx: mpsc::Sender<DhtEvent>,
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCSExt, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, Backend>, Error> {
Ok(ServiceBuilder {
config: self.config,
client: self.client,
backend: self.backend,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
remote_backend: self.remote_backend,
dht_event_tx: Some(dht_event_tx),
marker: self.marker,
})
/// Implemented on `ServiceBuilder`. Allows running block commands, such as import/export/validate
pub trait ServiceBuilderCommand {
/// Block type this API operates on.
type Block: BlockT;
/// Starts the process of importing blocks.
fn import_blocks(
self,
input: impl Read + Seek + Send + 'static,
) -> Box<dyn Future<Item = (), Error = Error> + Send>;
/// Performs the blocks export.
fn export_blocks(
self,
output: impl Write + 'static,
from: NumberFor<Self::Block>,
to: Option<NumberFor<Self::Block>>,
json: bool
) -> Box<dyn Future<Item = (), Error = Error>>;
/// Performs a revert of `blocks` blocks.
fn revert_chain(
&self,
blocks: NumberFor<Self::Block>
) -> Result<(), Error>;
/// Re-validate known block.
fn check_block(
block: BlockId<Self::Block>
) -> Box<dyn Future<Item = (), Error = Error> + Send>;
impl<TBl, TRtApi, TCfg, TGen, TCSExt, TBackend, TExec, TSc, TImpQu, TNetP, TExPool, TRpc>
ServiceBuilder<
TBl,
TRtApi,
TCfg,
TGen,
Client<TBackend, TExec, TBl, TRtApi>,
Arc<OnDemand<TBl>>,
TSc,
TImpQu,
BoxFinalityProofRequestBuilder<TBl>,
Arc<dyn FinalityProofProvider<TBl>>,
> where
Client<TBackend, TExec, TBl, TRtApi>: ProvideRuntimeApi,
<Client<TBackend, TExec, TBl, TRtApi> as ProvideRuntimeApi>::Api:
sc_offchain::OffchainWorkerApi<TBl> +
sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl> +
sp_session::SessionKeys<TBl> +
sp_api::ApiExt<TBl, Error = sp_blockchain::Error>,
TBl: BlockT<Hash = <Blake2Hasher as Hasher>::Out>,
TRtApi: ConstructRuntimeApi<TBl, Client<TBackend, TExec, TBl, TRtApi>> + 'static + Send + Sync,
TGen: RuntimeGenesis,
TCSExt: Extension,
TBackend: 'static + sc_client_api::backend::Backend<TBl, Blake2Hasher> + Send,
TExec: 'static + sc_client::CallExecutor<TBl, Blake2Hasher> + Send + Sync + Clone,
TSc: Clone,
TImpQu: 'static + ImportQueue<TBl>,
TNetP: NetworkSpecialization<TBl>,
TExPool: 'static
+ TransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash>
+ TransactionPoolMaintainer<Block=TBl, Hash = <TBl as BlockT>::Hash>,
TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata> + Clone,
TBl,
Client<TBackend, TExec, TBl, TRtApi>,
TSc,
NetworkStatus<TBl>,
NetworkService<TBl, TNetP, <TBl as BlockT>::Hash>,
sc_offchain::OffchainWorkers<
Client<TBackend, TExec, TBl, TRtApi>,
TBackend::OffchainStorage,
TBl
>,
>, Error> {
let ServiceBuilder {
marker: _,
mut config,
keystore,
select_chain,
import_queue,
finality_proof_request_builder,
finality_proof_provider,
network_protocol,
transaction_pool,
dht_event_tx,
sp_session::generate_initial_session_keys(
&BlockId::Hash(client.info().chain.best_hash),
config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
let (signal, exit) = exit_future::signal();
// List of asynchronous tasks to spawn. We collect them, then spawn them all at once.
let (to_spawn_tx, to_spawn_rx) =
mpsc::unbounded::<Box<dyn Future<Item = (), Error = ()> + Send>>();
// A side-channel for essential tasks to communicate shutdown.
let (essential_failed_tx, essential_failed_rx) = mpsc::unbounded();
let import_queue = Box::new(import_queue);
let chain_info = client.info().chain;
let version = config.full_version();
info!("Highest known block at #{}", chain_info.best_number);
telemetry!(
SUBSTRATE_INFO;
"node.start";
"height" => chain_info.best_number.saturated_into::<u64>(),
"best" => ?chain_info.best_hash
);
// make transaction pool available for off-chain runtime calls.
client.execution_extensions()
.register_transaction_pool(Arc::downgrade(&transaction_pool) as _);
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
imports_external_transactions: !config.roles.is_light(),
pool: transaction_pool.clone(),
client: client.clone(),
executor: Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }),
});
let protocol_id = {
let protocol_id_full = match config.chain_spec.protocol_id() {
Some(pid) => pid,
None => {
warn!("Using default protocol ID {:?} because none is configured in the \
chain specs", DEFAULT_PROTOCOL_ID
);
DEFAULT_PROTOCOL_ID
}
}.as_bytes();
sc_network::config::ProtocolId::from(protocol_id_full)
};
let block_announce_validator =
Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator::new(client.clone()));
let network_params = sc_network::config::Params {
roles: config.roles,
network_config: config.network.clone(),
chain: client.clone(),
finality_proof_provider,
finality_proof_request_builder,
on_demand: on_demand.clone(),
transaction_pool: transaction_pool_adapter.clone() as _,
import_queue,
protocol_id,
specialization: network_protocol,
block_announce_validator,
};
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network_mut = sc_network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new()));
let offchain_storage = backend.offchain_storage();
let offchain_workers = match (config.offchain_worker, offchain_storage) {
(true, Some(db)) => {
Some(Arc::new(sc_offchain::OffchainWorkers::new(client.clone(), db)))
},
(true, None) => {
log::warn!("Offchain workers disabled, due to lack of offchain storage support in backend.");
None
_ => None,
};
{
// block notifications
let txpool = Arc::downgrade(&transaction_pool);
let offchain = offchain_workers.as_ref().map(Arc::downgrade);
let to_spawn_tx_ = to_spawn_tx.clone();
let network_state_info: Arc<dyn NetworkStateInfo + Send + Sync> = network.clone();
let is_validator = config.roles.is_authority();
let events = client.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.for_each(move |notification| {
let number = *notification.header.number();
let txpool = txpool.upgrade();
if let Some(txpool) = txpool.as_ref() {
let future = txpool.maintain(
&BlockId::hash(notification.hash),
¬ification.retracted,
).map(|_| Ok(())).compat();
let _ = to_spawn_tx_.unbounded_send(Box::new(future));
}
let offchain = offchain.as_ref().and_then(|o| o.upgrade());
if let Some(offchain) = offchain {
let future = offchain.on_block_imported(&number, network_state_info.clone(), is_validator)
.map(|()| Ok(()));
let _ = to_spawn_tx_.unbounded_send(Box::new(Compat::new(future)));
}
Ok(())
})
.select(exit.clone().map(Ok).compat())
.then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(events));
}
{
// extrinsic notifications
let network = Arc::downgrade(&network);
let transaction_pool_ = transaction_pool.clone();
let events = transaction_pool.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.for_each(move |_| {
if let Some(network) = network.upgrade() {
network.trigger_repropagate();
}
let status = transaction_pool_.status();
telemetry!(SUBSTRATE_INFO; "txpool.import";
"ready" => status.ready,
"future" => status.future
);
Ok(())
})
.select(exit.clone().map(Ok).compat())
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
.then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(events));
}
// Periodically notify the telemetry.
let transaction_pool_ = transaction_pool.clone();
let client_ = client.clone();
let mut sys = System::new();
let self_pid = get_current_pid().ok();
let (state_tx, state_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>();
network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx);
let tel_task = state_rx.for_each(move |(net_status, _)| {
let info = client_.info();
let best_number = info.chain.best_number.saturated_into::<u64>();
let best_hash = info.chain.best_hash;
let num_peers = net_status.num_connected_peers;
let txpool_status = transaction_pool_.status();
let finalized_number: u64 = info.chain.finalized_number.saturated_into::<u64>();
let bandwidth_download = net_status.average_download_per_sec;
let bandwidth_upload = net_status.average_upload_per_sec;
let used_state_cache_size = match info.used_state_cache_size {
Some(size) => size,
None => 0,
};
// get cpu usage and memory usage of this process
let (cpu_usage, memory) = if let Some(self_pid) = self_pid {
if sys.refresh_process(self_pid) {
let proc = sys.get_process(self_pid)
.expect("Above refresh_process succeeds, this should be Some(), qed");
(proc.cpu_usage(), proc.memory())
} else { (0.0, 0) }
} else { (0.0, 0) };
telemetry!(
SUBSTRATE_INFO;
"system.interval";
"peers" => num_peers,
"height" => best_number,
"best" => ?best_hash,
"txcount" => txpool_status.ready,
"cpu" => cpu_usage,
"memory" => memory,
"finalized_height" => finalized_number,
"finalized_hash" => ?info.chain.finalized_hash,
"bandwidth_download" => bandwidth_download,
"bandwidth_upload" => bandwidth_upload,
"used_state_cache_size" => used_state_cache_size,
);
let _ = record_metrics!(
"peers" => num_peers,
"height" => best_number,
"txcount" => txpool_status.ready,
"cpu" => cpu_usage,
"memory" => memory,
"finalized_height" => finalized_number,
"bandwidth_download" => bandwidth_download,
"bandwidth_upload" => bandwidth_upload,
"used_state_cache_size" => used_state_cache_size,
}).select(exit.clone().map(Ok).compat()).then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task));
// Periodically send the network state to the telemetry.
let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>();
network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx);
let tel_task_2 = netstat_rx.for_each(move |(_, network_state)| {
telemetry!(
SUBSTRATE_INFO;
"system.network_state";
"state" => network_state,
);
Ok(())
}).select(exit.clone().map(Ok).compat()).then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(tel_task_2));
// RPC
let (system_rpc_tx, system_rpc_rx) = futures03::channel::mpsc::unbounded();
let gen_handler = || {
use sc_rpc::{chain, state, author, system};
let system_info = sc_rpc::system::SystemInfo {
chain_name: config.chain_spec.name().into(),
impl_name: config.impl_name.into(),
impl_version: config.impl_version.into(),
properties: config.chain_spec.properties().clone(),
};
let subscriptions = sc_rpc::Subscriptions::new(Arc::new(SpawnTaskHandle {
sender: to_spawn_tx.clone(),
on_exit: exit.clone()