client.rs 79.8 KiB
Newer Older
	fn registrar_address(&self) -> Option<Address> {
Marek Kotewicz's avatar
Marek Kotewicz committed
		self.registrar_address.clone()
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
	fn eip86_transition(&self) -> u64 {
		self.engine().params().eip86_transition
	}
impl IoClient for Client {
	fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
		trace_time!("queue_transactions");
		let len = transactions.len();
		self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| {
			trace_time!("import_queued_transactions");

			let txs: Vec<UnverifiedTransaction> = transactions
				.iter()
				.filter_map(|bytes| client.engine.decode_transaction(bytes).ok())
				.collect();

			client.notify(|notify| {
				notify.transactions_received(&txs, peer_id);
			});

			client.importer.miner.import_external_transactions(client, txs);
		}).unwrap_or_else(|e| {
			debug!(target: "client", "Ignoring {} transactions: {}", len, e);
		});
	}

	fn queue_ancient_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, BlockImportError> {
		trace_time!("queue_ancient_block");
		let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?;
		let hash = header.hash();

		{
			// check block order
			if self.chain.read().is_known(&hash) {
				bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain));
			}
			let parent_hash = header.parent_hash();
			// NOTE To prevent race condition with import, make sure to check queued blocks first
			// (and attempt to acquire lock)
			let is_parent_pending = self.queued_ancient_blocks.read().0.contains(parent_hash);
			if !is_parent_pending {
				let status = self.block_status(BlockId::Hash(*parent_hash));
				if  status == BlockStatus::Unknown || status == BlockStatus::Pending {
					bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*parent_hash)));
				}
		// we queue blocks here and trigger an IO message.
		{
			let mut queued = self.queued_ancient_blocks.write();
			queued.0.insert(hash);
			queued.1.push_back((header, block_bytes, receipts_bytes));
		}

		let queued = self.queued_ancient_blocks.clone();
		let lock = self.ancient_blocks_import_lock.clone();
		match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| {
			trace_time!("import_ancient_block");
			// Make sure to hold the lock here to prevent importing out of order.
			// We use separate lock, cause we don't want to block queueing.
			let _lock = lock.lock();
			for _i in 0..MAX_ANCIENT_BLOCKS_TO_IMPORT {
				let first = queued.write().1.pop_front();
				if let Some((header, block_bytes, receipts_bytes)) = first {
					let hash = header.hash();
					let result = client.importer.import_old_block(
						&header,
						&block_bytes,
						&receipts_bytes,
						&**client.db.read().key_value(),
						&*client.chain.read(),
					);
					if let Err(e) = result {
						error!(target: "client", "Error importing ancient block: {}", e);
					// remove from pending
					queued.write().0.remove(&hash);
				} else {
					break;
				}
			}
		}) {
			Ok(_) => Ok(hash),
			Err(e) => bail!(BlockImportErrorKind::Other(format!("{}", e))),
		}
	}

	fn queue_consensus_message(&self, message: Bytes) {
		match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| {
			if let Err(e) = client.engine().handle_message(&message) {
				debug!(target: "poa", "Invalid message received: {}", e);
			}
		}) {
			Ok(_) => (),
			Err(e) => {
				debug!(target: "poa", "Ignoring the message, error queueing: {}", e);
			}
		}
	}
}

impl ReopenBlock for Client {
	fn reopen_block(&self, block: ClosedBlock) -> OpenBlock {
		let engine = &*self.engine;
		let mut block = block.reopen(engine);
		let max_uncles = engine.maximum_uncle_count(block.header().number());
		if block.uncles().len() < max_uncles {
			let chain = self.chain.read();
			let h = chain.best_block_hash();
			// Add new uncles
			let uncles = chain
				.find_uncle_hashes(&h, engine.maximum_uncle_age())
				.unwrap_or_else(Vec::new);
			for h in uncles {
				if !block.uncles().iter().any(|header| header.hash() == h) {
					let uncle = chain.block_header_data(&h).expect("find_uncle_hashes only returns hashes for existing headers; qed");
David's avatar
David committed
					let uncle = uncle.decode().expect("decoding failure");
					block.push_uncle(uncle).expect("pushing up to maximum_uncle_count;
												push_uncle is not ok only if more than maximum_uncle_count is pushed;
												so all push_uncle are Ok;
												qed");
					if block.uncles().len() >= max_uncles { break }
				}
			}

		}
		block
impl PrepareOpenBlock for Client {
Gav Wood's avatar
Gav Wood committed
	fn prepare_open_block(&self, author: Address, gas_range_target: (U256, U256), extra_data: Bytes) -> OpenBlock {
		let engine = &*self.engine;
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		let chain = self.chain.read();
		let best_header = chain.best_block_header();
		let h = best_header.hash();
Nikolay Volf's avatar
Nikolay Volf committed

		let is_epoch_begin = chain.epoch_transition(best_header.number(), h).is_some();
Marek Kotewicz's avatar
Marek Kotewicz committed
		let mut open_block = OpenBlock::new(
Nikolay Volf's avatar
Nikolay Volf committed
			engine,
			self.tracedb.read().tracing_enabled(),
Tomasz Drwięga's avatar
Tomasz Drwięga committed
			self.state_db.read().boxed_clone_canon(&h),
			&best_header,
			self.build_last_hashes(&h),
Nikolay Volf's avatar
Nikolay Volf committed
			author,
Gav Wood's avatar
Gav Wood committed
			gas_range_target,
Nikolay Volf's avatar
Nikolay Volf committed
			extra_data,
			&mut chain.ancestry_with_metadata_iter(best_header.hash()),
Gav Wood's avatar
Gav Wood committed
		).expect("OpenBlock::new only fails if parent state root invalid; state root of best block's header is never invalid; qed");
Nikolay Volf's avatar
Nikolay Volf committed

		// Add uncles
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		chain
Nikolay Volf's avatar
Nikolay Volf committed
			.find_uncle_headers(&h, engine.maximum_uncle_age())
			.unwrap_or_else(Vec::new)
Nikolay Volf's avatar
Nikolay Volf committed
			.into_iter()
			.take(engine.maximum_uncle_count(open_block.header().number()))
Nikolay Volf's avatar
Nikolay Volf committed
			.foreach(|h| {
David's avatar
David committed
				open_block.push_uncle(h.decode().expect("decoding failure")).expect("pushing maximum_uncle_count;
												open_block was just created;
												push_uncle is not ok only if more than maximum_uncle_count is pushed;
												so all push_uncle are Ok;
												qed");
Nikolay Volf's avatar
Nikolay Volf committed
			});

Marek Kotewicz's avatar
Marek Kotewicz committed
		open_block
Nikolay Volf's avatar
Nikolay Volf committed
	}
impl BlockProducer for Client {}
impl ScheduleInfo for Client {
	fn latest_schedule(&self) -> Schedule {
		self.engine.schedule(self.latest_env_info().number)
impl ImportSealedBlock for Client {
	fn import_sealed_block(&self, block: SealedBlock) -> ImportResult {
		let h = block.header().hash();
		let start = Instant::now();
		let route = {
			// scope for self.import_lock
			let _import_lock = self.importer.import_lock.lock();
			trace_time!("import_sealed_block");

			let number = block.header().number();
			let block_data = block.rlp_bytes();
			let header = block.header().clone();

			let route = self.importer.commit_block(block, &header, &block_data, self);
			trace!(target: "client", "Imported sealed block #{} ({})", number, h);
Tomasz Drwięga's avatar
Tomasz Drwięga committed
			self.state_db.write().sync_cache(&route.enacted, &route.retracted, false);
		let route = ChainRoute::from([route].as_ref());
		self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], route.enacted(), route.retracted(), self.engine.seals_internally().is_some());
		self.notify(|notify| {
			notify.new_blocks(
				vec![h.clone()],
				vec![],
keorn's avatar
keorn committed
				vec![],
		self.db.read().key_value().flush().expect("DB flush failed.");
impl BroadcastProposalBlock for Client {
	fn broadcast_proposal_block(&self, block: SealedBlock) {
		const DURATION_ZERO: Duration = Duration::from_millis(0);
		self.notify(|notify| {
			notify.new_blocks(
				vec![],
				vec![],
				ChainRoute::default(),
				vec![],
				vec![block.rlp_bytes()],
			);
		});
	}
}

impl SealedBlockImporter for Client {}

impl ::miner::TransactionVerifierClient for Client {}
impl ::miner::BlockChainClient for Client {}
impl super::traits::EngineClient for Client {
	fn update_sealing(&self) {
		self.importer.miner.update_sealing(self)
	}

	fn submit_seal(&self, block_hash: H256, seal: Vec<Bytes>) {
		let import = self.importer.miner.submit_seal(block_hash, seal).and_then(|block| self.import_sealed_block(block));
		if let Err(err) = import {
			warn!(target: "poa", "Wrong internal seal submission! {:?}", err);
		}
	}

	fn broadcast_consensus_message(&self, message: Bytes) {
		self.notify(|notify| notify.broadcast(ChainMessageType::Consensus(message.clone())));

	fn epoch_transition_for(&self, parent_hash: H256) -> Option<::engines::EpochTransition> {
		self.chain.read().epoch_transition_for(parent_hash)
	}
	fn as_full_client(&self) -> Option<&BlockChainClient> { Some(self) }

	fn block_number(&self, id: BlockId) -> Option<BlockNumber> {
		BlockChainClient::block_number(self, id)
	}

	fn block_header(&self, id: BlockId) -> Option<::encoded::Header> {
		BlockChainClient::block_header(self, id)
	}
impl ProvingBlockChainClient for Client {
	fn prove_storage(&self, key1: H256, key2: H256, id: BlockId) -> Option<(Vec<Bytes>, H256)> {
		self.state_at(id)
			.and_then(move |state| state.prove_storage(key1, key2).ok())
	fn prove_account(&self, key1: H256, id: BlockId) -> Option<(Vec<Bytes>, ::types::basic_account::BasicAccount)> {
		self.state_at(id)
			.and_then(move |state| state.prove_account(key1).ok())
	fn prove_transaction(&self, transaction: SignedTransaction, id: BlockId) -> Option<(Bytes, Vec<DBValue>)> {
		let (header, mut env_info) = match (self.block_header(id), self.env_info(id)) {
			(Some(s), Some(e)) => (s, e),
			_ => return None,
		};

		env_info.gas_limit = transaction.gas.clone();
Tomasz Drwięga's avatar
Tomasz Drwięga committed
		let mut jdb = self.state_db.read().journal_db().boxed_clone();
		state::prove_transaction(
			jdb.as_hashdb_mut(),
			header.state_root().clone(),
			&transaction,
			self.engine.machine(),
			&env_info,
			self.factories.clone(),
			false,
		)
	}

	fn epoch_signal(&self, hash: H256) -> Option<Vec<u8>> {
		// pending transitions are never deleted, and do not contain
		// finality proofs by definition.
		self.chain.read().get_pending_transition(hash).map(|pending| pending.proof)
keorn's avatar
keorn committed
impl Drop for Client {
	fn drop(&mut self) {
		self.engine.stop();
	}
}

/// Returns `LocalizedReceipt` given `LocalizedTransaction`
/// and a vector of receipts from given block up to transaction index.
fn transaction_receipt(machine: &::machine::EthereumMachine, mut tx: LocalizedTransaction, mut receipts: Vec<Receipt>) -> LocalizedReceipt {
	assert_eq!(receipts.len(), tx.transaction_index + 1, "All previous receipts are provided.");

	let sender = tx.sender();
	let receipt = receipts.pop().expect("Current receipt is provided; qed");
	let prior_gas_used = match tx.transaction_index {
		0 => 0.into(),
		i => receipts.get(i - 1).expect("All previous receipts are provided; qed").gas_used,
	};
	let no_of_logs = receipts.into_iter().map(|receipt| receipt.logs.len()).sum::<usize>();
	let transaction_hash = tx.hash();
	let block_hash = tx.block_hash;
	let block_number = tx.block_number;
	let transaction_index = tx.transaction_index;

	LocalizedReceipt {
		from: sender,
		to: match tx.action {
				Action::Create => None,
				Action::Call(ref address) => Some(address.clone().into())
		},
		transaction_hash: transaction_hash,
		transaction_index: transaction_index,
		block_hash: block_hash,
Arkadiy Paronyan's avatar
Arkadiy Paronyan committed
		block_number: block_number,
		cumulative_gas_used: receipt.gas_used,
		gas_used: receipt.gas_used - prior_gas_used,
		contract_address: match tx.action {
			Action::Call(_) => None,
			Action::Create => Some(contract_address(machine.create_address_scheme(block_number), &sender, &tx.nonce, &tx.data).0)
		},
		logs: receipt.logs.into_iter().enumerate().map(|(i, log)| LocalizedLogEntry {
			entry: log,
			block_hash: block_hash,
			block_number: block_number,
			transaction_hash: transaction_hash,
			transaction_index: transaction_index,
			transaction_log_index: i,
			log_index: no_of_logs + i,
		}).collect(),
		log_bloom: receipt.log_bloom,
		outcome: receipt.outcome,
#[cfg(test)]
mod tests {

	#[test]
	fn should_not_cache_details_before_commit() {
		use client::{BlockChainClient, ChainInfo};
		use test_helpers::{generate_dummy_client, get_good_dummy_block_hash};

		use std::thread;
		use std::time::Duration;
		use std::sync::Arc;
		use std::sync::atomic::{AtomicBool, Ordering};
		use kvdb::DBTransaction;
		use blockchain::ExtrasInsert;

		let client = generate_dummy_client(0);
		let genesis = client.chain_info().best_block_hash;
		let (new_hash, new_block) = get_good_dummy_block_hash();

		let go = {
			// Separate thread uncommited transaction
			let go = Arc::new(AtomicBool::new(false));
			let go_thread = go.clone();
			let another_client = client.clone();
			thread::spawn(move || {
				let mut batch = DBTransaction::new();
				another_client.chain.read().insert_block(&mut batch, &new_block, Vec::new(), ExtrasInsert {
					fork_choice: ::engines::ForkChoice::New,
					is_finalized: false,
					metadata: None,
				});
				go_thread.store(true, Ordering::SeqCst);
			});
			go
		};

		while !go.load(Ordering::SeqCst) { thread::park_timeout(Duration::from_millis(5)); }

		assert!(client.tree_route(&genesis, &new_hash).is_none());
	}

	#[test]
	fn should_return_correct_log_index() {
		use hash::keccak;
		use super::transaction_receipt;
		use ethkey::KeyPair;
		use log_entry::{LogEntry, LocalizedLogEntry};
		use receipt::{Receipt, LocalizedReceipt, TransactionOutcome};
		use transaction::{Transaction, LocalizedTransaction, Action};

		// given
		let key = KeyPair::from_secret_slice(&keccak("test")).unwrap();
		let secret = key.secret();
		let machine = ::ethereum::new_frontier_test_machine();

		let block_number = 1;
		let block_hash = 5.into();
		let state_root = 99.into();
		let gas_used = 10.into();
		let raw_tx = Transaction {
			nonce: 0.into(),
			gas_price: 0.into(),
			gas: 21000.into(),
			action: Action::Call(10.into()),
			value: 0.into(),
			data: vec![],
		};
		let tx1 = raw_tx.clone().sign(secret, None);
		let transaction = LocalizedTransaction {
			signed: tx1.clone().into(),
			block_number: block_number,
			block_hash: block_hash,
			transaction_index: 1,
			cached_sender: Some(tx1.sender()),
		};
		let logs = vec![LogEntry {
			address: 5.into(),
			topics: vec![],
			data: vec![],
		}, LogEntry {
			address: 15.into(),
			topics: vec![],
			data: vec![],
		}];
		let receipts = vec![Receipt {
			outcome: TransactionOutcome::StateRoot(state_root),
			gas_used: 5.into(),
			log_bloom: Default::default(),
			logs: vec![logs[0].clone()],
		}, Receipt {
			outcome: TransactionOutcome::StateRoot(state_root),
			gas_used: gas_used,
			log_bloom: Default::default(),
			logs: logs.clone(),
		}];

		// when
		let receipt = transaction_receipt(&machine, transaction, receipts);

		// then
		assert_eq!(receipt, LocalizedReceipt {
			from: tx1.sender().into(),
			to: match tx1.action {
				Action::Create => None,
				Action::Call(ref address) => Some(address.clone().into())
			},
			transaction_hash: tx1.hash(),
			transaction_index: 1,
			block_hash: block_hash,
			block_number: block_number,
			cumulative_gas_used: gas_used,
			gas_used: gas_used - 5.into(),
			contract_address: None,
			logs: vec![LocalizedLogEntry {
				entry: logs[0].clone(),
				block_hash: block_hash,
				block_number: block_number,
				transaction_hash: tx1.hash(),
				transaction_index: 1,
				transaction_log_index: 0,
				log_index: 1,
			}, LocalizedLogEntry {
				entry: logs[1].clone(),
				block_hash: block_hash,
				block_number: block_number,
				transaction_hash: tx1.hash(),
				transaction_index: 1,
				transaction_log_index: 1,
				log_index: 2,
			}],
			log_bloom: Default::default(),
			outcome: TransactionOutcome::StateRoot(state_root),

#[derive(Debug)]
enum QueueError {
	Channel(IoError),
	Full(usize),
}

impl fmt::Display for QueueError {
	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
		match *self {
			QueueError::Channel(ref c) => fmt::Display::fmt(c, fmt),
			QueueError::Full(limit) => write!(fmt, "The queue is full ({})", limit),
		}
	}
}

/// Queue some items to be processed by IO client.
struct IoChannelQueue {
	currently_queued: Arc<AtomicUsize>,
	limit: usize,
}

impl IoChannelQueue {
	pub fn new(limit: usize) -> Self {
		IoChannelQueue {
			currently_queued: Default::default(),
	pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
		F: Fn(&Client) + Send + Sync + 'static,
		let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
		ensure!(queue_size < self.limit, QueueError::Full(self.limit));
		let currently_queued = self.currently_queued.clone();
		let result = channel.send(ClientIoMessage::execute(move |client| {
			currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
			fun(client);
			Ok(_) => {
				self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
				Ok(())
			},
			Err(e) => Err(QueueError::Channel(e)),
		}
	}
}