Newer
Older
fn registrar_address(&self) -> Option<Address> {
/// Returns the `eip86_transition` value stored in the engine's parameters.
fn eip86_transition(&self) -> u64 {
    let transition = self.engine().params().eip86_transition;
    transition
}
Tomusdrw
committed
impl IoClient for Client {
/// Schedules a batch of raw transactions received from `peer_id` for import.
///
/// The work is pushed onto the transaction queue with a weight equal to the
/// number of raw transactions. When the queued job runs it decodes every
/// entry (entries that fail to decode are dropped), notifies listeners about
/// the decoded transactions and hands them to the miner. If the job cannot
/// be queued, the whole batch is ignored and a debug message is logged.
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
    trace_time!("queue_transactions");
    let len = transactions.len();

    // The scrutinee keeps the IO channel guard alive for the whole `if let`,
    // exactly as long as the chained call form would.
    if let Err(e) = self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| {
        trace_time!("import_queued_transactions");

        // Keep only the payloads the engine is able to decode.
        let mut txs: Vec<UnverifiedTransaction> = Vec::new();
        for bytes in transactions.iter() {
            if let Ok(decoded) = client.engine.decode_transaction(bytes) {
                txs.push(decoded);
            }
        }

        client.notify(|notify| {
            notify.transactions_received(&txs, peer_id);
        });
        client.importer.miner.import_external_transactions(client, txs);
    }) {
        debug!(target: "client", "Ignoring {} transactions: {}", len, e);
    }
}
/// Validates an ancient block's position in the chain and queues it for import.
///
/// The header is decoded from the first RLP item of `block_bytes`. The call
/// bails with `AlreadyInChain` when the block hash is already known and with
/// `UnknownParent` when the parent is neither queued nor has a status other
/// than `Unknown`/`Pending`. The block is then recorded in
/// `queued_ancient_blocks` and an import job of weight 1 is queued.
///
/// Returns `Ok(hash)` when the job was accepted by the queue; a queueing
/// failure is converted into `BlockImportErrorKind::Other`.
///
/// NOTE(review): the brace structure of this function is unbalanced in the
/// text under review (several closing braces appear to be missing), so the
/// nesting of the statements below should be confirmed against the real file.
fn queue_ancient_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, BlockImportError> {
trace_time!("queue_ancient_block");
// The header is the first element of the block RLP.
let header: Header = ::rlp::Rlp::new(&block_bytes).val_at(0)?;
let hash = header.hash();
{
// check block order
if self.chain.read().is_known(&hash) {
bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain));
}
let parent_hash = header.parent_hash();
// NOTE To prevent race condition with import, make sure to check queued blocks first
// (and attempt to acquire lock)
let is_parent_pending = self.queued_ancient_blocks.read().0.contains(parent_hash);
if !is_parent_pending {
// The parent is not waiting in the queue, so it must already have a usable status.
let status = self.block_status(BlockId::Hash(*parent_hash));
if status == BlockStatus::Unknown || status == BlockStatus::Pending {
bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*parent_hash)));
}
// we queue blocks here and trigger an IO message.
{
// `.0` holds the hashes of queued blocks, `.1` the queued block data in arrival order.
let mut queued = self.queued_ancient_blocks.write();
queued.0.insert(hash);
queued.1.push_back((header, block_bytes, receipts_bytes));
}
// Clones moved into the queued closure below.
let queued = self.queued_ancient_blocks.clone();
let lock = self.ancient_blocks_import_lock.clone();
match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| {
trace_time!("import_ancient_block");
// Make sure to hold the lock here to prevent importing out of order.
// We use a separate lock, because we don't want to block queueing.
let _lock = lock.lock();
// Import at most MAX_ANCIENT_BLOCKS_TO_IMPORT blocks per queued job.
for _i in 0..MAX_ANCIENT_BLOCKS_TO_IMPORT {
let first = queued.write().1.pop_front();
if let Some((header, block_bytes, receipts_bytes)) = first {
let hash = header.hash();
let result = client.importer.import_old_block(
&header,
&block_bytes,
&receipts_bytes,
&*client.chain.read(),
);
if let Err(e) = result {
error!(target: "client", "Error importing ancient block: {}", e);
// remove from pending
queued.write().0.remove(&hash);
} else {
break;
}
}
}) {
Ok(_) => Ok(hash),
Err(e) => bail!(BlockImportErrorKind::Other(format!("{}", e))),
}
}
/// Schedules a consensus `message` to be handled by the engine.
///
/// The message is queued with a weight of 1. When the queued job runs, the
/// engine handles the message and a rejection is logged at debug level.
/// If the job cannot be queued, the message is dropped and that is logged
/// at debug level as well.
fn queue_consensus_message(&self, message: Bytes) {
    // The scrutinee keeps the IO channel guard alive for the whole `if let`.
    if let Err(e) = self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| {
        match client.engine().handle_message(&message) {
            Ok(_) => (),
            Err(e) => {
                debug!(target: "poa", "Invalid message received: {}", e);
            }
        }
    }) {
        debug!(target: "poa", "Ignoring the message, error queueing: {}", e);
    }
}
}
impl ReopenBlock for Client {
fn reopen_block(&self, block: ClosedBlock) -> OpenBlock {
let engine = &*self.engine;
let mut block = block.reopen(engine);
let max_uncles = engine.maximum_uncle_count(block.header().number());
if block.uncles().len() < max_uncles {
let chain = self.chain.read();
let h = chain.best_block_hash();
// Add new uncles
let uncles = chain
.find_uncle_hashes(&h, engine.maximum_uncle_age())
.unwrap_or_else(Vec::new);
Vlad Lupashevskyi
committed
for h in uncles {
if !block.uncles().iter().any(|header| header.hash() == h) {
let uncle = chain.block_header_data(&h).expect("find_uncle_hashes only returns hashes for existing headers; qed");
let uncle = uncle.decode().expect("decoding failure");
block.push_uncle(uncle).expect("pushing up to maximum_uncle_count;
push_uncle is not ok only if more than maximum_uncle_count is pushed;
so all push_uncle are Ok;
qed");
if block.uncles().len() >= max_uncles { break }
}
}
}
block
fn prepare_open_block(&self, author: Address, gas_range_target: (U256, U256), extra_data: Bytes) -> OpenBlock {
let engine = &*self.engine;
let best_header = chain.best_block_header();
let h = best_header.hash();
let is_epoch_begin = chain.epoch_transition(best_header.number(), h).is_some();
self.factories.clone(),
self.tracedb.read().tracing_enabled(),
is_epoch_begin,
&mut chain.ancestry_with_metadata_iter(best_header.hash()),
).expect("OpenBlock::new only fails if parent state root invalid; state root of best block's header is never invalid; qed");
.take(engine.maximum_uncle_count(open_block.header().number()))
open_block.push_uncle(h.decode().expect("decoding failure")).expect("pushing maximum_uncle_count;
open_block was just created;
push_uncle is not ok only if more than maximum_uncle_count is pushed;
so all push_uncle are Ok;
qed");
impl ScheduleInfo for Client {
fn latest_schedule(&self) -> Schedule {
self.engine.schedule(self.latest_env_info().number)
fn import_sealed_block(&self, block: SealedBlock) -> ImportResult {
let h = block.header().hash();
let start = Instant::now();
let route = {
// scope for self.import_lock
let _import_lock = self.importer.import_lock.lock();
trace_time!("import_sealed_block");
let number = block.header().number();
let block_data = block.rlp_bytes();
let route = self.importer.commit_block(block, &header, &block_data, self);
trace!(target: "client", "Imported sealed block #{} ({})", number, h);
self.state_db.write().sync_cache(&route.enacted, &route.retracted, false);
let route = ChainRoute::from([route].as_ref());
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], route.enacted(), route.retracted(), self.engine.seals_internally().is_some());
self.notify(|notify| {
notify.new_blocks(
vec![h.clone()],
vec![],
vec![h.clone()],
);
});
self.db.read().key_value().flush().expect("DB flush failed.");
Tomusdrw
committed
/// `BroadcastProposalBlock` implementation: announces a proposal block to the
/// registered notification listeners.
impl BroadcastProposalBlock for Client {
/// Calls `new_blocks` on every listener reached through `self.notify`.
///
/// NOTE(review): in the text under review neither `block` nor
/// `DURATION_ZERO` is referenced and `new_blocks` receives only two empty
/// vectors; arguments appear to be missing here — confirm against the real file.
fn broadcast_proposal_block(&self, block: SealedBlock) {
// Zero-length duration constant (0 milliseconds).
const DURATION_ZERO: Duration = Duration::from_millis(0);
self.notify(|notify| {
notify.new_blocks(
vec![],
vec![],
);
});
}
}
// The three impls below have empty bodies: `Client` defines no items for
// these traits here, so whatever the traits provide by default applies.
impl SealedBlockImporter for Client {}
impl ::miner::TransactionVerifierClient for Client {}
impl ::miner::BlockChainClient for Client {}
impl super::traits::EngineClient for Client {
self.importer.miner.update_sealing(self)
}
/// Submits an internally produced `seal` for the block identified by
/// `block_hash` and imports the resulting sealed block.
///
/// A failure in either step is not propagated; it is logged as a warning.
fn submit_seal(&self, block_hash: H256, seal: Vec<Bytes>) {
    let sealed = self.importer.miner.submit_seal(block_hash, seal);
    let import = sealed.and_then(|block| self.import_sealed_block(block));

    match import {
        Ok(_) => {}
        Err(err) => {
            warn!(target: "poa", "Wrong internal seal submission! {:?}", err);
        }
    }
}
fn broadcast_consensus_message(&self, message: Bytes) {
self.notify(|notify| notify.broadcast(ChainMessageType::Consensus(message.clone())));
/// Looks up the epoch transition for `parent_hash` in the block chain.
///
/// The chain is read-locked for the duration of the lookup; the result of
/// the chain's own `epoch_transition_for` is returned unchanged.
fn epoch_transition_for(&self, parent_hash: H256) -> Option<::engines::EpochTransition> {
    let chain = self.chain.read();
    chain.epoch_transition_for(parent_hash)
}
/// Exposes this client as a `BlockChainClient` trait object; always `Some`.
fn as_full_client(&self) -> Option<&BlockChainClient> {
    let full_client: &BlockChainClient = self;
    Some(full_client)
}
fn block_number(&self, id: BlockId) -> Option<BlockNumber> {
BlockChainClient::block_number(self, id)
}
/// Fetches the encoded header for `id` by delegating to the
/// `BlockChainClient` method of the same name.
fn block_header(&self, id: BlockId) -> Option<::encoded::Header> {
    let header = BlockChainClient::block_header(self, id);
    header
}
impl ProvingBlockChainClient for Client {
fn prove_storage(&self, key1: H256, key2: H256, id: BlockId) -> Option<(Vec<Bytes>, H256)> {
.and_then(move |state| state.prove_storage(key1, key2).ok())
fn prove_account(&self, key1: H256, id: BlockId) -> Option<(Vec<Bytes>, ::types::basic_account::BasicAccount)> {
.and_then(move |state| state.prove_account(key1).ok())
/// Produces an execution proof for `transaction` against the state of block `id`.
///
/// Returns `None` when either the block header or the environment info for
/// `id` is unavailable; otherwise returns whatever `state::prove_transaction`
/// yields.
///
/// NOTE(review): the argument list passed to `state::prove_transaction` is
/// reproduced as it appears in the text under review — confirm it matches the
/// callee's signature.
fn prove_transaction(&self, transaction: SignedTransaction, id: BlockId) -> Option<(Bytes, Vec<DBValue>)> {
// Both the header and the environment info are required.
let (header, mut env_info) = match (self.block_header(id), self.env_info(id)) {
(Some(s), Some(e)) => (s, e),
_ => return None,
};
// The block gas limit is replaced by the transaction's own gas value.
env_info.gas_limit = transaction.gas.clone();
// Work on a boxed clone of the journal DB rather than on the shared state DB.
let mut jdb = self.state_db.read().journal_db().boxed_clone();
state::prove_transaction(
jdb.as_hashdb_mut(),
header.state_root().clone(),
&transaction,
&env_info,
self.factories.clone(),
false,
)
}
fn epoch_signal(&self, hash: H256) -> Option<Vec<u8>> {
// pending transitions are never deleted, and do not contain
// finality proofs by definition.
self.chain.read().get_pending_transition(hash).map(|pending| pending.proof)
/// Stops the consensus engine when the client is dropped.
impl Drop for Client {
    fn drop(&mut self) {
        let engine = &self.engine;
        engine.stop();
    }
}
/// Returns `LocalizedReceipt` given `LocalizedTransaction`
/// and a vector of receipts from given block up to transaction index.
///
/// `receipts` must contain exactly `tx.transaction_index + 1` entries, the
/// last one being the receipt of `tx` itself.
///
/// # Panics
///
/// Panics when `receipts.len() != tx.transaction_index + 1`.
///
/// NOTE(review): in the text under review `sender` is used without a visible
/// binding and the closing braces of the struct literal and of the function
/// are absent — lines appear to be missing; confirm against the real file.
fn transaction_receipt(machine: &::machine::EthereumMachine, mut tx: LocalizedTransaction, mut receipts: Vec<Receipt>) -> LocalizedReceipt {
assert_eq!(receipts.len(), tx.transaction_index + 1, "All previous receipts are provided.");
// The last receipt belongs to `tx`; after the pop, `receipts` holds only the preceding ones.
let receipt = receipts.pop().expect("Current receipt is provided; qed");
// `gas_used` of the directly preceding receipt (zero for the first transaction).
let prior_gas_used = match tx.transaction_index {
0 => 0.into(),
i => receipts.get(i - 1).expect("All previous receipts are provided; qed").gas_used,
};
// Total number of logs in all preceding receipts; used as the offset for `log_index`.
let no_of_logs = receipts.into_iter().map(|receipt| receipt.logs.len()).sum::<usize>();
let transaction_hash = tx.hash();
let block_hash = tx.block_hash;
let block_number = tx.block_number;
let transaction_index = tx.transaction_index;
LocalizedReceipt {
from: sender,
to: match tx.action {
Action::Create => None,
Action::Call(ref address) => Some(address.clone().into())
},
transaction_hash: transaction_hash,
transaction_index: transaction_index,
block_hash: block_hash,
// The receipt's `gas_used` is reported as the cumulative value; the
// transaction's own usage is the difference to the preceding receipt.
cumulative_gas_used: receipt.gas_used,
gas_used: receipt.gas_used - prior_gas_used,
contract_address: match tx.action {
Action::Call(_) => None,
Action::Create => Some(contract_address(machine.create_address_scheme(block_number), &sender, &tx.nonce, &tx.data).0)
},
// `transaction_log_index` counts within this receipt, `log_index` continues after the preceding receipts' logs.
logs: receipt.logs.into_iter().enumerate().map(|(i, log)| LocalizedLogEntry {
entry: log,
block_hash: block_hash,
block_number: block_number,
transaction_hash: transaction_hash,
transaction_index: transaction_index,
transaction_log_index: i,
log_index: no_of_logs + i,
}).collect(),
log_bloom: receipt.log_bloom,
outcome: receipt.outcome,
#[cfg(test)]
mod tests {
#[test]
fn should_not_cache_details_before_commit() {
use client::{BlockChainClient, ChainInfo};
use test_helpers::{generate_dummy_client, get_good_dummy_block_hash};
use std::thread;
use std::time::Duration;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use blockchain::ExtrasInsert;
let client = generate_dummy_client(0);
let genesis = client.chain_info().best_block_hash;
let (new_hash, new_block) = get_good_dummy_block_hash();
let go = {
// Separate thread uncommited transaction
let go = Arc::new(AtomicBool::new(false));
let go_thread = go.clone();
let another_client = client.clone();
let mut batch = DBTransaction::new();
another_client.chain.read().insert_block(&mut batch, &new_block, Vec::new(), ExtrasInsert {
fork_choice: ::engines::ForkChoice::New,
is_finalized: false,
metadata: None,
});
go_thread.store(true, Ordering::SeqCst);
});
go
};
while !go.load(Ordering::SeqCst) { thread::park_timeout(Duration::from_millis(5)); }
assert!(client.tree_route(&genesis, &new_hash).is_none());
}
#[test]
fn should_return_correct_log_index() {
use super::transaction_receipt;
use ethkey::KeyPair;
use log_entry::{LogEntry, LocalizedLogEntry};
use receipt::{Receipt, LocalizedReceipt, TransactionOutcome};
use transaction::{Transaction, LocalizedTransaction, Action};
// given
let key = KeyPair::from_secret_slice(&keccak("test")).unwrap();
let secret = key.secret();
let machine = ::ethereum::new_frontier_test_machine();
let block_number = 1;
let block_hash = 5.into();
let state_root = 99.into();
let gas_used = 10.into();
let raw_tx = Transaction {
nonce: 0.into(),
gas_price: 0.into(),
gas: 21000.into(),
action: Action::Call(10.into()),
value: 0.into(),
data: vec![],
};
let tx1 = raw_tx.clone().sign(secret, None);
let transaction = LocalizedTransaction {
block_number: block_number,
block_hash: block_hash,
transaction_index: 1,
};
let logs = vec![LogEntry {
address: 5.into(),
topics: vec![],
data: vec![],
}, LogEntry {
address: 15.into(),
topics: vec![],
data: vec![],
}];
let receipts = vec![Receipt {
outcome: TransactionOutcome::StateRoot(state_root),
gas_used: 5.into(),
log_bloom: Default::default(),
logs: vec![logs[0].clone()],
}, Receipt {
outcome: TransactionOutcome::StateRoot(state_root),
gas_used: gas_used,
log_bloom: Default::default(),
logs: logs.clone(),
}];
// when
let receipt = transaction_receipt(&machine, transaction, receipts);
// then
assert_eq!(receipt, LocalizedReceipt {
from: tx1.sender().into(),
to: match tx1.action {
Action::Create => None,
Action::Call(ref address) => Some(address.clone().into())
},
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
transaction_hash: tx1.hash(),
transaction_index: 1,
block_hash: block_hash,
block_number: block_number,
cumulative_gas_used: gas_used,
gas_used: gas_used - 5.into(),
contract_address: None,
logs: vec![LocalizedLogEntry {
entry: logs[0].clone(),
block_hash: block_hash,
block_number: block_number,
transaction_hash: tx1.hash(),
transaction_index: 1,
transaction_log_index: 0,
log_index: 1,
}, LocalizedLogEntry {
entry: logs[1].clone(),
block_hash: block_hash,
block_number: block_number,
transaction_hash: tx1.hash(),
transaction_index: 1,
transaction_log_index: 1,
log_index: 2,
}],
log_bloom: Default::default(),
outcome: TransactionOutcome::StateRoot(state_root),
/// Reasons why work could not be handed over to the IO channel.
#[derive(Debug)]
enum QueueError {
/// Sending the message over the IO channel failed; wraps the channel's error.
Channel(IoError),
/// The queue was at capacity; carries the configured limit.
Full(usize),
}
impl fmt::Display for QueueError {
    /// Renders a channel failure using the wrapped error's own `Display`
    /// output, and a full queue as a fixed message that includes the limit.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            QueueError::Full(capacity) => write!(fmt, "The queue is full ({})", capacity),
            QueueError::Channel(ref io_err) => fmt::Display::fmt(io_err, fmt),
        }
    }
}
/// Queue some items to be processed by IO client.
///
/// Keeps a shared counter of outstanding items and refuses new work once the
/// counter has reached `limit`.
struct IoChannelQueue {
/// Number of items currently counted as queued; shared through the `Arc`
/// with the closures that are sent to the IO channel.
currently_queued: Arc<AtomicUsize>,
/// Counter value at (or above) which `queue` rejects new work.
limit: usize,
}
impl IoChannelQueue {
pub fn new(limit: usize) -> Self {
IoChannelQueue {
currently_queued: Default::default(),
limit,
}
}
pub fn queue<F>(&self, channel: &mut IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
F: Fn(&Client) + Send + Sync + 'static,
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
ensure!(queue_size < self.limit, QueueError::Full(self.limit));
let currently_queued = self.currently_queued.clone();
let result = channel.send(ClientIoMessage::execute(move |client| {
currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
fun(client);
}));
match result {
Ok(_) => {
self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
Ok(())
},
Err(e) => Err(QueueError::Channel(e)),
}
}
}