// Copyright 2017-2019 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . use crate::{NewService, NetworkStatus, NetworkState, error::{self, Error}, DEFAULT_PROTOCOL_ID}; use crate::{SpawnTaskHandle, start_rpc_servers, build_network_future, TransactionPoolAdapter}; use crate::TaskExecutor; use crate::config::Configuration; use client::{BlockchainEvents, Client, runtime_api}; use codec::{Decode, Encode, IoReader}; use consensus_common::import_queue::ImportQueue; use futures::{prelude::*, sync::mpsc}; use futures03::{FutureExt as _, compat::Compat, StreamExt as _, TryStreamExt as _}; use keystore::{Store as Keystore, KeyStorePtr}; use log::{info, warn}; use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo}; use network::{config::BoxFinalityProofRequestBuilder, specialization::NetworkSpecialization}; use parking_lot::{Mutex, RwLock}; use primitives::{Blake2Hasher, H256, Hasher}; use rpc::{self, system::SystemInfo}; use sr_primitives::{BuildStorage, generic::BlockId}; use sr_primitives::traits::{Block as BlockT, ProvideRuntimeApi, NumberFor, One, Zero, Header, SaturatedConversion}; use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; use serde::{Serialize, de::DeserializeOwned}; use std::{io::{Read, Write, Seek}, marker::PhantomData, sync::Arc, sync::atomic::AtomicBool}; use sysinfo::{get_current_pid, ProcessExt, System, 
SystemExt}; use tel::{telemetry, SUBSTRATE_INFO}; use transaction_pool::txpool::{self, ChainApi, Pool as TransactionPool}; /// Aggregator for the components required to build a service. /// /// # Usage /// /// Call [`ServiceBuilder::new_full`] or [`ServiceBuilder::new_light`], then call the various /// `with_` methods to add the required components that you built yourself: /// /// - [`with_select_chain`](ServiceBuilder::with_select_chain) /// - [`with_import_queue`](ServiceBuilder::with_import_queue) /// - [`with_network_protocol`](ServiceBuilder::with_network_protocol) /// - [`with_finality_proof_provider`](ServiceBuilder::with_finality_proof_provider) /// - [`with_transaction_pool`](ServiceBuilder::with_transaction_pool) /// /// After this is done, call [`build`](ServiceBuilder::build) to construct the service. /// /// The order in which the `with_*` methods are called doesn't matter, as the correct binding of /// generics is done when you call `build`. /// pub struct ServiceBuilder { config: Configuration, client: Arc, keystore: Arc>, fetcher: Option, select_chain: Option, import_queue: TImpQu, finality_proof_request_builder: Option, finality_proof_provider: Option, network_protocol: TNetP, transaction_pool: Arc, rpc_extensions: TRpc, marker: PhantomData<(TBl, TRtApi)>, } impl ServiceBuilder<(), (), TCfg, TGen, (), (), (), (), (), (), (), (), ()> where TGen: Serialize + DeserializeOwned + BuildStorage { /// Start the service builder with a configuration. 
pub fn new_full, TRtApi, TExecDisp: NativeExecutionDispatch>( config: Configuration ) -> Result, client::LocalCallExecutor, NativeExecutor>, TBl, TRtApi >, Arc>, (), (), BoxFinalityProofRequestBuilder, (), (), (), () >, Error> { let keystore = Keystore::open(config.keystore_path.clone(), config.keystore_password.clone())?; let db_settings = client_db::DatabaseSettings { cache_size: None, state_cache_size: config.state_cache_size, state_cache_child_ratio: config.state_cache_child_ratio.map(|v| (v, 100)), path: config.database_path.clone(), pruning: config.pruning.clone(), }; let executor = NativeExecutor::::new(config.default_heap_pages); let client = Arc::new(client_db::new_client( db_settings, executor, &config.chain_spec, config.execution_strategies.clone(), Some(keystore.clone()), )?); Ok(ServiceBuilder { config, client, keystore, fetcher: None, select_chain: None, import_queue: (), finality_proof_request_builder: None, finality_proof_provider: None, network_protocol: (), transaction_pool: Arc::new(()), rpc_extensions: Default::default(), marker: PhantomData, }) } /// Start the service builder with a configuration. 
pub fn new_light, TRtApi, TExecDisp: NativeExecutionDispatch + 'static>( config: Configuration ) -> Result, network::OnDemand, Blake2Hasher>, client::light::call_executor::RemoteOrLocalCallExecutor< TBl, client::light::backend::Backend< client_db::light::LightStorage, network::OnDemand, Blake2Hasher >, client::light::call_executor::RemoteCallExecutor< client::light::blockchain::Blockchain< client_db::light::LightStorage, network::OnDemand >, network::OnDemand, >, client::LocalCallExecutor< client::light::backend::Backend< client_db::light::LightStorage, network::OnDemand, Blake2Hasher >, NativeExecutor > >, TBl, TRtApi >, Arc>, (), (), BoxFinalityProofRequestBuilder, (), (), (), () >, Error> { let keystore = Keystore::open(config.keystore_path.clone(), config.keystore_password.clone())?; let db_settings = client_db::DatabaseSettings { cache_size: config.database_cache_size.map(|u| u as usize), state_cache_size: config.state_cache_size, state_cache_child_ratio: config.state_cache_child_ratio.map(|v| (v, 100)), path: config.database_path.clone(), pruning: config.pruning.clone(), }; let executor = NativeExecutor::::new(config.default_heap_pages); let db_storage = client_db::light::LightStorage::new(db_settings)?; let light_blockchain = client::light::new_light_blockchain(db_storage); let fetch_checker = Arc::new(client::light::new_fetch_checker(light_blockchain.clone(), executor.clone())); let fetcher = Arc::new(network::OnDemand::new(fetch_checker)); let client_backend = client::light::new_light_backend(light_blockchain, fetcher.clone()); let client = client::light::new_light(client_backend, fetcher.clone(), &config.chain_spec, executor)?; Ok(ServiceBuilder { config, client: Arc::new(client), keystore, fetcher: Some(fetcher), select_chain: None, import_queue: (), finality_proof_request_builder: None, finality_proof_provider: None, network_protocol: (), transaction_pool: Arc::new(()), rpc_extensions: Default::default(), marker: PhantomData, }) } } impl ServiceBuilder 
{
    /// Returns a reference to the client that was stored in this builder.
    pub fn client(&self) -> &Arc<TCl> {
        &self.client
    }

    /// Returns a reference to the select-chain that was stored in this builder.
    ///
    /// Yields `None` when no select-chain is currently stored.
    pub fn select_chain(&self) -> Option<&TSc> {
        self.select_chain.as_ref()
    }

    /// Defines which head-of-chain strategy to use.
    ///
    /// `select_chain_builder` receives the mutable configuration and a clone of the client and
    /// may return `None` to leave the builder without a select-chain. An error from the closure
    /// is returned unchanged.
    pub fn with_opt_select_chain<USc>(
        mut self,
        select_chain_builder: impl FnOnce(&mut Configuration<TCfg, TGen>, Arc<TCl>)
            -> Result<Option<USc>, Error>
    ) -> Result<ServiceBuilder<
        TBl, TRtApi, TCfg, TGen, TCl, TFchr, USc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc,
    >, Error> {
        let select_chain = select_chain_builder(&mut self.config, self.client.clone())?;

        // The select-chain type changes, so the builder has to be rebuilt field by field.
        Ok(ServiceBuilder {
            config: self.config,
            client: self.client,
            keystore: self.keystore,
            fetcher: self.fetcher,
            select_chain,
            import_queue: self.import_queue,
            finality_proof_request_builder: self.finality_proof_request_builder,
            finality_proof_provider: self.finality_proof_provider,
            network_protocol: self.network_protocol,
            transaction_pool: self.transaction_pool,
            rpc_extensions: self.rpc_extensions,
            marker: self.marker,
        })
    }

    /// Defines which head-of-chain strategy to use.
    ///
    /// Same as `with_opt_select_chain`, for a closure that always produces a select-chain.
    pub fn with_select_chain<USc>(
        self,
        builder: impl FnOnce(&mut Configuration<TCfg, TGen>, Arc<TCl>) -> Result<USc, Error>
    ) -> Result<ServiceBuilder<
        TBl, TRtApi, TCfg, TGen, TCl, TFchr, USc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc,
    >, Error> {
        self.with_opt_select_chain(|cfg, cl| builder(cfg, cl).map(Option::Some))
    }

    /// Defines which import queue to use.
    ///
    /// `builder` receives the mutable configuration, the client, a clone of the select-chain
    /// and the transaction pool *as they are stored at the time of this call*. Supply those
    /// components first if the import queue depends on them.
    pub fn with_import_queue<UImpQu>(
        mut self,
        builder: impl FnOnce(&mut Configuration<TCfg, TGen>, Arc<TCl>, Option<TSc>, Arc<TExPool>)
            -> Result<UImpQu, Error>
    ) -> Result<ServiceBuilder<
        TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, UImpQu, TFprb, TFpp, TNetP, TExPool, TRpc,
    >, Error>
    where TSc: Clone {
        let import_queue = builder(
            &mut self.config,
            self.client.clone(),
            self.select_chain.clone(),
            self.transaction_pool.clone()
        )?;

        Ok(ServiceBuilder {
            config: self.config,
            client: self.client,
            keystore: self.keystore,
            fetcher: self.fetcher,
            select_chain: self.select_chain,
            import_queue,
            finality_proof_request_builder: self.finality_proof_request_builder,
            finality_proof_provider: self.finality_proof_provider,
            network_protocol: self.network_protocol,
            transaction_pool: self.transaction_pool,
            rpc_extensions: self.rpc_extensions,
            marker: self.marker,
        })
    }

    /// Defines which network specialization protocol to use.
pub fn with_network_protocol( self, network_protocol_builder: impl FnOnce(&Configuration) -> Result ) -> Result, Error> { let network_protocol = network_protocol_builder(&self.config)?; Ok(ServiceBuilder { config: self.config, client: self.client, keystore: self.keystore, fetcher: self.fetcher, select_chain: self.select_chain, import_queue: self.import_queue, finality_proof_request_builder: self.finality_proof_request_builder, finality_proof_provider: self.finality_proof_provider, network_protocol, transaction_pool: self.transaction_pool, rpc_extensions: self.rpc_extensions, marker: self.marker, }) } /// Defines which strategy to use for providing finality proofs. pub fn with_opt_finality_proof_provider( self, builder: impl FnOnce(Arc) -> Result>>, Error> ) -> Result>, TNetP, TExPool, TRpc >, Error> { let finality_proof_provider = builder(self.client.clone())?; Ok(ServiceBuilder { config: self.config, client: self.client, keystore: self.keystore, fetcher: self.fetcher, select_chain: self.select_chain, import_queue: self.import_queue, finality_proof_request_builder: self.finality_proof_request_builder, finality_proof_provider, network_protocol: self.network_protocol, transaction_pool: self.transaction_pool, rpc_extensions: self.rpc_extensions, marker: self.marker, }) } /// Defines which strategy to use for providing finality proofs. pub fn with_finality_proof_provider( self, build: impl FnOnce(Arc) -> Result>, Error> ) -> Result>, TNetP, TExPool, TRpc >, Error> { self.with_opt_finality_proof_provider(|client| build(client).map(Option::Some)) } /// Defines which import queue to use. 
pub fn with_import_queue_and_opt_fprb( mut self, builder: impl FnOnce(&mut Configuration, Arc, Option, Arc) -> Result<(UImpQu, Option), Error> ) -> Result, Error> where TSc: Clone { let (import_queue, fprb) = builder( &mut self.config, self.client.clone(), self.select_chain.clone(), self.transaction_pool.clone() )?; Ok(ServiceBuilder { config: self.config, client: self.client, keystore: self.keystore, fetcher: self.fetcher, select_chain: self.select_chain, import_queue, finality_proof_request_builder: fprb, finality_proof_provider: self.finality_proof_provider, network_protocol: self.network_protocol, transaction_pool: self.transaction_pool, rpc_extensions: self.rpc_extensions, marker: self.marker, }) } /// Defines which import queue to use. pub fn with_import_queue_and_fprb( self, builder: impl FnOnce(&mut Configuration, Arc, Option, Arc) -> Result<(UImpQu, UFprb), Error> ) -> Result, Error> where TSc: Clone { self.with_import_queue_and_opt_fprb(|cfg, cl, sc, tx| builder(cfg, cl, sc, tx).map(|(q, f)| (q, Some(f)))) } /// Defines which transaction pool to use. pub fn with_transaction_pool( self, transaction_pool_builder: impl FnOnce(transaction_pool::txpool::Options, Arc) -> Result ) -> Result, Error> { let transaction_pool = transaction_pool_builder(self.config.transaction_pool.clone(), self.client.clone())?; Ok(ServiceBuilder { config: self.config, client: self.client, keystore: self.keystore, fetcher: self.fetcher, select_chain: self.select_chain, import_queue: self.import_queue, finality_proof_request_builder: self.finality_proof_request_builder, finality_proof_provider: self.finality_proof_provider, network_protocol: self.network_protocol, transaction_pool: Arc::new(transaction_pool), rpc_extensions: self.rpc_extensions, marker: self.marker, }) } /// Defines the RPC extensions to use. 
pub fn with_rpc_extensions( self, rpc_ext_builder: impl FnOnce(Arc, Arc) -> URpc ) -> Result, Error> { let rpc_extensions = rpc_ext_builder(self.client.clone(), self.transaction_pool.clone()); Ok(ServiceBuilder { config: self.config, client: self.client, keystore: self.keystore, fetcher: self.fetcher, select_chain: self.select_chain, import_queue: self.import_queue, finality_proof_request_builder: self.finality_proof_request_builder, finality_proof_provider: self.finality_proof_provider, network_protocol: self.network_protocol, transaction_pool: self.transaction_pool, rpc_extensions, marker: self.marker, }) } } /// Implemented on `ServiceBuilder`. Allows importing blocks once you have given all the required /// components to the builder. pub trait ServiceBuilderImport { /// Starts the process of importing blocks. fn import_blocks( self, exit: impl Future + Send + 'static, input: impl Read + Seek, ) -> Result + Send>, Error>; } /// Implemented on `ServiceBuilder`. Allows exporting blocks once you have given all the required /// components to the builder. pub trait ServiceBuilderExport { /// Type of block of the builder. type Block: BlockT; /// Performs the blocks export. fn export_blocks( &self, exit: impl Future + Send + 'static, output: impl Write, from: NumberFor, to: Option>, json: bool ) -> Result<(), Error>; } /// Implemented on `ServiceBuilder`. Allows reverting the chain once you have given all the /// required components to the builder. pub trait ServiceBuilderRevert { /// Type of block of the builder. type Block: BlockT; /// Performs a revert of `blocks` bocks. 
fn revert_chain(
    &self,
    blocks: NumberFor<Self::Block>
) -> Result<(), Error>;
} // closes `trait ServiceBuilderRevert`

// Block import: consumes the builder and feeds `input` through the stored import queue.
impl<
    TBl, TRtApi, TCfg, TGen, TBackend,
    TExec, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP,
    TExPool, TRpc
> ServiceBuilderImport for ServiceBuilder<
    TBl, TRtApi, TCfg, TGen, Client<TBackend, TExec, TBl, TRtApi>,
    TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc
> where
    TBl: BlockT<Hash = <Blake2Hasher as Hasher>::Out>,
    TBackend: 'static + client::backend::Backend<TBl, Blake2Hasher> + Send,
    TExec: 'static + client::CallExecutor<TBl, Blake2Hasher> + Send + Sync + Clone,
    TImpQu: 'static + ImportQueue<TBl>,
    TRtApi: 'static + Send + Sync,
{
    fn import_blocks(
        self,
        exit: impl Future<Item=(),Error=()> + Send + 'static,
        input: impl Read + Seek,
    ) -> Result<Box<dyn Future<Item = (), Error = ()> + Send>, Error> {
        // The `import_blocks!` macro (defined elsewhere in the crate) refers to these locals.
        let client = self.client;
        let mut queue = self.import_queue;
        import_blocks!(TBl, client, queue, exit, input)
            .map(|f| Box::new(f) as Box<_>)
    }
}

// Block export: only needs shared access to the client.
impl<TBl, TRtApi, TCfg, TGen, TBackend, TExec, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc>
    ServiceBuilderExport for ServiceBuilder<
        TBl, TRtApi, TCfg, TGen, Client<TBackend, TExec, TBl, TRtApi>,
        TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc
    >
where
    TBl: BlockT<Hash = <Blake2Hasher as Hasher>::Out>,
    TBackend: 'static + client::backend::Backend<TBl, Blake2Hasher> + Send,
    TExec: 'static + client::CallExecutor<TBl, Blake2Hasher> + Send + Sync + Clone
{
    type Block = TBl;

    fn export_blocks(
        &self,
        exit: impl Future<Item=(),Error=()> + Send + 'static,
        mut output: impl Write,
        from: NumberFor<TBl>,
        to: Option<NumberFor<TBl>>,
        json: bool
    ) -> Result<(), Error> {
        let client = &self.client;
        export_blocks!(client, exit, output, from, to, json)
    }
}

// Chain revert: only needs shared access to the client.
impl<TBl, TRtApi, TCfg, TGen, TBackend, TExec, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc>
    ServiceBuilderRevert for ServiceBuilder<
        TBl, TRtApi, TCfg, TGen, Client<TBackend, TExec, TBl, TRtApi>,
        TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc
    >
where
    TBl: BlockT<Hash = <Blake2Hasher as Hasher>::Out>,
    TBackend: 'static + client::backend::Backend<TBl, Blake2Hasher> + Send,
    TExec: 'static + client::CallExecutor<TBl, Blake2Hasher> + Send + Sync + Clone
{
    type Block = TBl;

    fn revert_chain(
        &self,
        blocks: NumberFor<TBl>
    ) -> Result<(), Error> {
        let client = &self.client;
        revert_chain!(client, blocks)
    }
}

// `build` is only available once every component slot holds the concrete type listed here.
impl<TBl, TRtApi, TCfg, TGen, TBackend, TExec, TSc, TImpQu, TNetP, TExPoolApi, TRpc>
ServiceBuilder<
    TBl,
    TRtApi,
    TCfg,
    TGen,
    Client<TBackend, TExec, TBl, TRtApi>,
    Arc<OnDemand<TBl>>,
    TSc,
    TImpQu,
    BoxFinalityProofRequestBuilder<TBl>,
    Arc<dyn FinalityProofProvider<TBl>>,
    TNetP,
    TransactionPool<TExPoolApi>,
    TRpc
> where
    Client<TBackend, TExec, TBl, TRtApi>: ProvideRuntimeApi,
    <Client<TBackend, TExec, TBl, TRtApi> as ProvideRuntimeApi>::Api:
        runtime_api::Metadata<TBl> +
        offchain::OffchainWorkerApi<TBl> +
        runtime_api::TaggedTransactionQueue<TBl> +
        session::SessionKeys<TBl>,
    TBl: BlockT<Hash = <Blake2Hasher as Hasher>::Out>,
    TRtApi: 'static + Send + Sync,
    TCfg: Default,
    TGen: Serialize + DeserializeOwned + BuildStorage,
    TBackend: 'static + client::backend::Backend<TBl, Blake2Hasher> + Send,
TExec: 'static + client::CallExecutor<TBl, Blake2Hasher> + Send + Sync + Clone,
TSc: Clone,
TImpQu: 'static + ImportQueue<TBl>,
TNetP: NetworkSpecialization<TBl>,
TExPoolApi: 'static + ChainApi<Block = TBl, Hash = <TBl as BlockT>::Hash>,
TRpc: rpc::RpcExtension<rpc::Metadata> + Clone,
{
    /// Builds the service.
    ///
    /// Generates the initial session keys from `config.dev_key_seed` (no seeds when it is
    /// unset), then passes every stored component to the crate's `new_impl!` macro together
    /// with the transaction-pool maintenance, offchain-worker and RPC start-up callbacks.
    ///
    /// # Errors
    ///
    /// Fails if session key generation fails or if `new_impl!` reports an error.
    pub fn build(self) -> Result<NewService<
        Configuration<TCfg, TGen>,
        TBl,
        Client<TBackend, TExec, TBl, TRtApi>,
        TSc,
        NetworkStatus<TBl>,
        NetworkService<TBl, TNetP, <TBl as BlockT>::Hash>,
        TransactionPool<TExPoolApi>,
        offchain::OffchainWorkers<
            Client<TBackend, TExec, TBl, TRtApi>,
            TBackend::OffchainStorage,
            TBl
        >,
    >, Error> {
        let mut config = self.config;
        session::generate_initial_session_keys(
            self.client.clone(),
            config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default()
        )?;

        // Move the components out of `self` so the closure below can own them.
        let (
            client,
            fetcher,
            keystore,
            select_chain,
            import_queue,
            finality_proof_request_builder,
            finality_proof_provider,
            network_protocol,
            transaction_pool,
            rpc_extensions
        ) = (
            self.client,
            self.fetcher,
            self.keystore,
            self.select_chain,
            self.import_queue,
            self.finality_proof_request_builder,
            self.finality_proof_provider,
            self.network_protocol,
            self.transaction_pool,
            self.rpc_extensions
        );

        new_impl!(
            TBl,
            config,
            move |_| -> Result<_, Error> {
                Ok((
                    client,
                    fetcher,
                    keystore,
                    select_chain,
                    import_queue,
                    finality_proof_request_builder,
                    finality_proof_provider,
                    network_protocol,
                    transaction_pool,
                    rpc_extensions
                ))
            },
            |h, c, tx| maintain_transaction_pool(h, c, tx),
            |n, o, p, ns, v| offchain_workers(n, o, p, ns, v),
            |c, ssb, si, te, tp, ext, ks| start_rpc(c, ssb, si, te, tp, ext, ks),
        )
    }
}

/// Assembles the RPC handler.
///
/// Creates the `chain`, `state`, `author` and `system` RPC objects, which share one
/// `Subscriptions` manager driven by `task_executor`, and registers them together with
/// `rpc_extensions`.
pub(crate) fn start_rpc<Api, Backend, Block, Executor, PoolApi>(
    client: Arc<Client<Backend, Executor, Block, Api>>,
    system_send_back: futures03::channel::mpsc::UnboundedSender<rpc::system::Request<Block>>,
    rpc_system_info: SystemInfo,
    task_executor: TaskExecutor,
    transaction_pool: Arc<TransactionPool<PoolApi>>,
    rpc_extensions: impl rpc::RpcExtension<rpc::Metadata>,
    keystore: KeyStorePtr,
) -> rpc_servers::RpcHandler<rpc::Metadata>
where
    Block: BlockT<Hash = <Blake2Hasher as Hasher>::Out>,
    Backend: client::backend::Backend<Block, Blake2Hasher> + 'static,
    Client<Backend, Executor, Block, Api>: ProvideRuntimeApi,
    <Client<Backend, Executor, Block, Api> as ProvideRuntimeApi>::Api:
        runtime_api::Metadata<Block> + session::SessionKeys<Block>,
    Api: Send + Sync + 'static,
    Executor: client::CallExecutor<Block, Blake2Hasher> + Send + Sync + Clone + 'static,
    PoolApi: txpool::ChainApi<Hash = Block::Hash, Block = Block> + 'static
{
    use rpc::{chain, state, author, system};

    let subscriptions = rpc::Subscriptions::new(task_executor.clone());
    let chain = chain::Chain::new(client.clone(), subscriptions.clone());
    let state = state::State::new(client.clone(), subscriptions.clone());
    let author = rpc::author::Author::new(
        client,
        transaction_pool,
        subscriptions,
        keystore,
    );
    let system = system::System::new(rpc_system_info, system_send_back);

    rpc_servers::rpc_handler((
        state::StateApi::to_delegate(state),
        chain::ChainApi::to_delegate(chain),
        author::AuthorApi::to_delegate(author),
        system::SystemApi::to_delegate(system),
        rpc_extensions,
    ))
}

/// Prunes the transaction pool against the block identified by `id`.
///
/// Does nothing when the pool is empty or when the client has no block for `id`. Otherwise
/// the block's extrinsics are passed to `TransactionPool::prune` together with `id` and the
/// id of the block's parent.
///
/// # Errors
///
/// Propagates errors from `client.block`. Pool errors are converted to a string with their
/// `Debug` formatting.
pub(crate) fn maintain_transaction_pool<Api, Backend, Block, Executor, PoolApi>(
    id: &BlockId<Block>,
    client: &Client<Backend, Executor, Block, Api>,
    transaction_pool: &TransactionPool<PoolApi>,
) -> error::Result<()> where
    Block: BlockT<Hash = <Blake2Hasher as Hasher>::Out>,
    Backend: client::backend::Backend<Block, Blake2Hasher>,
    Client<Backend, Executor, Block, Api>: ProvideRuntimeApi,
    <Client<Backend, Executor, Block, Api> as ProvideRuntimeApi>::Api:
        runtime_api::TaggedTransactionQueue<Block>,
    Executor: client::CallExecutor<Block, Blake2Hasher>,
    PoolApi: txpool::ChainApi<Hash = Block::Hash, Block = Block>,
{
    // Avoid calling into runtime if there is nothing to prune from the pool anyway.
    if transaction_pool.status().is_empty() {
        return Ok(())
    }

    if let Some(block) = client.block(id)? {
        let parent_id = BlockId::hash(*block.block.header().parent_hash());
        let extrinsics = block.block.extrinsics();
        transaction_pool.prune(id, &parent_id, extrinsics).map_err(|e| format!("{:?}", e))?;
    }

    Ok(())
}

/// Creates the offchain-worker future for an imported block.
///
/// Calls `OffchainWorkers::on_block_imported` and wraps the resulting futures-0.3 future in
/// `Compat`, boxed, so it can be used as a futures-0.1 future.
pub(crate) fn offchain_workers<Api, Backend, Block, Executor, PoolApi>(
    number: &NumberFor<Block>,
    offchain: &offchain::OffchainWorkers<
        Client<Backend, Executor, Block, Api>,
        <Backend as client::backend::Backend<Block, Blake2Hasher>>::OffchainStorage,
        Block
    >,
    pool: &Arc<TransactionPool<PoolApi>>,
    network_state: &Arc<dyn NetworkStateInfo + Send + Sync>,
    is_validator: bool,
) -> error::Result<Box<dyn Future<Item = (), Error = ()> + Send>>
where
    Block: BlockT<Hash = <Blake2Hasher as Hasher>::Out>,
    Backend: client::backend::Backend<Block, Blake2Hasher> + 'static,
    Api: 'static,
    <Backend as client::backend::Backend<Block, Blake2Hasher>>::OffchainStorage: 'static,
    Client<Backend, Executor, Block, Api>: ProvideRuntimeApi + Send + Sync,
    <Client<Backend, Executor, Block, Api> as ProvideRuntimeApi>::Api:
        offchain::OffchainWorkerApi<Block>,
    Executor: client::CallExecutor<Block, Blake2Hasher> + 'static,
    PoolApi: txpool::ChainApi<Block = Block> + 'static,
{
    let future = offchain.on_block_imported(number, pool, network_state.clone(), is_validator)
        // `Compat` needs a `Result` output, so the unit output is wrapped in `Ok`.
        .map(|()| Ok(()));
    Ok(Box::new(Compat::new(future)))
}

#[cfg(test)]
mod tests {
    use super::*;
    use consensus_common::{BlockOrigin, SelectChain};
    use substrate_test_runtime_client::{prelude::*, runtime::Transfer};

    #[test]
    fn should_remove_transactions_from_the_pool() {
        let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain();
        let client = Arc::new(client);
        let pool = TransactionPool::new(
            Default::default(),
            ::transaction_pool::ChainApi::new(client.clone()),
        );
        let transaction = Transfer {
            amount: 5,
            nonce: 0,
            from: AccountKeyring::Alice.into(),
            to: Default::default(),
        }.into_signed_tx();
        let best = longest_chain.best_chain().unwrap();

        // store the transaction in the pool
        pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap();

        // import the block
        let mut builder = client.new_block(Default::default()).unwrap();
        builder.push(transaction.clone()).unwrap();
        let block = builder.bake().unwrap();
        let id = BlockId::hash(block.header().hash());
        client.import(BlockOrigin::Own, block).unwrap();

        // fire notification - this should clean up the queue
        assert_eq!(pool.status().ready, 1);
        maintain_transaction_pool(
            &id,
            &client,
            &pool,
        ).unwrap();

        // then
        assert_eq!(pool.status().ready, 0);
        assert_eq!(pool.status().future, 0);
    }
}