75.8 KiB
Newer Older
				error!(target: "client", "Error importing ancient block: {}", e);
		}) {
			Ok(_) => Ok(hash),
			Err(e) => bail!(BlockImportErrorKind::Other(format!("{}", e))),

	fn queue_consensus_message(&self, message: Bytes) {
		match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| {
			if let Err(e) = client.engine().handle_message(&message) {
				debug!(target: "poa", "Invalid message received: {}", e);
		}) {
			Ok(_) => (),
			Err(e) => {
				debug!(target: "poa", "Ignoring the message, error queueing: {}", e);

impl ReopenBlock for Client {
	fn reopen_block(&self, block: ClosedBlock) -> OpenBlock {
		let engine = &*self.engine;
		let mut block = block.reopen(engine);
		let max_uncles = engine.maximum_uncle_count(block.header().number());
		if block.uncles().len() < max_uncles {
			let chain =;
			let h = chain.best_block_hash();
			// Add new uncles
			let uncles = chain
				.find_uncle_hashes(&h, engine.maximum_uncle_age())
			for h in uncles {
				if !block.uncles().iter().any(|header| header.hash() == h) {
					let uncle = chain.block_header_data(&h).expect("find_uncle_hashes only returns hashes for existing headers; qed");
David's avatar
David committed
					let uncle = uncle.decode().expect("decoding failure");
					block.push_uncle(uncle).expect("pushing up to maximum_uncle_count;
												push_uncle is not ok only if more than maximum_uncle_count is pushed;
												so all push_uncle are Ok;
					if block.uncles().len() >= max_uncles { break }

impl PrepareOpenBlock for Client {
Gav Wood's avatar
Gav Wood committed
	fn prepare_open_block(&self, author: Address, gas_range_target: (U256, U256), extra_data: Bytes) -> OpenBlock {
		let engine = &*self.engine;
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		let chain =;
		let best_header = chain.best_block_header();
		let h = best_header.hash();
Nikolay Volf's avatar
Nikolay Volf committed

		let is_epoch_begin = chain.epoch_transition(best_header.number(), h).is_some();
Marek Kotewicz's avatar
Marek Kotewicz committed
		let mut open_block = OpenBlock::new(
Nikolay Volf's avatar
Nikolay Volf committed
Tomasz Drwięga's avatar
Tomasz Drwięga committed,
Nikolay Volf's avatar
Nikolay Volf committed
Gav Wood's avatar
Gav Wood committed
Nikolay Volf's avatar
Nikolay Volf committed
Gav Wood's avatar
Gav Wood committed
		).expect("OpenBlock::new only fails if parent state root invalid; state root of best block's header is never invalid; qed");
Nikolay Volf's avatar
Nikolay Volf committed

		// Add uncles
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
Nikolay Volf's avatar
Nikolay Volf committed
			.find_uncle_headers(&h, engine.maximum_uncle_age())
Nikolay Volf's avatar
Nikolay Volf committed
Nikolay Volf's avatar
Nikolay Volf committed
			.foreach(|h| {
David's avatar
David committed
				open_block.push_uncle(h.decode().expect("decoding failure")).expect("pushing maximum_uncle_count;
												open_block was just created;
												push_uncle is not ok only if more than maximum_uncle_count is pushed;
												so all push_uncle are Ok;
Nikolay Volf's avatar
Nikolay Volf committed

Marek Kotewicz's avatar
Marek Kotewicz committed
Nikolay Volf's avatar
Nikolay Volf committed
impl BlockProducer for Client {}
impl ScheduleInfo for Client {
	fn latest_schedule(&self) -> Schedule {
impl ImportSealedBlock for Client {
	fn import_sealed_block(&self, block: SealedBlock) -> ImportResult {
		let h = block.header().hash();
		let start = Instant::now();
		let route = {
			// scope for self.import_lock
			let _import_lock = self.importer.import_lock.lock();

			let number = block.header().number();
			let block_data = block.rlp_bytes();
			let header = block.header().clone();

			let route = self.importer.commit_block(block, &header, &block_data, self);
			trace!(target: "client", "Imported sealed block #{} ({})", number, h);
Tomasz Drwięga's avatar
Tomasz Drwięga committed
			self.state_db.write().sync_cache(&route.enacted, &route.retracted, false);
		let route = ChainRoute::from([route].as_ref());
		self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], route.enacted(), route.retracted(), true);
		self.notify(|notify| {
keorn's avatar
keorn committed
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed"DB flush failed.");
impl BroadcastProposalBlock for Client {
	fn broadcast_proposal_block(&self, block: SealedBlock) {
		const DURATION_ZERO: Duration = Duration::from_millis(0);
		self.notify(|notify| {

impl SealedBlockImporter for Client {}

impl ::miner::TransactionVerifierClient for Client {}
impl ::miner::BlockChainClient for Client {}
impl super::traits::EngineClient for Client {
	fn update_sealing(&self) {

	fn submit_seal(&self, block_hash: H256, seal: Vec<Bytes>) {
		let import = self.importer.miner.submit_seal(block_hash, seal).and_then(|block| self.import_sealed_block(block));
		if let Err(err) = import {
			warn!(target: "poa", "Wrong internal seal submission! {:?}", err);

	fn broadcast_consensus_message(&self, message: Bytes) {
		self.notify(|notify| notify.broadcast(ChainMessageType::Consensus(message.clone())));

	fn epoch_transition_for(&self, parent_hash: H256) -> Option<::engines::EpochTransition> {
	fn as_full_client(&self) -> Option<&BlockChainClient> { Some(self) }

	fn block_number(&self, id: BlockId) -> Option<BlockNumber> {
		BlockChainClient::block_number(self, id)

	fn block_header(&self, id: BlockId) -> Option<::encoded::Header> {
		BlockChainClient::block_header(self, id)
impl ProvingBlockChainClient for Client {
	fn prove_storage(&self, key1: H256, key2: H256, id: BlockId) -> Option<(Vec<Bytes>, H256)> {
			.and_then(move |state| state.prove_storage(key1, key2).ok())
	fn prove_account(&self, key1: H256, id: BlockId) -> Option<(Vec<Bytes>, ::types::basic_account::BasicAccount)> {
			.and_then(move |state| state.prove_account(key1).ok())
	fn prove_transaction(&self, transaction: SignedTransaction, id: BlockId) -> Option<(Bytes, Vec<DBValue>)> {
		let (header, mut env_info) = match (self.block_header(id), self.env_info(id)) {
			(Some(s), Some(e)) => (s, e),
			_ => return None,

		env_info.gas_limit = transaction.gas.clone();
Tomasz Drwięga's avatar
Tomasz Drwięga committed
		let mut jdb =;

	fn epoch_signal(&self, hash: H256) -> Option<Vec<u8>> {
		// pending transitions are never deleted, and do not contain
		// finality proofs by definition.|pending| pending.proof)
keorn's avatar
keorn committed
impl Drop for Client {
	fn drop(&mut self) {

/// Returns `LocalizedReceipt` given `LocalizedTransaction`
/// and a vector of receipts from given block up to transaction index.
fn transaction_receipt(machine: &::machine::EthereumMachine, mut tx: LocalizedTransaction, mut receipts: Vec<Receipt>) -> LocalizedReceipt {
	assert_eq!(receipts.len(), tx.transaction_index + 1, "All previous receipts are provided.");

	let sender = tx.sender();
	let receipt = receipts.pop().expect("Current receipt is provided; qed");
	let prior_gas_used = match tx.transaction_index {
		0 => 0.into(),
		i => receipts.get(i - 1).expect("All previous receipts are provided; qed").gas_used,
	let no_of_logs = receipts.into_iter().map(|receipt| receipt.logs.len()).sum::<usize>();
	let transaction_hash = tx.hash();
	let block_hash = tx.block_hash;
	let block_number = tx.block_number;
	let transaction_index = tx.transaction_index;

	LocalizedReceipt {
		transaction_hash: transaction_hash,
		transaction_index: transaction_index,
		block_hash: block_hash,
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		block_number: block_number,
		cumulative_gas_used: receipt.gas_used,
		gas_used: receipt.gas_used - prior_gas_used,
		contract_address: match tx.action {
			Action::Call(_) => None,
			Action::Create => Some(contract_address(machine.create_address_scheme(block_number), &sender, &tx.nonce, &
		logs: receipt.logs.into_iter().enumerate().map(|(i, log)| LocalizedLogEntry {
			entry: log,
			block_hash: block_hash,
			block_number: block_number,
			transaction_hash: transaction_hash,
			transaction_index: transaction_index,
			transaction_log_index: i,
			log_index: no_of_logs + i,
		log_bloom: receipt.log_bloom,
		outcome: receipt.outcome,
mod tests {

	fn should_not_cache_details_before_commit() {
		use client::{BlockChainClient, ChainInfo};
		use test_helpers::{generate_dummy_client, get_good_dummy_block_hash};

		use std::thread;
		use std::time::Duration;
		use std::sync::Arc;
		use std::sync::atomic::{AtomicBool, Ordering};
		use kvdb::DBTransaction;

		let client = generate_dummy_client(0);
		let genesis = client.chain_info().best_block_hash;
		let (new_hash, new_block) = get_good_dummy_block_hash();

		let go = {
			// Separate thread uncommited transaction
			let go = Arc::new(AtomicBool::new(false));
			let go_thread = go.clone();
			let another_client = client.clone();
			thread::spawn(move || {
				let mut batch = DBTransaction::new(); batch, &new_block, Vec::new());, Ordering::SeqCst);

		while !go.load(Ordering::SeqCst) { thread::park_timeout(Duration::from_millis(5)); }

		assert!(client.tree_route(&genesis, &new_hash).is_none());

	fn should_return_correct_log_index() {
		use hash::keccak;
		use super::transaction_receipt;
		use ethkey::KeyPair;
		use log_entry::{LogEntry, LocalizedLogEntry};
		use receipt::{Receipt, LocalizedReceipt, TransactionOutcome};
		use transaction::{Transaction, LocalizedTransaction, Action};

		// given
		let key = KeyPair::from_secret_slice(&keccak("test")).unwrap();
		let secret = key.secret();
		let machine = ::ethereum::new_frontier_test_machine();

		let block_number = 1;
		let block_hash = 5.into();
		let state_root = 99.into();
		let gas_used = 10.into();
		let raw_tx = Transaction {
			nonce: 0.into(),
			gas_price: 0.into(),
			gas: 21000.into(),
			action: Action::Call(10.into()),
			value: 0.into(),
			data: vec![],
		let tx1 = raw_tx.clone().sign(secret, None);
		let transaction = LocalizedTransaction {
			signed: tx1.clone().into(),
			block_number: block_number,
			block_hash: block_hash,
			transaction_index: 1,
			cached_sender: Some(tx1.sender()),
		let logs = vec![LogEntry {
			address: 5.into(),
			topics: vec![],
			data: vec![],
		}, LogEntry {
			address: 15.into(),
			topics: vec![],
			data: vec![],
		let receipts = vec![Receipt {
			outcome: TransactionOutcome::StateRoot(state_root),
			gas_used: 5.into(),
			log_bloom: Default::default(),
			logs: vec![logs[0].clone()],
		}, Receipt {
			outcome: TransactionOutcome::StateRoot(state_root),
			gas_used: gas_used,
			log_bloom: Default::default(),
			logs: logs.clone(),

		// when
		let receipt = transaction_receipt(&machine, transaction, receipts);

		// then
		assert_eq!(receipt, LocalizedReceipt {
			transaction_hash: tx1.hash(),
			transaction_index: 1,
			block_hash: block_hash,
			block_number: block_number,
			cumulative_gas_used: gas_used,
			gas_used: gas_used - 5.into(),
			contract_address: None,
			logs: vec![LocalizedLogEntry {
				entry: logs[0].clone(),
				block_hash: block_hash,
				block_number: block_number,
				transaction_hash: tx1.hash(),
				transaction_index: 1,
				transaction_log_index: 0,
				log_index: 1,
			}, LocalizedLogEntry {
				entry: logs[1].clone(),
				block_hash: block_hash,
				block_number: block_number,
				transaction_hash: tx1.hash(),
				transaction_index: 1,
				transaction_log_index: 1,
				log_index: 2,
			log_bloom: Default::default(),
			outcome: TransactionOutcome::StateRoot(state_root),

enum QueueError {

impl fmt::Display for QueueError {
	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
		match *self {
			QueueError::Channel(ref c) => fmt::Display::fmt(c, fmt),
			QueueError::Full(limit) => write!(fmt, "The queue is full ({})", limit),

/// Queue some items to be processed by IO client.
struct IoChannelQueue {
	currently_queued: Arc<AtomicUsize>,
	limit: usize,

impl IoChannelQueue {
	pub fn new(limit: usize) -> Self {
		IoChannelQueue {
			currently_queued: Default::default(),

	pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
		F: Fn(&Client) + Send + Sync + 'static,
		let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
		ensure!(queue_size < self.limit, QueueError::Full(self.limit));

		let currently_queued = self.currently_queued.clone();
		let result = channel.send(ClientIoMessage::execute(move |client| {
			currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);

		match result {
			Ok(_) => {
				self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
			Err(e) => Err(QueueError::Channel(e)),