client.rs 79.8 KiB
Newer Older
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashSet, BTreeMap, VecDeque};
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Arc, Weak};
use std::time::{Instant, Duration};
use hash::keccak;
use itertools::Itertools;
use trie::{TrieSpec, TrieFactory, Trie};
Marek Kotewicz's avatar
Marek Kotewicz committed
use kvdb::{DBValue, KeyValueDB, DBTransaction};
use util_error::UtilError;

// other
Marek Kotewicz's avatar
Marek Kotewicz committed
use ethereum_types::{H256, Address, U256};
use block::{IsBlock, LockedBlock, Drain, ClosedBlock, OpenBlock, enact_verified, SealedBlock};
use blockchain::{BlockChain, BlockChainDB, BlockProvider, TreeRoute, ImportRoute, TransactionAddress, ExtrasInsert};
use client::ancient_import::AncientVerifier;
use client::Error as ClientError;
use client::{
Marek Kotewicz's avatar
Marek Kotewicz committed
	Nonce, Balance, ChainInfo, BlockInfo, CallContract, TransactionInfo,
	RegistryInfo, ReopenBlock, PrepareOpenBlock, ScheduleInfo, ImportSealedBlock,
	BroadcastProposalBlock, ImportBlock, StateOrBlock, StateInfo, StateClient, Call,
	AccountData, BlockChain as BlockChainTrait, BlockProducer, SealedBlockImporter,
	ClientIoMessage
Gav Wood's avatar
Gav Wood committed
use client::{
	BlockId, TransactionId, UncleId, TraceId, ClientConfig, BlockChainClient,
	TraceFilter, CallAnalytics, BlockImportError, Mode,
	ChainNotify, ChainRoute, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType,
	IoClient,
Gav Wood's avatar
Gav Wood committed
};
use engines::{EthEngine, EpochTransition, ForkChoice};
use error::{ImportErrorKind, BlockImportErrorKind, ExecutionError, CallError, BlockError, ImportResult, Error as EthcoreError};
use vm::{EnvInfo, LastHashes};
use evm::Schedule;
use executive::{Executive, Executed, TransactOptions, contract_address};
use factory::{Factories, VmFactory};
use header::{BlockNumber, Header, ExtendedHeader};
use io::{IoChannel, IoError};
use log_entry::LocalizedLogEntry;
use miner::{Miner, MinerService};
use ethcore_miner::pool::VerifiedTransaction;
Marek Kotewicz's avatar
Marek Kotewicz committed
use parking_lot::{Mutex, RwLock};
use rand::OsRng;
use receipt::{Receipt, LocalizedReceipt};
use snapshot::{self, io as snapshot_io};
use spec::Spec;
use state_db::StateDB;
use state::{self, State};
use trace;
use trace::{TraceDB, ImportRequest as TraceImportRequest, LocalizedTrace, Database as TraceDatabase};
use transaction::{self, LocalizedTransaction, UnverifiedTransaction, SignedTransaction, Transaction, Action};
use types::filter::Filter;
use types::ancestry_action::AncestryAction;
use verification;
use verification::{PreverifiedBlock, Verifier};
use verification::queue::BlockQueue;
use views::BlockView;
use parity_machine::{Finalizable, WithMetadata};

// re-export
pub use types::blockchain_info::BlockChainInfo;
pub use types::block_status::BlockStatus;
pub use blockchain::CacheSize as BlockChainCacheSize;
pub use verification::queue::QueueInfo as BlockQueueInfo;
Marek Kotewicz's avatar
Marek Kotewicz committed
use_contract!(registry, "Registry", "res/contracts/registrar.json");

Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
const MAX_TX_QUEUE_SIZE: usize = 4096;
const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096;
// Max number of blocks imported at once.
const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4;
const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2;
const MIN_HISTORY_SIZE: u64 = 8;
Gav Wood's avatar
Gav Wood committed
/// Report on the status of a client.
#[derive(Default, Clone, Debug, Eq, PartialEq)]
Gav Wood's avatar
Gav Wood committed
pub struct ClientReport {
Gav Wood's avatar
Gav Wood committed
	/// How many blocks have been imported so far.
Gav Wood's avatar
Gav Wood committed
	pub blocks_imported: usize,
Gav Wood's avatar
Gav Wood committed
	/// How many transactions have been applied so far.
Gav Wood's avatar
Gav Wood committed
	pub transactions_applied: usize,
Gav Wood's avatar
Gav Wood committed
	/// How much gas has been processed so far.
Gav Wood's avatar
Gav Wood committed
	pub gas_processed: U256,
Gav Wood's avatar
Gav Wood committed
	/// Memory used by state DB
	pub state_db_mem: usize,
Gav Wood's avatar
Gav Wood committed
}

impl ClientReport {
Gav Wood's avatar
Gav Wood committed
	/// Alter internal reporting to reflect the additional `block` has been processed.
	pub fn accrue_block(&mut self, header: &Header, transactions: usize) {
Gav Wood's avatar
Gav Wood committed
		self.blocks_imported += 1;
		self.transactions_applied += transactions;
		self.gas_processed = self.gas_processed + *header.gas_used();
impl<'a> ::std::ops::Sub<&'a ClientReport> for ClientReport {
	type Output = Self;

	fn sub(mut self, other: &'a ClientReport) -> Self {
		let higher_mem = ::std::cmp::max(self.state_db_mem, other.state_db_mem);
		let lower_mem = ::std::cmp::min(self.state_db_mem, other.state_db_mem);

		self.blocks_imported -= other.blocks_imported;
		self.transactions_applied -= other.transactions_applied;
		self.gas_processed = self.gas_processed - other.gas_processed;
		self.state_db_mem = higher_mem - lower_mem;
struct SleepState {
	last_activity: Option<Instant>,
	last_autosleep: Option<Instant>,
}

impl SleepState {
	fn new(awake: bool) -> Self {
		SleepState {
			last_activity: match awake { false => None, true => Some(Instant::now()) },
			last_autosleep: match awake { false => Some(Instant::now()), true => None },
		}
	}
}

struct Importer {
	/// Lock used during block import
	pub import_lock: Mutex<()>, // FIXME Maybe wrap the whole `Importer` instead?

	/// Used to verify blocks
	pub verifier: Box<Verifier<Client>>,

	/// Queue containing pending blocks
	pub block_queue: BlockQueue,

	/// Handles block sealing
	pub miner: Arc<Miner>,

	/// Ancient block verifier: import an ancient sequence of blocks in order from a starting epoch
	pub ancient_verifier: AncientVerifier,

	/// Ethereum engine to be used during import
	pub engine: Arc<EthEngine>,
}

Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
/// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue.
/// Call `import_block()` to import a block asynchronously; `flush_queue()` flushes the queue.
	/// Flag used to disable the client forever. Not to be confused with `liveness`.
	///
	/// For example, auto-updater will disable client forever if there is a
	/// hard fork registered on-chain that we don't have capability for.
	/// When hard fork block rolls around, the client (if `update` is false)
	/// knows it can't proceed further.
	enabled: AtomicBool,

	/// Operating mode for the client
Gav Wood's avatar
Gav Wood committed
	mode: Mutex<Mode>,
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	chain: RwLock<Arc<BlockChain>>,
	tracedb: RwLock<TraceDB<BlockChain>>,
	engine: Arc<EthEngine>,

	/// Client configuration
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	config: ClientConfig,

	/// Database pruning strategy to use for StateDB
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	pruning: journaldb::Algorithm,

	/// Client uses this to store blocks, traces, etc.
	db: RwLock<Arc<BlockChainDB>>,
Tomasz Drwięga's avatar
Tomasz Drwięga committed
	state_db: RwLock<StateDB>,

	/// Report on the status of client
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	report: RwLock<ClientReport>,
	sleep_state: Mutex<SleepState>,

	/// Flag changed by `sleep` and `wake_up` methods. Not to be confused with `enabled`.
	liveness: AtomicBool,
	io_channel: Mutex<IoChannel<ClientIoMessage>>,

	/// List of actors to be notified on certain chain events
	notify: RwLock<Vec<Weak<ChainNotify>>>,
	/// Queued transactions from IO
	queue_transactions: IoChannelQueue,
	/// Ancient blocks import queue
	queue_ancient_blocks: IoChannelQueue,
	/// Queued ancient blocks, make sure they are imported in order.
	queued_ancient_blocks: Arc<RwLock<(
		HashSet<H256>,
		VecDeque<(Header, Bytes, Bytes)>
	)>>,
	ancient_blocks_import_lock: Arc<Mutex<()>>,
	/// Consensus messages import queue
	queue_consensus_message: IoChannelQueue,

	last_hashes: RwLock<VecDeque<H256>>,

	/// Number of eras kept in a journal before they are pruned

	/// An action to be done if a mode/spec_name change happens
	on_user_defaults_change: Mutex<Option<Box<FnMut(Option<Mode>) + 'static + Send>>>,

	/// Link to a registry object useful for looking up names
Marek Kotewicz's avatar
Marek Kotewicz committed
	registrar: registry::Registry,
	registrar_address: Option<Address>,

	/// A closure to call when we want to restart the client
	exit_handler: Mutex<Option<Box<Fn(String) + 'static + Send>>>,

	importer: Importer,
impl Importer {
		config: &ClientConfig,
		engine: Arc<EthEngine>,
		message_channel: IoChannel<ClientIoMessage>,
		miner: Arc<Miner>,
	) -> Result<Importer, ::error::Error> {
		let block_queue = BlockQueue::new(config.queue.clone(), engine.clone(), message_channel.clone(), config.verifier_type.verifying_seal());
		Ok(Importer {
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
			import_lock: Mutex::new(()),
			verifier: verification::new(config.verifier_type.clone()),
			block_queue,
			miner,
			ancient_verifier: AncientVerifier::new(engine.clone()),
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	/// This is triggered by a message coming from a block queue when the block is ready for insertion
	pub fn import_verified_blocks(&self, client: &Client) -> usize {

		// Shortcut out if we know we're incapable of syncing the chain.
		if !client.enabled.load(AtomicOrdering::Relaxed) {
		let max_blocks_to_import = 4;
keorn's avatar
keorn committed
		let (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration, is_empty) = {
			let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
			let mut invalid_blocks = HashSet::new();
keorn's avatar
keorn committed
			let mut proposed_blocks = Vec::with_capacity(max_blocks_to_import);
			let mut import_results = Vec::with_capacity(max_blocks_to_import);
			let _import_lock = self.import_lock.lock();
			let blocks = self.block_queue.drain(max_blocks_to_import);
			if blocks.is_empty() {
				return 0;
			}
			trace_time!("import_verified_blocks");
			let start = Instant::now();
				let header = block.header.clone();
				let bytes = block.bytes.clone();
				let hash = header.hash();

				let is_invalid = invalid_blocks.contains(header.parent_hash());
				if is_invalid {
					invalid_blocks.insert(hash);

				if let Ok(closed_block) = self.check_and_close_block(block, client) {
					if self.engine.is_proposal(&header) {
						self.block_queue.mark_as_good(&[hash]);
						proposed_blocks.push(bytes);
keorn's avatar
keorn committed
					} else {
						imported_blocks.push(hash);

						let transactions_len = closed_block.transactions().len();
						let route = self.commit_block(closed_block, &header, &bytes, client);
keorn's avatar
keorn committed
						import_results.push(route);
						client.report.write().accrue_block(&header, transactions_len);
keorn's avatar
keorn committed
					}
				} else {
					invalid_blocks.insert(header.hash());
				}
			let imported = imported_blocks.len();
			let invalid_blocks = invalid_blocks.into_iter().collect::<Vec<H256>>();

			if !invalid_blocks.is_empty() {
				self.block_queue.mark_as_bad(&invalid_blocks);
			}
			let is_empty = self.block_queue.mark_as_good(&imported_blocks);
			(imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, start.elapsed(), is_empty)
		};

		{
			if !imported_blocks.is_empty() && is_empty {
				let route = ChainRoute::from(import_results.as_ref());

				if is_empty {
					self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, route.enacted(), route.retracted(), false);
				}

				client.notify(|notify| {
					notify.new_blocks(
						imported_blocks.clone(),
						invalid_blocks.clone(),
						Vec::new(),
						proposed_blocks.clone(),
						duration,
					);
				});
			}
		}

		let db = client.db.read();
		db.key_value().flush().expect("DB flush failed.");
	fn check_and_close_block(&self, block: PreverifiedBlock, client: &Client) -> Result<LockedBlock, ()> {
		let engine = &*self.engine;
		let header = block.header.clone();

		// Check the block isn't so old we won't be able to enact it.
		let best_block_number = client.chain.read().best_block_number();
		if client.pruning_info().earliest_state > header.number() {
			warn!(target: "client", "Block import failed for #{} ({})\nBlock is ancient (current best block: #{}).", header.number(), header.hash(), best_block_number);
			return Err(());
		}

		// Check if parent is in chain
		let parent = match client.block_header_decoded(BlockId::Hash(*header.parent_hash())) {
			Some(h) => h,
			None => {
				warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash());
				return Err(());
			}
		};

		let chain = client.chain.read();
		// Verify Block Family
		let verify_family_result = self.verifier.verify_block_family(
			&parent,
			engine,
			Some(verification::FullFamilyParams {
				block_bytes: &block.bytes,
				transactions: &block.transactions,
				block_provider: &**chain,
				client
			}),
		);
		if let Err(e) = verify_family_result {
			warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
			return Err(());
		let verify_external_result = self.verifier.verify_block_external(&header, engine);
		if let Err(e) = verify_external_result {
			warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
			return Err(());
		};
		// Enact Verified Block
		let last_hashes = client.build_last_hashes(header.parent_hash());
		let db = client.state_db.read().boxed_clone_canon(header.parent_hash());
		let is_epoch_begin = chain.epoch_transition(parent.number(), *header.parent_hash()).is_some();
		let enact_result = enact_verified(
			block,
			engine,
			client.tracedb.read().tracing_enabled(),
			db,
			&parent,
			last_hashes,
			client.factories.clone(),
			is_epoch_begin,
			&mut chain.ancestry_with_metadata_iter(*header.parent_hash()),
		let mut locked_block = enact_result.map_err(|e| {
			warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
		})?;

		// Strip receipts for blocks before validate_receipts_transition,
		// if the expected receipts root header does not match.
		// (i.e. allow inconsistency in receipts outcome before the transition block)
		if header.number() < engine.params().validate_receipts_transition
			&& header.receipts_root() != locked_block.block().header().receipts_root()
		{
			locked_block.strip_receipts_outcomes();
		}

		// Final Verification
		if let Err(e) = self.verifier.verify_block_final(&header, locked_block.block().header()) {
			warn!(target: "client", "Stage 5 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
			return Err(());
		}

		Ok(locked_block)
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	}
Gav Wood's avatar
Gav Wood committed

	/// Import a block with transaction receipts.
	///
	/// The block is guaranteed to be the next best blocks in the
	/// first block sequence. Does no sealing or transaction validation.
	fn import_old_block(&self, header: &Header, block_bytes: &[u8], receipts_bytes: &[u8], db: &KeyValueDB, chain: &BlockChain) -> Result<H256, ::error::Error> {
		let receipts = ::rlp::decode_list(receipts_bytes);
		let hash = header.hash();
		let _import_lock = self.import_lock.lock();
			trace_time!("import_old_block");
			// verify the block, passing the chain for updating the epoch verifier.
			let mut rng = OsRng::new().map_err(UtilError::from)?;
			self.ancient_verifier.verify(&mut rng, &header, &chain)?;
			// Commit results
			let mut batch = DBTransaction::new();
			chain.insert_unordered_block(&mut batch, block_bytes, receipts, None, false, true);
			// Final commit to the DB
			db.write_buffered(batch);
			chain.commit();
		}
		db.flush().expect("DB flush failed.");
	// NOTE: the header of the block passed here is not necessarily sealed, as
	// it is for reconstructing the state transition.
	//
	// The header passed is from the original block data and is sealed.
	fn commit_block<B>(&self, block: B, header: &Header, block_data: &[u8], client: &Client) -> ImportRoute where B: IsBlock + Drain {
		let hash = &header.hash();
		let number = header.number();
		let parent = header.parent_hash();
		let chain = client.chain.read();

		// Commit results
		let receipts = block.receipts().to_owned();
		let traces = block.traces().clone().drain();
		assert_eq!(header.hash(), view!(BlockView, block_data).header_view().hash());
		//let traces = From::from(block.traces().clone().unwrap_or_else(Vec::new));
		let mut batch = DBTransaction::new();
		let ancestry_actions = self.engine.ancestry_actions(block.block(), &mut chain.ancestry_with_metadata_iter(*parent));

		let best_hash = chain.best_block_hash();
		let metadata = block.block().metadata().map(Into::into);
		let is_finalized = block.block().is_finalized();

		let new = ExtendedHeader {
			header: header.clone(),
			is_finalized: is_finalized,
			metadata: metadata,
			parent_total_difficulty: chain.block_details(&parent).expect("Parent block is in the database; qed").total_difficulty
		};

		let best = {
			let hash = best_hash;
			let header = chain.block_header_data(&hash)
				.expect("Best block is in the database; qed")
				.decode()
				.expect("Stored block header is valid RLP; qed");
			let details = chain.block_details(&hash)
				.expect("Best block is in the database; qed");

			ExtendedHeader {
				parent_total_difficulty: details.total_difficulty - *header.difficulty(),
				is_finalized: details.is_finalized,
				metadata: details.metadata,

				header: header,
			}
		};

		let route = chain.tree_route(best_hash, *parent).expect("forks are only kept when it has common ancestors; tree route from best to prospective's parent always exists; qed");
		let fork_choice = if route.is_from_route_finalized {
			ForkChoice::Old
		} else {
			self.engine.fork_choice(&new, &best)
		};

		// CHECK! I *think* this is fine, even if the state_root is equal to another
		// already-imported block of the same number.
		// TODO: Prove it with a test.
		let mut state = block.drain();
		// check epoch end signal, potentially generating a proof on the current
		// state.
		self.check_epoch_end_signal(
			&header,
			block_data,
			&receipts,
			&state,
			&chain,
			&mut batch,
		state.journal_under(&mut batch, number, hash).expect("DB commit failed");

		for ancestry_action in ancestry_actions {
			let AncestryAction::MarkFinalized(ancestry) = ancestry_action;
			chain.mark_finalized(&mut batch, ancestry).expect("Engine's ancestry action must be known blocks; qed");
		}

		let route = chain.insert_block(&mut batch, block_data, receipts.clone(), ExtrasInsert {
			fork_choice: fork_choice,
			is_finalized: is_finalized,
			metadata: new.metadata,
		});
		client.tracedb.read().import(&mut batch, TraceImportRequest {
			traces: traces.into(),
			block_hash: hash.clone(),
			block_number: number,
			enacted: route.enacted.clone(),
			retracted: route.retracted.len()
		});
		let is_canon = route.enacted.last().map_or(false, |h| h == hash);
		state.sync_cache(&route.enacted, &route.retracted, is_canon);
Tomasz Drwięga's avatar
Tomasz Drwięga committed
		// Final commit to the DB
		client.db.read().key_value().write_buffered(batch);
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		chain.commit();
		self.check_epoch_end(&header, &chain, client);
		client.update_last_hashes(&parent, hash);
		if let Err(e) = client.prune_ancient(state, &chain) {
			warn!("Failed to prune ancient state data: {}", e);
		}

	// check for epoch end signal and write pending transition if it occurs.
	// state for the given block must be available.
	fn check_epoch_end_signal(
		&self,
		header: &Header,
		block_bytes: &[u8],
		receipts: &[Receipt],
		state_db: &StateDB,
		chain: &BlockChain,
		batch: &mut DBTransaction,
		client: &Client,

		let hash = header.hash();
		let auxiliary = ::machine::AuxiliaryData {
			bytes: Some(block_bytes),
			receipts: Some(&receipts),
		};

		match self.engine.signals_epoch_end(header, auxiliary) {
			EpochChange::Yes(proof) => {
				use engines::epoch::PendingTransition;
				use engines::Proof;

				let proof = match proof {
					Proof::Known(proof) => proof,
					Proof::WithState(with_state) => {
						let env_info = EnvInfo {
							number: header.number(),
							author: header.author().clone(),
							timestamp: header.timestamp(),
							difficulty: header.difficulty().clone(),
							last_hashes: client.build_last_hashes(header.parent_hash()),
							gas_used: U256::default(),
							gas_limit: u64::max_value().into(),
						};

						let call = move |addr, data| {
							let mut state_db = state_db.boxed_clone();
							let backend = ::state::backend::Proving::new(state_db.as_hashdb_mut());

							let transaction =
								client.contract_call_tx(BlockId::Hash(*header.parent_hash()), addr, data);

							let mut state = State::from_existing(
								backend,
								header.state_root().clone(),
								self.engine.account_start_nonce(header.number()),
								client.factories.clone(),
							).expect("state known to be available for just-imported block; qed");

							let options = TransactOptions::with_no_tracing().dont_check_nonce();
							let res = Executive::new(&mut state, &env_info, self.engine.machine())
								.transact(&transaction, options);

							let res = match res {
								Err(ExecutionError::Internal(e)) =>
									Err(format!("Internal error: {}", e)),
								Err(e) => {
									trace!(target: "client", "Proved call failed: {}", e);
									Ok((Vec::new(), state.drop().1.extract_proof()))
								}
								Ok(res) => Ok((res.output, state.drop().1.extract_proof())),
							};

							res.map(|(output, proof)| (output, proof.into_iter().map(|x| x.into_vec()).collect()))
						};

						match with_state.generate_proof(&call) {
							Ok(proof) => proof,
							Err(e) => {
								warn!(target: "client", "Failed to generate transition proof for block {}: {}", hash, e);
								warn!(target: "client", "Snapshots produced by this client may be incomplete");
								Vec::new()
							}
						}
					}
				};

				debug!(target: "client", "Block {} signals epoch end.", hash);

				let pending = PendingTransition { proof: proof };
				chain.insert_pending_transition(batch, hash, pending);
			},
			EpochChange::No => {},
			EpochChange::Unsure(_) => {
				warn!(target: "client", "Detected invalid engine implementation.");
				warn!(target: "client", "Engine claims to require more block data, but everything provided.");
			}
		}
	}

	// check for ending of epoch and write transition if it occurs.
	fn check_epoch_end<'a>(&self, header: &'a Header, chain: &BlockChain, client: &Client) {
		let is_epoch_end = self.engine.is_epoch_end(
			header,
			&(|hash| client.block_header_decoded(BlockId::Hash(hash))),
			&(|hash| chain.get_pending_transition(hash)), // TODO: limit to current epoch.
		);

		if let Some(proof) = is_epoch_end {
			debug!(target: "client", "Epoch transition at block {}", header.hash());

			let mut batch = DBTransaction::new();
			chain.insert_epoch_transition(&mut batch, header.number(), EpochTransition {
				block_hash: header.hash(),
				block_number: header.number(),
				proof: proof,
			});

			// always write the batch directly since epoch transition proofs are
			// fetched from a DB iterator and DB iterators are only available on
			// flushed data.
			client.db.read().key_value().write(batch).expect("DB flush failed");
		}
	}
}

impl Client {
	/// Create a new client with given parameters.
	/// The database is assumed to have been initialized with the correct columns.
	pub fn new(
		config: ClientConfig,
		spec: &Spec,
		db: Arc<BlockChainDB>,
		miner: Arc<Miner>,
		message_channel: IoChannel<ClientIoMessage>,
	) -> Result<Arc<Client>, ::error::Error> {
		let trie_spec = match config.fat_db {
			true => TrieSpec::Fat,
			false => TrieSpec::Secure,
		};

		let trie_factory = TrieFactory::new(trie_spec);
		let factories = Factories {
			vm: VmFactory::new(config.vm_type.clone(), config.jump_table_size),
			trie: trie_factory,
			accountdb: Default::default(),
		};

		let journal_db = journaldb::new(db.key_value().clone(), config.pruning, ::db::COL_STATE);
		let mut state_db = StateDB::new(journal_db, config.state_cache_size);
		if state_db.journal_db().is_empty() {
			// Sets the correct state root.
			state_db = spec.ensure_db_good(state_db, &factories)?;
			let mut batch = DBTransaction::new();
			state_db.journal_under(&mut batch, 0, &spec.genesis_header().hash())?;
			db.key_value().write(batch).map_err(ClientError::Database)?;
		}

		let gb = spec.genesis_block();
		let chain = Arc::new(BlockChain::new(config.blockchain.clone(), &gb, db.clone()));
		let tracedb = RwLock::new(TraceDB::new(config.tracing.clone(), db.clone(), chain.clone()));

		trace!("Cleanup journal: DB Earliest = {:?}, Latest = {:?}", state_db.journal_db().earliest_era(), state_db.journal_db().latest_era());

		let history = if config.history < MIN_HISTORY_SIZE {
			info!(target: "client", "Ignoring pruning history parameter of {}\
				, falling back to minimum of {}",
				config.history, MIN_HISTORY_SIZE);
			MIN_HISTORY_SIZE
		} else {
			config.history
		};

		if !chain.block_header_data(&chain.best_block_hash()).map_or(true, |h| state_db.journal_db().contains(&h.state_root())) {
			warn!("State root not found for block #{} ({:x})", chain.best_block_number(), chain.best_block_hash());
		}

		let engine = spec.engine.clone();

		let awake = match config.mode { Mode::Dark(..) | Mode::Off => false, _ => true };

		let importer = Importer::new(&config, engine.clone(), message_channel.clone(), miner)?;

		let registrar_address = engine.additional_params().get("registrar").and_then(|s| Address::from_str(s).ok());
		if let Some(ref addr) = registrar_address {
			trace!(target: "client", "Found registrar at {}", addr);
		}

		let client = Arc::new(Client {
			enabled: AtomicBool::new(true),
			sleep_state: Mutex::new(SleepState::new(awake)),
			liveness: AtomicBool::new(awake),
			mode: Mutex::new(config.mode.clone()),
			chain: RwLock::new(chain),
			tracedb: tracedb,
			engine: engine,
			pruning: config.pruning.clone(),
			config: config,
			db: RwLock::new(db.clone()),
			state_db: RwLock::new(state_db),
			report: RwLock::new(Default::default()),
			io_channel: Mutex::new(message_channel),
			notify: RwLock::new(Vec::new()),
			queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE),
			queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
			queued_ancient_blocks: Default::default(),
			ancient_blocks_import_lock: Default::default(),
			queue_consensus_message: IoChannelQueue::new(usize::max_value()),
			last_hashes: RwLock::new(VecDeque::new()),
			factories: factories,
			history: history,
			on_user_defaults_change: Mutex::new(None),
			registrar: registry::Registry::default(),
			registrar_address,
			exit_handler: Mutex::new(None),
			importer,
		});

		// prune old states.
		{
			let state_db = client.state_db.read().boxed_clone();
			let chain = client.chain.read();
			client.prune_ancient(state_db, &chain)?;
		}

		// ensure genesis epoch proof in the DB.
		{
			let chain = client.chain.read();
			let gh = spec.genesis_header();
			if chain.epoch_transition(0, gh.hash()).is_none() {
				trace!(target: "client", "No genesis transition found.");

				let proof = client.with_proving_caller(
					BlockId::Number(0),
					|call| client.engine.genesis_epoch_data(&gh, call)
				);
				let proof = match proof {
					Ok(proof) => proof,
					Err(e) => {
						warn!(target: "client", "Error generating genesis epoch data: {}. Snapshots generated may not be complete.", e);
						Vec::new()
					}
				};

				debug!(target: "client", "Obtained genesis transition proof: {:?}", proof);

				let mut batch = DBTransaction::new();
				chain.insert_epoch_transition(&mut batch, 0, EpochTransition {
					block_hash: gh.hash(),
					block_number: 0,
					proof: proof,
				});

				client.db.read().key_value().write_buffered(batch);
			}
		}

		// ensure buffered changes are flushed.
		client.db.read().key_value().flush().map_err(ClientError::Database)?;
		Ok(client)
	}

	/// Wakes up client if it's a sleep.
	pub fn keep_alive(&self) {
		let should_wake = match *self.mode.lock() {
			Mode::Dark(..) | Mode::Passive(..) => true,
			_ => false,
		};
		if should_wake {
			self.wake_up();
			(*self.sleep_state.lock()).last_activity = Some(Instant::now());
		}
	}
	/// Adds an actor to be notified on certain events
	pub fn add_notify(&self, target: Arc<ChainNotify>) {
		self.notify.write().push(Arc::downgrade(&target));
	}
	/// Set a closure to call when the client wants to be restarted.
	///
	/// The parameter passed to the callback is the name of the new chain spec to use after
	/// the restart.
	pub fn set_exit_handler<F>(&self, f: F) where F: Fn(String) + 'static + Send {
		*self.exit_handler.lock() = Some(Box::new(f));
	}
	/// Returns engine reference.
	pub fn engine(&self) -> &EthEngine {
		&*self.engine
	}
	fn notify<F>(&self, f: F) where F: Fn(&ChainNotify) {
		for np in &*self.notify.read() {
			if let Some(n) = np.upgrade() {
				f(&*n);
	/// Register an action to be done if a mode/spec_name change happens.
	pub fn on_user_defaults_change<F>(&self, f: F) where F: 'static + FnMut(Option<Mode>) + Send {
		*self.on_user_defaults_change.lock() = Some(Box::new(f));
	}
	/// Flush the block import queue.
	pub fn flush_queue(&self) {
		self.importer.block_queue.flush();
		while !self.importer.block_queue.queue_info().is_empty() {
			self.import_verified_blocks();
		}
	}
	/// The env info as of the best block.
	pub fn latest_env_info(&self) -> EnvInfo {
		self.env_info(BlockId::Latest).expect("Best block header always stored; qed")
	}
	/// The env info as of a given block.
	/// returns `None` if the block unknown.
	pub fn env_info(&self, id: BlockId) -> Option<EnvInfo> {
		self.block_header(id).map(|header| {
			EnvInfo {
				number: header.number(),
				author: header.author(),
				timestamp: header.timestamp(),
				difficulty: header.difficulty(),
				last_hashes: self.build_last_hashes(&header.parent_hash()),
				gas_used: U256::default(),
				gas_limit: header.gas_limit(),
			}
		})
	}

	fn build_last_hashes(&self, parent_hash: &H256) -> Arc<LastHashes> {
		{
			let hashes = self.last_hashes.read();
			if hashes.front().map_or(false, |h| h == parent_hash) {
				let mut res = Vec::from(hashes.clone());
				res.resize(256, H256::default());
				return Arc::new(res);
			}
		}
		let mut last_hashes = LastHashes::new();
		last_hashes.resize(256, H256::default());
		last_hashes[0] = parent_hash.clone();
		let chain = self.chain.read();
		for i in 0..255 {
			match chain.block_details(&last_hashes[i]) {
				Some(details) => {
					last_hashes[i + 1] = details.parent.clone();
				},
				None => break,
			}
		let mut cached_hashes = self.last_hashes.write();
		*cached_hashes = VecDeque::from(last_hashes.clone());
		Arc::new(last_hashes)
	}

	/// This is triggered by a message coming from a block queue when the block is ready for insertion
	pub fn import_verified_blocks(&self) -> usize {
		self.importer.import_verified_blocks(self)
	}

	// use a state-proving closure for the given block.
	fn with_proving_caller<F, T>(&self, id: BlockId, with_call: F) -> T
		where F: FnOnce(&::machine::Call) -> T
	{
		let call = |a, d| {
			let tx = self.contract_call_tx(id, a, d);
			let (result, items) = self.prove_transaction(tx, id)
				.ok_or_else(|| format!("Unable to make call. State unavailable?"))?;

			let items = items.into_iter().map(|x| x.to_vec()).collect();
			Ok((result, items))
		};

		with_call(&call)
	}

	// prune ancient states until below the memory limit or only the minimum amount remain.
	fn prune_ancient(&self, mut state_db: StateDB, chain: &BlockChain) -> Result<(), ClientError> {
		let number = match state_db.journal_db().latest_era() {
			Some(n) => n,
			None => return Ok(()),
		};

		// prune all ancient eras until we're below the memory target,
		// but have at least the minimum number of states.
		loop {
			let needs_pruning = state_db.journal_db().is_pruned() &&
				state_db.journal_db().journal_size() >= self.config.history_mem;

			if !needs_pruning { break }
			match state_db.journal_db().earliest_era() {
				Some(era) if era + self.history <= number => {
					trace!(target: "client", "Pruning state for ancient era {}", era);
					match chain.block_hash(era) {
						Some(ancient_hash) => {
							let mut batch = DBTransaction::new();
							state_db.mark_canonical(&mut batch, era, &ancient_hash)?;
							self.db.read().key_value().write_buffered(batch);
							state_db.journal_db().flush();
						}
						None =>
							debug!(target: "client", "Missing expected hash for block {}", era),
					}
				}
				_ => break, // means that every era is kept, no pruning necessary.
			}
		}

		Ok(())
	}

	fn update_last_hashes(&self, parent: &H256, hash: &H256) {
		let mut hashes = self.last_hashes.write();
		if hashes.front().map_or(false, |h| h == parent) {
			if hashes.len() > 255 {
				hashes.pop_back();
			}
			hashes.push_front(hash.clone());
		}
	}

	/// Get shared miner reference.
	pub fn miner(&self) -> Arc<Miner> {
		self.importer.miner.clone()
	/// Replace io channel. Useful for testing.
	pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) {
		*self.io_channel.lock() = io_channel;