client.rs 74 KiB
Newer Older
Gav Wood's avatar
Gav Wood committed
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
Gav Wood's avatar
Gav Wood committed
use std::collections::{HashSet, HashMap, BTreeMap, VecDeque};
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::time::{Instant};
use hash::keccak;
use trie::{TrieSpec, TrieFactory, Trie};
Marek Kotewicz's avatar
Marek Kotewicz committed
use kvdb::{DBValue, KeyValueDB, DBTransaction};
use util_error::UtilError;

// other
Marek Kotewicz's avatar
Marek Kotewicz committed
use ethereum_types::{H256, Address, U256};
use block::{IsBlock, LockedBlock, Drain, ClosedBlock, OpenBlock, enact_verified, SealedBlock};
use blockchain::{BlockChain, BlockProvider,  TreeRoute, ImportRoute, TransactionAddress};
use client::ancient_import::AncientVerifier;
use client::Error as ClientError;
use client::{
Marek Kotewicz's avatar
Marek Kotewicz committed
	Nonce, Balance, ChainInfo, BlockInfo, CallContract, TransactionInfo,
	RegistryInfo, ReopenBlock, PrepareOpenBlock, ScheduleInfo, ImportSealedBlock,
	BroadcastProposalBlock, ImportBlock, StateOrBlock, StateInfo, StateClient, Call,
	AccountData, BlockChain as BlockChainTrait, BlockProducer, SealedBlockImporter,
	ClientIoMessage
Gav Wood's avatar
Gav Wood committed
use client::{
	BlockId, TransactionId, UncleId, TraceId, ClientConfig, BlockChainClient,
	MiningBlockChainClient, TraceFilter, CallAnalytics, BlockImportError, Mode,
	ChainNotify, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType
Gav Wood's avatar
Gav Wood committed
};
use engines::{EthEngine, EpochTransition};
use error::{ImportError, ExecutionError, CallError, BlockError, ImportResult, Error as EthcoreError};
use vm::{EnvInfo, LastHashes};
use evm::Schedule;
use executive::{Executive, Executed, TransactOptions, contract_address};
use factory::{Factories, VmFactory};
use header::{BlockNumber, Header};
use io::IoChannel;
use log_entry::LocalizedLogEntry;
use miner::{Miner, MinerService};
Marek Kotewicz's avatar
Marek Kotewicz committed
use parking_lot::{Mutex, RwLock};
use rand::OsRng;
use receipt::{Receipt, LocalizedReceipt};
use rlp::UntrustedRlp;
use snapshot::{self, io as snapshot_io};
use spec::Spec;
use state_db::StateDB;
use state::{self, State};
use trace;
use trace::{TraceDB, ImportRequest as TraceImportRequest, LocalizedTrace, Database as TraceDatabase};
use transaction::{self, LocalizedTransaction, UnverifiedTransaction, SignedTransaction, Transaction, PendingTransaction, Action};
use types::filter::Filter;
use types::mode::Mode as IpcMode;
use verification;
use verification::{PreverifiedBlock, Verifier};
use verification::queue::BlockQueue;
use views::BlockView;

// re-export
pub use types::blockchain_info::BlockChainInfo;
pub use types::block_status::BlockStatus;
pub use blockchain::CacheSize as BlockChainCacheSize;
pub use verification::queue::QueueInfo as BlockQueueInfo;
Marek Kotewicz's avatar
Marek Kotewicz committed
use_contract!(registry, "Registry", "res/contracts/registrar.json");

Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
const MAX_TX_QUEUE_SIZE: usize = 4096;
const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2;
const MIN_HISTORY_SIZE: u64 = 8;
Gav Wood's avatar
Gav Wood committed
/// Report on the status of a client.
#[derive(Default, Clone, Debug, Eq, PartialEq)]
Gav Wood's avatar
Gav Wood committed
pub struct ClientReport {
Gav Wood's avatar
Gav Wood committed
	/// How many blocks have been imported so far.
Gav Wood's avatar
Gav Wood committed
	pub blocks_imported: usize,
Gav Wood's avatar
Gav Wood committed
	/// How many transactions have been applied so far.
Gav Wood's avatar
Gav Wood committed
	pub transactions_applied: usize,
Gav Wood's avatar
Gav Wood committed
	/// How much gas has been processed so far.
Gav Wood's avatar
Gav Wood committed
	pub gas_processed: U256,
Gav Wood's avatar
Gav Wood committed
	/// Memory used by state DB
	pub state_db_mem: usize,
Gav Wood's avatar
Gav Wood committed
}

impl ClientReport {
Gav Wood's avatar
Gav Wood committed
	/// Alter internal reporting to reflect the additional `block` has been processed.
	pub fn accrue_block(&mut self, block: &PreverifiedBlock) {
Gav Wood's avatar
Gav Wood committed
		self.blocks_imported += 1;
		self.transactions_applied += block.transactions.len();
		self.gas_processed = self.gas_processed + block.header.gas_used().clone();
impl<'a> ::std::ops::Sub<&'a ClientReport> for ClientReport {
	type Output = Self;

	fn sub(mut self, other: &'a ClientReport) -> Self {
		let higher_mem = ::std::cmp::max(self.state_db_mem, other.state_db_mem);
		let lower_mem = ::std::cmp::min(self.state_db_mem, other.state_db_mem);

		self.blocks_imported -= other.blocks_imported;
		self.transactions_applied -= other.transactions_applied;
		self.gas_processed = self.gas_processed - other.gas_processed;
		self.state_db_mem  = higher_mem - lower_mem;

		self
	}
}

struct SleepState {
	last_activity: Option<Instant>,
	last_autosleep: Option<Instant>,
}

impl SleepState {
	fn new(awake: bool) -> Self {
		SleepState {
			last_activity: match awake { false => None, true => Some(Instant::now()) },
			last_autosleep: match awake { false => Some(Instant::now()), true => None },
		}
	}
}

struct Importer {
	/// Lock used during block import
	pub import_lock: Mutex<()>, // FIXME Maybe wrap the whole `Importer` instead?

	/// Used to verify blocks
	pub verifier: Box<Verifier<Client>>,

	/// Queue containing pending blocks
	pub block_queue: BlockQueue,

	/// Handles block sealing
	pub miner: Arc<Miner>,

	/// Ancient block verifier: import an ancient sequence of blocks in order from a starting epoch
	pub ancient_verifier: Mutex<Option<AncientVerifier>>,

	/// Random number generator used by `AncientVerifier`
	pub rng: Mutex<OsRng>,

	/// Ethereum engine to be used during import
	pub engine: Arc<EthEngine>,
}

Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
/// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue.
/// Call `import_block()` to import a block asynchronously; `flush_queue()` flushes the queue.
	/// Flag used to disable the client forever. Not to be confused with `liveness`.
	///
	/// For example, auto-updater will disable client forever if there is a
	/// hard fork registered on-chain that we don't have capability for.
	/// When hard fork block rolls around, the client (if `update` is false)
	/// knows it can't proceed further.
	enabled: AtomicBool,

	/// Operating mode for the client
Gav Wood's avatar
Gav Wood committed
	mode: Mutex<Mode>,
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	chain: RwLock<Arc<BlockChain>>,
	tracedb: RwLock<TraceDB<BlockChain>>,
	engine: Arc<EthEngine>,

	/// Client configuration
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	config: ClientConfig,

	/// Database pruning strategy to use for StateDB
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	pruning: journaldb::Algorithm,

	/// Client uses this to store blocks, traces, etc.
	db: RwLock<Arc<KeyValueDB>>,
Tomasz Drwięga's avatar
Tomasz Drwięga committed
	state_db: RwLock<StateDB>,

	/// Report on the status of client
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	report: RwLock<ClientReport>,
	sleep_state: Mutex<SleepState>,

	/// Flag changed by `sleep` and `wake_up` methods. Not to be confused with `enabled`.
	liveness: AtomicBool,
	io_channel: Mutex<IoChannel<ClientIoMessage>>,

	/// List of actors to be notified on certain chain events
	notify: RwLock<Vec<Weak<ChainNotify>>>,

	/// Count of pending transactions in the queue
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	queue_transactions: AtomicUsize,
	last_hashes: RwLock<VecDeque<H256>>,

	/// Number of eras kept in a journal before they are pruned

	/// An action to be done if a mode/spec_name change happens
	on_user_defaults_change: Mutex<Option<Box<FnMut(Option<Mode>) + 'static + Send>>>,

	/// Link to a registry object useful for looking up names
Marek Kotewicz's avatar
Marek Kotewicz committed
	registrar: registry::Registry,
	registrar_address: Option<Address>,

	/// A closure to call when we want to restart the client
	exit_handler: Mutex<Option<Box<Fn(String) + 'static + Send>>>,

	importer: Importer,
impl Importer {
		config: &ClientConfig,
		engine: Arc<EthEngine>,
		message_channel: IoChannel<ClientIoMessage>,
		miner: Arc<Miner>,
	) -> Result<Importer, ::error::Error> {
		let block_queue = BlockQueue::new(config.queue.clone(), engine.clone(), message_channel.clone(), config.verifier_type.verifying_seal());
		Ok(Importer {
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
			import_lock: Mutex::new(()),
			verifier: verification::new(config.verifier_type.clone()),
			block_queue,
			miner,
			ancient_verifier: Mutex::new(None),
			rng: Mutex::new(OsRng::new()?),
	fn calculate_enacted_retracted(&self, import_results: &[ImportRoute]) -> (Vec<H256>, Vec<H256>) {
		fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
			map.into_iter().map(|(k, _v)| k).collect()
		}

		// In ImportRoute we get all the blocks that have been enacted and retracted by single insert.
		// Because we are doing multiple inserts some of the blocks that were enacted in import `k`
		// could be retracted in import `k+1`. This is why to understand if after all inserts
		// the block is enacted or retracted we iterate over all routes and at the end final state
		// will be in the hashmap
		let map = import_results.iter().fold(HashMap::new(), |mut map, route| {
			for hash in &route.enacted {
				map.insert(hash.clone(), true);
			for hash in &route.retracted {
				map.insert(hash.clone(), false);
			}
			map
		});

		// Split to enacted retracted (using hashmap value)
		let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v);
		// And convert tuples to keys
		(map_to_vec(enacted), map_to_vec(retracted))
	}
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	/// This is triggered by a message coming from a block queue when the block is ready for insertion
	pub fn import_verified_blocks(&self, client: &Client) -> usize {

		// Shortcut out if we know we're incapable of syncing the chain.
		if !client.enabled.load(AtomicOrdering::Relaxed) {
		let max_blocks_to_import = 4;
keorn's avatar
keorn committed
		let (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration, is_empty) = {
			let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
			let mut invalid_blocks = HashSet::new();
keorn's avatar
keorn committed
			let mut proposed_blocks = Vec::with_capacity(max_blocks_to_import);
			let mut import_results = Vec::with_capacity(max_blocks_to_import);
			let _import_lock = self.import_lock.lock();
			let blocks = self.block_queue.drain(max_blocks_to_import);
			if blocks.is_empty() {
				return 0;
			}
			trace_time!("import_verified_blocks");
			let start = Instant::now();
			for block in blocks {
				let header = &block.header;
				let is_invalid = invalid_blocks.contains(header.parent_hash());
				if is_invalid {
					invalid_blocks.insert(header.hash());
					continue;
				}
				if let Ok(closed_block) = self.check_and_close_block(&block, client) {
keorn's avatar
keorn committed
					if self.engine.is_proposal(&block.header) {
keorn's avatar
keorn committed
						self.block_queue.mark_as_good(&[header.hash()]);
keorn's avatar
keorn committed
						proposed_blocks.push(block.bytes);
					} else {
						imported_blocks.push(header.hash());
						let route = self.commit_block(closed_block, &header, &block.bytes, client);
keorn's avatar
keorn committed
						import_results.push(route);
						client.report.write().accrue_block(&block);
keorn's avatar
keorn committed
					}
				} else {
					invalid_blocks.insert(header.hash());
				}
			let imported = imported_blocks.len();
			let invalid_blocks = invalid_blocks.into_iter().collect::<Vec<H256>>();

			if !invalid_blocks.is_empty() {
				self.block_queue.mark_as_bad(&invalid_blocks);
			}
			let is_empty = self.block_queue.mark_as_good(&imported_blocks);
			let duration_ns = {
				let elapsed = start.elapsed();
				elapsed.as_secs() * 1_000_000_000 + elapsed.subsec_nanos() as u64
			};
			(imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration_ns, is_empty)
		};

		{
			if !imported_blocks.is_empty() && is_empty {
				let (enacted, retracted) = self.calculate_enacted_retracted(&import_results);

				if is_empty {
					self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, &enacted, &retracted);
				}

				client.notify(|notify| {
					notify.new_blocks(
						imported_blocks.clone(),
						invalid_blocks.clone(),
						enacted.clone(),
						retracted.clone(),
						Vec::new(),
						proposed_blocks.clone(),
						duration,
					);
				});
			}
		}

		client.db.read().flush().expect("DB flush failed.");
		imported
	}

	fn check_and_close_block(&self, block: &PreverifiedBlock, client: &Client) -> Result<LockedBlock, ()> {
		let engine = &*self.engine;
		let header = &block.header;

		// Check the block isn't so old we won't be able to enact it.
		let best_block_number = client.chain.read().best_block_number();
		if client.pruning_info().earliest_state > header.number() {
			warn!(target: "client", "Block import failed for #{} ({})\nBlock is ancient (current best block: #{}).", header.number(), header.hash(), best_block_number);
			return Err(());
		}

		// Check if parent is in chain
		let parent = match client.block_header_decoded(BlockId::Hash(*header.parent_hash())) {
			Some(h) => h,
			None => {
				warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash());
				return Err(());
			}
		};

		let chain = client.chain.read();
		// Verify Block Family
		let verify_family_result = self.verifier.verify_block_family(
			header,
			&parent,
			engine,
			Some(verification::FullFamilyParams {
				block_bytes: &block.bytes,
				transactions: &block.transactions,
				block_provider: &**chain,
				client
			}),
		);
		if let Err(e) = verify_family_result {
			warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
			return Err(());
		let verify_external_result = self.verifier.verify_block_external(header, engine);
		if let Err(e) = verify_external_result {
			warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
			return Err(());
		};
		// Enact Verified Block
		let last_hashes = client.build_last_hashes(header.parent_hash());
		let db = client.state_db.read().boxed_clone_canon(header.parent_hash());
		let is_epoch_begin = chain.epoch_transition(parent.number(), *header.parent_hash()).is_some();
		let strip_receipts = header.number() < engine.params().validate_receipts_transition;
		let enact_result = enact_verified(block,
			engine,
			client.tracedb.read().tracing_enabled(),
			db,
			&parent,
			last_hashes,
			client.factories.clone(),
			is_epoch_begin,
			strip_receipts,

		let locked_block = enact_result.map_err(|e| {
			warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
		})?;

		// Final Verification
		if let Err(e) = self.verifier.verify_block_final(header, locked_block.block().header()) {
			warn!(target: "client", "Stage 5 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
			return Err(());
		}

		Ok(locked_block)
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	}
Gav Wood's avatar
Gav Wood committed

	/// Import a block with transaction receipts.
	///
	/// The block is guaranteed to be the next best blocks in the
	/// first block sequence. Does no sealing or transaction validation.
	fn import_old_block(&self, block_bytes: Bytes, receipts_bytes: Bytes, db: &KeyValueDB, chain: &BlockChain) -> Result<H256, ::error::Error> {
		let block = BlockView::new(&block_bytes);
		let header = block.header();
		let receipts = ::rlp::decode_list(&receipts_bytes);
		let hash = header.hash();
		let _import_lock = self.import_lock.lock();
			trace_time!("import_old_block");
			let mut ancient_verifier = self.ancient_verifier.lock();

			{
				// closure for verifying a block.
				let verify_with = |verifier: &AncientVerifier| -> Result<(), ::error::Error> {
					// verify the block, passing the chain for updating the epoch
					// verifier.
Tomasz Drwięga's avatar
Tomasz Drwięga committed
					let mut rng = OsRng::new().map_err(UtilError::from)?;
					verifier.verify(&mut rng, &header, &chain)
				// initialize the ancient block verifier if we don't have one already.
				match &mut *ancient_verifier {
					&mut Some(ref verifier) => {
						verify_with(verifier)?
					}
					x @ &mut None => {
						// load most recent epoch.
						trace!(target: "client", "Initializing ancient block restoration.");
						let current_epoch_data = chain.epoch_transitions()
							.take_while(|&(_, ref t)| t.block_number < header.number())
							.last()
							.map(|(_, t)| t.proof)
							.expect("At least one epoch entry (genesis) always stored; qed");

						let current_verifier = self.engine.epoch_verifier(&header, &current_epoch_data)
							.known_confirmed()?;
						let current_verifier = AncientVerifier::new(self.engine.clone(), current_verifier);

						verify_with(&current_verifier)?;
						*x = Some(current_verifier);
					}
				}
			}
			// Commit results
			let mut batch = DBTransaction::new();
			chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, false, true);
			// Final commit to the DB
			db.write_buffered(batch);
			chain.commit();
		}
		db.flush().expect("DB flush failed.");
	// NOTE: the header of the block passed here is not necessarily sealed, as
	// it is for reconstructing the state transition.
	//
	// The header passed is from the original block data and is sealed.
	fn commit_block<B>(&self, block: B, header: &Header, block_data: &[u8], client: &Client) -> ImportRoute where B: IsBlock + Drain {
		let hash = &header.hash();
		let number = header.number();
		let parent = header.parent_hash();
		let chain = client.chain.read();

		// Commit results
		let receipts = block.receipts().to_owned();
		let traces = block.traces().clone().drain();
		assert_eq!(header.hash(), BlockView::new(block_data).header_view().hash());
		//let traces = From::from(block.traces().clone().unwrap_or_else(Vec::new));
		let mut batch = DBTransaction::new();
		// CHECK! I *think* this is fine, even if the state_root is equal to another
		// already-imported block of the same number.
		// TODO: Prove it with a test.
		let mut state = block.drain();
		// check epoch end signal, potentially generating a proof on the current
		// state.
		self.check_epoch_end_signal(
			&header,
			block_data,
			&receipts,
			&state,
			&chain,
			&mut batch,
		state.journal_under(&mut batch, number, hash).expect("DB commit failed");
		let route = chain.insert_block(&mut batch, block_data, receipts.clone());
		client.tracedb.read().import(&mut batch, TraceImportRequest {
			traces: traces.into(),
			block_hash: hash.clone(),
			block_number: number,
			enacted: route.enacted.clone(),
			retracted: route.retracted.len()
		});
		let is_canon = route.enacted.last().map_or(false, |h| h == hash);
		state.sync_cache(&route.enacted, &route.retracted, is_canon);
Tomasz Drwięga's avatar
Tomasz Drwięga committed
		// Final commit to the DB
		client.db.read().write_buffered(batch);
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		chain.commit();
		self.check_epoch_end(&header, &chain, client);
		client.update_last_hashes(&parent, hash);
		if let Err(e) = client.prune_ancient(state, &chain) {
			warn!("Failed to prune ancient state data: {}", e);
		}

	// check for epoch end signal and write pending transition if it occurs.
	// state for the given block must be available.
	fn check_epoch_end_signal(
		&self,
		header: &Header,
		block_bytes: &[u8],
		receipts: &[Receipt],
		state_db: &StateDB,
		chain: &BlockChain,
		batch: &mut DBTransaction,
		client: &Client,

		let hash = header.hash();
		let auxiliary = ::machine::AuxiliaryData {
			bytes: Some(block_bytes),
			receipts: Some(&receipts),
		};

		match self.engine.signals_epoch_end(header, auxiliary) {
			EpochChange::Yes(proof) => {
				use engines::epoch::PendingTransition;
				use engines::Proof;

				let proof = match proof {
					Proof::Known(proof) => proof,
					Proof::WithState(with_state) => {
						let env_info = EnvInfo {
							number: header.number(),
							author: header.author().clone(),
							timestamp: header.timestamp(),
							difficulty: header.difficulty().clone(),
							last_hashes: client.build_last_hashes(header.parent_hash()),
							gas_used: U256::default(),
							gas_limit: u64::max_value().into(),
						};

						let call = move |addr, data| {
							let mut state_db = state_db.boxed_clone();
							let backend = ::state::backend::Proving::new(state_db.as_hashdb_mut());

							let transaction =
								client.contract_call_tx(BlockId::Hash(*header.parent_hash()), addr, data);

							let mut state = State::from_existing(
								backend,
								header.state_root().clone(),
								self.engine.account_start_nonce(header.number()),
								client.factories.clone(),
							).expect("state known to be available for just-imported block; qed");

							let options = TransactOptions::with_no_tracing().dont_check_nonce();
							let res = Executive::new(&mut state, &env_info, self.engine.machine())
								.transact(&transaction, options);

							let res = match res {
								Err(ExecutionError::Internal(e)) =>
									Err(format!("Internal error: {}", e)),
								Err(e) => {
									trace!(target: "client", "Proved call failed: {}", e);
									Ok((Vec::new(), state.drop().1.extract_proof()))
								}
								Ok(res) => Ok((res.output, state.drop().1.extract_proof())),
							};

							res.map(|(output, proof)| (output, proof.into_iter().map(|x| x.into_vec()).collect()))
						};

						match with_state.generate_proof(&call) {
							Ok(proof) => proof,
							Err(e) => {
								warn!(target: "client", "Failed to generate transition proof for block {}: {}", hash, e);
								warn!(target: "client", "Snapshots produced by this client may be incomplete");
								Vec::new()
							}
						}
					}
				};

				debug!(target: "client", "Block {} signals epoch end.", hash);

				let pending = PendingTransition { proof: proof };
				chain.insert_pending_transition(batch, hash, pending);
			},
			EpochChange::No => {},
			EpochChange::Unsure(_) => {
				warn!(target: "client", "Detected invalid engine implementation.");
				warn!(target: "client", "Engine claims to require more block data, but everything provided.");
			}
		}
	}

	// check for ending of epoch and write transition if it occurs.
	fn check_epoch_end<'a>(&self, header: &'a Header, chain: &BlockChain, client: &Client) {
		let is_epoch_end = self.engine.is_epoch_end(
			header,
			&(|hash| client.block_header_decoded(BlockId::Hash(hash))),
			&(|hash| chain.get_pending_transition(hash)), // TODO: limit to current epoch.
		);

		if let Some(proof) = is_epoch_end {
			debug!(target: "client", "Epoch transition at block {}", header.hash());

			let mut batch = DBTransaction::new();
			chain.insert_epoch_transition(&mut batch, header.number(), EpochTransition {
				block_hash: header.hash(),
				block_number: header.number(),
				proof: proof,
			});

			// always write the batch directly since epoch transition proofs are
			// fetched from a DB iterator and DB iterators are only available on
			// flushed data.
			client.db.read().write(batch).expect("DB flush failed");
		}
	}
}

impl Client {
	/// Create a new client with given parameters.
	/// The database is assumed to have been initialized with the correct columns.
	pub fn new(
		config: ClientConfig,
		spec: &Spec,
		db: Arc<KeyValueDB>,
		miner: Arc<Miner>,
		message_channel: IoChannel<ClientIoMessage>,
	) -> Result<Arc<Client>, ::error::Error> {
		let trie_spec = match config.fat_db {
			true => TrieSpec::Fat,
			false => TrieSpec::Secure,
		};

		let trie_factory = TrieFactory::new(trie_spec);
		let factories = Factories {
			vm: VmFactory::new(config.vm_type.clone(), config.jump_table_size),
			trie: trie_factory,
			accountdb: Default::default(),
		};

		let journal_db = journaldb::new(db.clone(), config.pruning, ::db::COL_STATE);
		let mut state_db = StateDB::new(journal_db, config.state_cache_size);
		if state_db.journal_db().is_empty() {
			// Sets the correct state root.
			state_db = spec.ensure_db_good(state_db, &factories)?;
			let mut batch = DBTransaction::new();
			state_db.journal_under(&mut batch, 0, &spec.genesis_header().hash())?;
			db.write(batch).map_err(ClientError::Database)?;
		}

		let gb = spec.genesis_block();
		let chain = Arc::new(BlockChain::new(config.blockchain.clone(), &gb, db.clone()));
		let tracedb = RwLock::new(TraceDB::new(config.tracing.clone(), db.clone(), chain.clone()));

		trace!("Cleanup journal: DB Earliest = {:?}, Latest = {:?}", state_db.journal_db().earliest_era(), state_db.journal_db().latest_era());

		let history = if config.history < MIN_HISTORY_SIZE {
			info!(target: "client", "Ignoring pruning history parameter of {}\
				, falling back to minimum of {}",
				config.history, MIN_HISTORY_SIZE);
			MIN_HISTORY_SIZE
		} else {
			config.history
		};

		if !chain.block_header_data(&chain.best_block_hash()).map_or(true, |h| state_db.journal_db().contains(&h.state_root())) {
			warn!("State root not found for block #{} ({:x})", chain.best_block_number(), chain.best_block_hash());
		}

		let engine = spec.engine.clone();

		let awake = match config.mode { Mode::Dark(..) | Mode::Off => false, _ => true };

		let importer = Importer::new(&config, engine.clone(), message_channel.clone(), miner)?;

		let registrar_address = engine.additional_params().get("registrar").and_then(|s| Address::from_str(s).ok());
		if let Some(ref addr) = registrar_address {
			trace!(target: "client", "Found registrar at {}", addr);
		}

		let client = Arc::new(Client {
			enabled: AtomicBool::new(true),
			sleep_state: Mutex::new(SleepState::new(awake)),
			liveness: AtomicBool::new(awake),
			mode: Mutex::new(config.mode.clone()),
			chain: RwLock::new(chain),
			tracedb: tracedb,
			engine: engine,
			pruning: config.pruning.clone(),
			config: config,
			db: RwLock::new(db),
			state_db: RwLock::new(state_db),
			report: RwLock::new(Default::default()),
			io_channel: Mutex::new(message_channel),
			notify: RwLock::new(Vec::new()),
			queue_transactions: AtomicUsize::new(0),
			last_hashes: RwLock::new(VecDeque::new()),
			factories: factories,
			history: history,
			on_user_defaults_change: Mutex::new(None),
			registrar: registry::Registry::default(),
			registrar_address,
			exit_handler: Mutex::new(None),
			importer,
		});

		// prune old states.
		{
			let state_db = client.state_db.read().boxed_clone();
			let chain = client.chain.read();
			client.prune_ancient(state_db, &chain)?;
		}

		// ensure genesis epoch proof in the DB.
		{
			let chain = client.chain.read();
			let gh = spec.genesis_header();
			if chain.epoch_transition(0, gh.hash()).is_none() {
				trace!(target: "client", "No genesis transition found.");

				let proof = client.with_proving_caller(
					BlockId::Number(0),
					|call| client.engine.genesis_epoch_data(&gh, call)
				);
				let proof = match proof {
					Ok(proof) => proof,
					Err(e) => {
						warn!(target: "client", "Error generating genesis epoch data: {}. Snapshots generated may not be complete.", e);
						Vec::new()
					}
				};

				debug!(target: "client", "Obtained genesis transition proof: {:?}", proof);

				let mut batch = DBTransaction::new();
				chain.insert_epoch_transition(&mut batch, 0, EpochTransition {
					block_hash: gh.hash(),
					block_number: 0,
					proof: proof,
				});

				client.db.read().write_buffered(batch);
			}
		}

		// ensure buffered changes are flushed.
		client.db.read().flush().map_err(ClientError::Database)?;
		Ok(client)
	}

	/// Wakes up client if it's a sleep.
	pub fn keep_alive(&self) {
		let should_wake = match *self.mode.lock() {
			Mode::Dark(..) | Mode::Passive(..) => true,
			_ => false,
		};
		if should_wake {
			self.wake_up();
			(*self.sleep_state.lock()).last_activity = Some(Instant::now());
		}
	}
	/// Adds an actor to be notified on certain events
	pub fn add_notify(&self, target: Arc<ChainNotify>) {
		self.notify.write().push(Arc::downgrade(&target));
	}
	/// Set a closure to call when the client wants to be restarted.
	///
	/// The parameter passed to the callback is the name of the new chain spec to use after
	/// the restart.
	pub fn set_exit_handler<F>(&self, f: F) where F: Fn(String) + 'static + Send {
		*self.exit_handler.lock() = Some(Box::new(f));
	}
	/// Returns engine reference.
	pub fn engine(&self) -> &EthEngine {
		&*self.engine
	}
	fn notify<F>(&self, f: F) where F: Fn(&ChainNotify) {
		for np in self.notify.read().iter() {
			if let Some(n) = np.upgrade() {
				f(&*n);
	/// Register an action to be done if a mode/spec_name change happens.
	pub fn on_user_defaults_change<F>(&self, f: F) where F: 'static + FnMut(Option<Mode>) + Send {
		*self.on_user_defaults_change.lock() = Some(Box::new(f));
	}
	/// Flush the block import queue.
	pub fn flush_queue(&self) {
		self.importer.block_queue.flush();
		while !self.importer.block_queue.queue_info().is_empty() {
			self.import_verified_blocks();
		}
	}
	/// The env info as of the best block.
	pub fn latest_env_info(&self) -> EnvInfo {
		self.env_info(BlockId::Latest).expect("Best block header always stored; qed")
	}
	/// The env info as of a given block.
	/// returns `None` if the block unknown.
	pub fn env_info(&self, id: BlockId) -> Option<EnvInfo> {
		self.block_header(id).map(|header| {
			EnvInfo {
				number: header.number(),
				author: header.author(),
				timestamp: header.timestamp(),
				difficulty: header.difficulty(),
				last_hashes: self.build_last_hashes(&header.parent_hash()),
				gas_used: U256::default(),
				gas_limit: header.gas_limit(),
			}
		})
	}

	fn build_last_hashes(&self, parent_hash: &H256) -> Arc<LastHashes> {
		{
			let hashes = self.last_hashes.read();
			if hashes.front().map_or(false, |h| h == parent_hash) {
				let mut res = Vec::from(hashes.clone());
				res.resize(256, H256::default());
				return Arc::new(res);
			}
		}
		let mut last_hashes = LastHashes::new();
		last_hashes.resize(256, H256::default());
		last_hashes[0] = parent_hash.clone();
		let chain = self.chain.read();
		for i in 0..255 {
			match chain.block_details(&last_hashes[i]) {
				Some(details) => {
					last_hashes[i + 1] = details.parent.clone();
				},
				None => break,
			}
		let mut cached_hashes = self.last_hashes.write();
		*cached_hashes = VecDeque::from(last_hashes.clone());
		Arc::new(last_hashes)
	}


	/// This is triggered by a message coming from a block queue when the block is ready for insertion
	pub fn import_verified_blocks(&self) -> usize {
		self.importer.import_verified_blocks(self)
	}

	// use a state-proving closure for the given block.
	fn with_proving_caller<F, T>(&self, id: BlockId, with_call: F) -> T
		where F: FnOnce(&::machine::Call) -> T
	{
		let call = |a, d| {
			let tx = self.contract_call_tx(id, a, d);
			let (result, items) = self.prove_transaction(tx, id)
				.ok_or_else(|| format!("Unable to make call. State unavailable?"))?;

			let items = items.into_iter().map(|x| x.to_vec()).collect();
			Ok((result, items))
		};

		with_call(&call)
	}

	// prune ancient states until below the memory limit or only the minimum amount remain.
	fn prune_ancient(&self, mut state_db: StateDB, chain: &BlockChain) -> Result<(), ClientError> {
		let number = match state_db.journal_db().latest_era() {
			Some(n) => n,
			None => return Ok(()),
		};

		// prune all ancient eras until we're below the memory target,
		// but have at least the minimum number of states.
		loop {
			let needs_pruning = state_db.journal_db().is_pruned() &&
				state_db.journal_db().journal_size() >= self.config.history_mem;

			if !needs_pruning { break }
			match state_db.journal_db().earliest_era() {
				Some(era) if era + self.history <= number => {
					trace!(target: "client", "Pruning state for ancient era {}", era);
					match chain.block_hash(era) {
						Some(ancient_hash) => {
							let mut batch = DBTransaction::new();
							state_db.mark_canonical(&mut batch, era, &ancient_hash)?;
							self.db.read().write_buffered(batch);
							state_db.journal_db().flush();
						}
						None =>
							debug!(target: "client", "Missing expected hash for block {}", era),
					}
				}
				_ => break, // means that every era is kept, no pruning necessary.
			}
		}

		Ok(())
	}

	fn update_last_hashes(&self, parent: &H256, hash: &H256) {
		let mut hashes = self.last_hashes.write();
		if hashes.front().map_or(false, |h| h == parent) {
			if hashes.len() > 255 {
				hashes.pop_back();
			}
			hashes.push_front(hash.clone());
		}
	}

Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	/// Import transactions from the IO queue
	pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: usize) -> usize {
keorn's avatar
keorn committed
		trace!(target: "external_tx", "Importing queued");
		trace_time!("import_queued_transactions");
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst);
		let txs: Vec<UnverifiedTransaction> = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect();
		let hashes: Vec<_> = txs.iter().map(|tx| tx.hash()).collect();
		self.notify(|notify| {
			notify.transactions_received(hashes.clone(), peer_id);
		let results = self.importer.miner.import_external_transactions(self, txs);
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		results.len()
	}

	/// Get shared miner reference.
	pub fn miner(&self) -> Arc<Miner> {
		self.importer.miner.clone()
	/// Replace io channel. Useful for testing.
	pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) {
		*self.io_channel.lock() = io_channel;
keorn's avatar
keorn committed
	}

	/// Get a copy of the best block's state.