Newer
Older
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashSet, HashMap, BTreeMap, VecDeque};
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use itertools::Itertools;
use util::{Bytes, PerfTimer, Mutex, RwLock, MutexGuard, Hashable};
use util::{journaldb, DBValue, TrieFactory, Trie};
use util::{U256, H256, Address, H2048};
use util::trie::TrieSpec;
use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute};
use blockchain::extras::TransactionAddress;
use client::ancient_import::AncientVerifier;
use client::Error as ClientError;
BlockId, TransactionId, UncleId, TraceId, ClientConfig, BlockChainClient,
MiningBlockChainClient, EngineClient, TraceFilter, CallAnalytics, BlockImportError, Mode,
ChainNotify, PruningInfo, ProvingBlockChainClient,
use engines::{Engine, EpochTransition};
use error::{ImportError, ExecutionError, CallError, BlockError, ImportResult, Error as EthcoreError};
use evm::{Factory as EvmFactory, Schedule};
use executive::{Executive, Executed, TransactOptions, contract_address};
use factory::Factories;
use futures::{future, Future};
use io::*;
use log_entry::LocalizedLogEntry;
use miner::{Miner, MinerService, TransactionImportResult};
use native_contracts::Registry;
use rand::OsRng;
use receipt::{Receipt, LocalizedReceipt};
use service::ClientIoMessage;
use snapshot::{self, io as snapshot_io};
use spec::Spec;
use trace;
use trace::{TraceDB, ImportRequest as TraceImportRequest, LocalizedTrace, Database as TraceDatabase};
use trace::FlatTransactionTraces;
use transaction::{LocalizedTransaction, UnverifiedTransaction, SignedTransaction, Transaction, PendingTransaction, Action};
use types::filter::Filter;
use types::mode::Mode as IpcMode;
use verification;
use verification::{PreverifiedBlock, Verifier};
use verification::queue::BlockQueue;
use views::BlockView;
// re-export
pub use types::blockchain_info::BlockChainInfo;
pub use types::block_status::BlockStatus;
pub use blockchain::CacheSize as BlockChainCacheSize;
pub use verification::queue::QueueInfo as BlockQueueInfo;
const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2;
const MIN_HISTORY_SIZE: u64 = 8;
#[derive(Default, Clone, Debug, Eq, PartialEq)]
/// Memory used by state DB
pub state_db_mem: usize,
/// Alter internal reporting to reflect the additional `block` has been processed.
pub fn accrue_block(&mut self, block: &PreverifiedBlock) {
self.blocks_imported += 1;
self.transactions_applied += block.transactions.len();
self.gas_processed = self.gas_processed + block.header.gas_used().clone();
asynchronous rob
committed
impl<'a> ::std::ops::Sub<&'a ClientReport> for ClientReport {
type Output = Self;
fn sub(mut self, other: &'a ClientReport) -> Self {
let higher_mem = ::std::cmp::max(self.state_db_mem, other.state_db_mem);
let lower_mem = ::std::cmp::min(self.state_db_mem, other.state_db_mem);
self.blocks_imported -= other.blocks_imported;
self.transactions_applied -= other.transactions_applied;
self.gas_processed = self.gas_processed - other.gas_processed;
self.state_db_mem = higher_mem - lower_mem;
self
}
}
struct SleepState {
last_activity: Option<Instant>,
last_autosleep: Option<Instant>,
}
impl SleepState {
fn new(awake: bool) -> Self {
SleepState {
last_activity: match awake { false => None, true => Some(Instant::now()) },
last_autosleep: match awake { false => Some(Instant::now()), true => None },
}
}
}
/// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue.
/// Call `import_block()` to import a block asynchronously; `flush_queue()` flushes the queue.
pub struct Client {
chain: RwLock<Arc<BlockChain>>,
tracedb: RwLock<TraceDB<BlockChain>>,
config: ClientConfig,
pruning: journaldb::Algorithm,
db: RwLock<Arc<KeyValueDB>>,
Tomusdrw
committed
import_lock: Mutex<()>,
verifier: Box<Verifier>,
sleep_state: Mutex<SleepState>,
liveness: AtomicBool,
io_channel: Mutex<IoChannel<ClientIoMessage>>,
notify: RwLock<Vec<Weak<ChainNotify>>>,
last_hashes: RwLock<VecDeque<H256>>,
factories: Factories,
ancient_verifier: Mutex<Option<AncientVerifier>>,
on_user_defaults_change: Mutex<Option<Box<FnMut(Option<Mode>) + 'static + Send>>>,
exit_handler: Mutex<Option<Box<Fn(bool, Option<String>) + 'static + Send>>>,
/// Create a new client with given parameters.
/// The database is assumed to have been initialized with the correct columns.
db: Arc<KeyValueDB>,
message_channel: IoChannel<ClientIoMessage>,
) -> Result<Arc<Client>, ::error::Error> {
let trie_spec = match config.fat_db {
true => TrieSpec::Fat,
false => TrieSpec::Secure,
};
let factories = Factories {
vm: EvmFactory::new(config.vm_type.clone(), config.jump_table_size),
trie: trie_factory,
accountdb: Default::default(),
};
let journal_db = journaldb::new(db.clone(), config.pruning, ::db::COL_STATE);
let mut state_db = StateDB::new(journal_db, config.state_cache_size);
if state_db.journal_db().is_empty() {
// Sets the correct state root.
state_db = spec.ensure_db_good(state_db, &factories)?;
let mut batch = DBTransaction::new();
state_db.journal_under(&mut batch, 0, &spec.genesis_header().hash())?;
db.write(batch).map_err(ClientError::Database)?;
Tomusdrw
committed
let gb = spec.genesis_block();
let chain = Arc::new(BlockChain::new(config.blockchain.clone(), &gb, db.clone()));
let tracedb = RwLock::new(TraceDB::new(config.tracing.clone(), db.clone(), chain.clone()));
trace!("Cleanup journal: DB Earliest = {:?}, Latest = {:?}", state_db.journal_db().earliest_era(), state_db.journal_db().latest_era());
let history = if config.history < MIN_HISTORY_SIZE {
info!(target: "client", "Ignoring pruning history parameter of {}\
, falling back to minimum of {}",
config.history, MIN_HISTORY_SIZE);
MIN_HISTORY_SIZE
} else {
config.history
};
if !chain.block_header(&chain.best_block_hash()).map_or(true, |h| state_db.journal_db().contains(h.state_root())) {
warn!("State root not found for block #{} ({})", chain.best_block_number(), chain.best_block_hash().hex());
}
let engine = spec.engine.clone();
let block_queue = BlockQueue::new(config.queue.clone(), engine.clone(), message_channel.clone(), config.verifier_type.verifying_seal());
Tomusdrw
committed
let awake = match config.mode { Mode::Dark(..) | Mode::Off => false, _ => true };
enabled: AtomicBool::new(true),
sleep_state: Mutex::new(SleepState::new(awake)),
liveness: AtomicBool::new(awake),
Tomusdrw
committed
engine: engine,
pruning: config.pruning.clone(),
verifier: verification::new(config.verifier_type.clone()),
config: config,
db: RwLock::new(db),
state_db: Mutex::new(state_db),
report: RwLock::new(Default::default()),
import_lock: Mutex::new(()),
notify: RwLock::new(Vec::new()),
last_hashes: RwLock::new(VecDeque::new()),
factories: factories,
rng: Mutex::new(OsRng::new().map_err(::util::UtilError::StdIo)?),
{
let state_db = client.state_db.lock().boxed_clone();
let chain = client.chain.read();
client.prune_ancient(state_db, &chain)?;
}
// ensure genesis epoch proof in the DB.
{
let chain = client.chain.read();
if chain.epoch_transition(0, gh.hash()).is_none() {
trace!(target: "client", "No genesis transition found.");
let proof = client.with_proving_caller(
BlockId::Number(0),
|call| client.engine.genesis_epoch_data(&gh, call)
);
let proof = match proof {
Ok(proof) => proof,
Err(e) => {
warn!(target: "client", "Error generating genesis epoch data: {}. Snapshots generated may not be complete.", e);
Vec::new()
}
};
debug!(target: "client", "Obtained genesis transition proof: {:?}", proof);
let mut batch = DBTransaction::new();
chain.insert_epoch_transition(&mut batch, 0, EpochTransition {
block_hash: gh.hash(),
block_number: 0,
proof: proof,
});
client.db.read().write_buffered(batch);
if let Some(reg_addr) = client.additional_params().get("registrar").and_then(|s| Address::from_str(s).ok()) {
let registrar = Registry::new(reg_addr);
*client.registrar.lock() = Some(registrar);
}
// ensure buffered changes are flushed.
client.db.read().flush().map_err(ClientError::Database)?;
/// Wakes up client if it's a sleep.
pub fn keep_alive(&self) {
let should_wake = match *self.mode.lock() {
Mode::Dark(..) | Mode::Passive(..) => true,
_ => false,
};
if should_wake {
self.wake_up();
(*self.sleep_state.lock()).last_activity = Some(Instant::now());
}
}
/// Adds an actor to be notified on certain events
pub fn add_notify(&self, target: Arc<ChainNotify>) {
self.notify.write().push(Arc::downgrade(&target));
/// Set a closure to call when we want to restart the client
pub fn set_exit_handler<F>(&self, f: F) where F: Fn(bool, Option<String>) + 'static + Send {
*self.exit_handler.lock() = Some(Box::new(f));
}
/// Returns engine reference.
pub fn engine(&self) -> &Engine {
&*self.engine
}
fn notify<F>(&self, f: F) where F: Fn(&ChainNotify) {
for np in self.notify.read().iter() {
if let Some(n) = np.upgrade() {
f(&*n);
}
}
/// Get the Registry object - useful for looking up names.
pub fn registrar(&self) -> MutexGuard<Option<Registry>> {
self.registrar.lock()
}
/// Register an action to be done if a mode/spec_name change happens.
pub fn on_user_defaults_change<F>(&self, f: F) where F: 'static + FnMut(Option<Mode>) + Send {
*self.on_user_defaults_change.lock() = Some(Box::new(f));
/// Flush the block import queue.
pub fn flush_queue(&self) {
while !self.block_queue.queue_info().is_empty() {
pub fn latest_env_info(&self) -> EnvInfo {
self.env_info(BlockId::Latest).expect("Best block header always stored; qed")
}
/// The env info as of a given block.
/// returns `None` if the block unknown.
pub fn env_info(&self, id: BlockId) -> Option<EnvInfo> {
self.block_header(id).map(|header| {
EnvInfo {
number: header.number(),
author: header.author(),
timestamp: header.timestamp(),
difficulty: header.difficulty(),
last_hashes: self.build_last_hashes(header.parent_hash()),
gas_used: U256::default(),
gas_limit: header.gas_limit(),
}
})
fn build_last_hashes(&self, parent_hash: H256) -> Arc<LastHashes> {
{
let hashes = self.last_hashes.read();
if hashes.front().map_or(false, |h| h == &parent_hash) {
let mut res = Vec::from(hashes.clone());
res.resize(256, H256::default());
let mut last_hashes = LastHashes::new();
for i in 0..255 {
Some(details) => {
last_hashes[i + 1] = details.parent.clone();
},
None => break,
}
}
let mut cached_hashes = self.last_hashes.write();
*cached_hashes = VecDeque::from(last_hashes.clone());
fn check_and_close_block(&self, block: &PreverifiedBlock) -> Result<LockedBlock, ()> {
let engine = &*self.engine;
// Check the block isn't so old we won't be able to enact it.
let best_block_number = chain.best_block_number();
if self.pruning_info().earliest_state > header.number() {
warn!(target: "client", "Block import failed for #{} ({})\nBlock is ancient (current best block: #{}).", header.number(), header.hash(), best_block_number);
return Err(());
}
let verify_family_result = self.verifier.verify_block_family(header, &block.bytes, engine, &**chain);
if let Err(e) = verify_family_result {
warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
return Err(());
};
let verify_external_result = self.verifier.verify_block_external(header, &block.bytes, engine);
if let Err(e) = verify_external_result {
warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
return Err(());
};
let chain_has_parent = chain.block_header(header.parent_hash());
if let Some(parent) = chain_has_parent {
// Enact Verified Block
let last_hashes = self.build_last_hashes(header.parent_hash().clone());
let db = self.state_db.lock().boxed_clone_canon(header.parent_hash());
let is_epoch_begin = chain.epoch_transition(parent.number(), *header.parent_hash()).is_some();
let enact_result = enact_verified(block,
engine,
self.tracedb.read().tracing_enabled(),
db,
&parent,
last_hashes,
self.factories.clone(),
is_epoch_begin,
);
let mut locked_block = enact_result.map_err(|e| {
warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
if header.number() < self.engine().params().validate_receipts_transition && header.receipts_root() != locked_block.block().header().receipts_root() {
locked_block = locked_block.strip_receipts();
}
if let Err(e) = self.verifier.verify_block_final(header, locked_block.block().header()) {
warn!(target: "client", "Stage 5 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
Ok(locked_block)
} else {
warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash());
Err(())
fn calculate_enacted_retracted(&self, import_results: &[ImportRoute]) -> (Vec<H256>, Vec<H256>) {
fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
map.into_iter().map(|(k, _v)| k).collect()
}
// In ImportRoute we get all the blocks that have been enacted and retracted by single insert.
// Because we are doing multiple inserts some of the blocks that were enacted in import `k`
// could be retracted in import `k+1`. This is why to understand if after all inserts
// the block is enacted or retracted we iterate over all routes and at the end final state
// will be in the hashmap
let map = import_results.iter().fold(HashMap::new(), |mut map, route| {
for hash in &route.enacted {
map.insert(hash.clone(), true);
for hash in &route.retracted {
map.insert(hash.clone(), false);
}
map
});
// Split to enacted retracted (using hashmap value)
let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v);
// And convert tuples to keys
(map_to_vec(enacted), map_to_vec(retracted))
}
/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self) -> usize {
// Shortcut out if we know we're incapable of syncing the chain.
if !self.enabled.load(AtomicOrdering::Relaxed) {
return 0;
}
let (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration, is_empty) = {
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new();
let mut proposed_blocks = Vec::with_capacity(max_blocks_to_import);
let mut import_results = Vec::with_capacity(max_blocks_to_import);
let _import_lock = self.import_lock.lock();
let blocks = self.block_queue.drain(max_blocks_to_import);
if blocks.is_empty() {
return 0;
}
let _timer = PerfTimer::new("import_verified_blocks");
let start = precise_time_ns();
for block in blocks {
let header = &block.header;
let is_invalid = invalid_blocks.contains(header.parent_hash());
invalid_blocks.insert(header.hash());
continue;
}
if let Ok(closed_block) = self.check_and_close_block(&block) {
proposed_blocks.push(block.bytes);
} else {
imported_blocks.push(header.hash());
let route = self.commit_block(closed_block, &header, &block.bytes);
} else {
invalid_blocks.insert(header.hash());
}
let imported = imported_blocks.len();
let invalid_blocks = invalid_blocks.into_iter().collect::<Vec<H256>>();
if !invalid_blocks.is_empty() {
self.block_queue.mark_as_bad(&invalid_blocks);
let is_empty = self.block_queue.mark_as_good(&imported_blocks);
let duration_ns = precise_time_ns() - start;
(imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration_ns, is_empty)
if !imported_blocks.is_empty() && is_empty {
let (enacted, retracted) = self.calculate_enacted_retracted(&import_results);
self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted);
}
notify.new_blocks(
imported_blocks.clone(),
invalid_blocks.clone(),
enacted.clone(),
retracted.clone(),
self.db.read().flush().expect("DB flush failed.");
/// Import a block with transaction receipts.
/// The block is guaranteed to be the next best blocks in the first block sequence.
/// Does no sealing or transaction validation.
fn import_old_block(&self, block_bytes: Bytes, receipts_bytes: Bytes) -> Result<H256, ::error::Error> {
let block = BlockView::new(&block_bytes);
let header = block.header();
let receipts = ::rlp::decode_list(&receipts_bytes);
let _import_lock = self.import_lock.lock();
{
let _timer = PerfTimer::new("import_old_block");
let chain = self.chain.read();
let mut ancient_verifier = self.ancient_verifier.lock();
{
// closure for verifying a block.
let verify_with = |verifier: &AncientVerifier| -> Result<(), ::error::Error> {
// verify the block, passing the chain for updating the epoch
// verifier.
verifier.verify(&mut *self.rng.lock(), &header, &chain)
// initialize the ancient block verifier if we don't have one already.
match &mut *ancient_verifier {
&mut Some(ref verifier) => {
verify_with(verifier)?
}
x @ &mut None => {
// load most recent epoch.
trace!(target: "client", "Initializing ancient block restoration.");
let current_epoch_data = chain.epoch_transitions()
.take_while(|&(_, ref t)| t.block_number < header.number())
.last()
.map(|(_, t)| t.proof)
.expect("At least one epoch entry (genesis) always stored; qed");
let current_verifier = self.engine.epoch_verifier(&header, ¤t_epoch_data)
.known_confirmed()?;
let current_verifier = AncientVerifier::new(self.engine.clone(), current_verifier);
verify_with(¤t_verifier)?;
*x = Some(current_verifier);
}
}
}
let mut batch = DBTransaction::new();
chain.insert_unordered_block(&mut batch, &block_bytes, receipts, None, false, true);
// Final commit to the DB
self.db.read().write_buffered(batch);
chain.commit();
}
self.db.read().flush().expect("DB flush failed.");
// NOTE: the header of the block passed here is not necessarily sealed, as
// it is for reconstructing the state transition.
//
// The header passed is from the original block data and is sealed.
fn commit_block<B>(&self, block: B, header: &Header, block_data: &[u8]) -> ImportRoute where B: IsBlock + Drain {
let hash = &header.hash();
let number = header.number();
let parent = header.parent_hash();
let receipts = block.receipts().to_owned();
let traces = block.traces().clone().unwrap_or_else(Vec::new);
let traces: Vec<FlatTransactionTraces> = traces.into_iter()
.map(Into::into)
.collect();
assert_eq!(header.hash(), BlockView::new(block_data).header_view().sha3());
//let traces = From::from(block.traces().clone().unwrap_or_else(Vec::new));
let mut batch = DBTransaction::new();
// CHECK! I *think* this is fine, even if the state_root is equal to another
// already-imported block of the same number.
// TODO: Prove it with a test.
let mut state = block.drain();
// check epoch end signal, potentially generating a proof on the current
// state.
self.check_epoch_end_signal(
&header,
block_data,
&receipts,
&state,
&chain,
&mut batch,
);
state.journal_under(&mut batch, number, hash).expect("DB commit failed");
let route = chain.insert_block(&mut batch, block_data, receipts.clone());
self.tracedb.read().import(&mut batch, TraceImportRequest {
block_hash: hash.clone(),
block_number: number,
enacted: route.enacted.clone(),
retracted: route.retracted.len()
});
let is_canon = route.enacted.last().map_or(false, |h| h == hash);
state.sync_cache(&route.enacted, &route.retracted, is_canon);
self.db.read().write_buffered(batch);
chain.commit();
self.check_epoch_end(&header, &chain);
self.update_last_hashes(&parent, hash);
if let Err(e) = self.prune_ancient(state, &chain) {
warn!("Failed to prune ancient state data: {}", e);
}
// check for epoch end signal and write pending transition if it occurs.
// state for the given block must be available.
fn check_epoch_end_signal(
&self,
header: &Header,
block_bytes: &[u8],
receipts: &[Receipt],
state_db: &StateDB,
chain: &BlockChain,
batch: &mut DBTransaction,
) {
use engines::EpochChange;
match self.engine.signals_epoch_end(header, Some(block_bytes), Some(&receipts)) {
EpochChange::Yes(proof) => {
use engines::epoch::PendingTransition;
use engines::Proof;
let proof = match proof {
Proof::Known(proof) => proof,
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
Proof::WithState(with_state) => {
let env_info = EnvInfo {
number: header.number(),
author: header.author().clone(),
timestamp: header.timestamp(),
difficulty: header.difficulty().clone(),
last_hashes: self.build_last_hashes(header.parent_hash().clone()),
gas_used: U256::default(),
gas_limit: u64::max_value().into(),
};
let call = move |addr, data| {
let mut state_db = state_db.boxed_clone();
let backend = ::state::backend::Proving::new(state_db.as_hashdb_mut());
let transaction =
self.contract_call_tx(BlockId::Hash(*header.parent_hash()), addr, data);
let mut state = State::from_existing(
backend,
header.state_root().clone(),
self.engine.account_start_nonce(header.number()),
self.factories.clone(),
).expect("state known to be available for just-imported block; qed");
let options = TransactOptions { tracing: false, vm_tracing: false, check_nonce: false };
let res = Executive::new(&mut state, &env_info, &*self.engine)
.transact(&transaction, options);
let res = match res {
Err(ExecutionError::Internal(e)) =>
Err(format!("Internal error: {}", e)),
Err(e) => {
trace!(target: "client", "Proved call failed: {}", e);
Ok((Vec::new(), state.drop().1.extract_proof()))
}
Ok(res) => Ok((res.output, state.drop().1.extract_proof())),
};
res.map(|(output, proof)| (output, proof.into_iter().map(|x| x.into_vec()).collect()))
};
match (with_state)(&call) {
Ok(proof) => proof,
Err(e) => {
warn!(target: "client", "Failed to generate transition proof for block {}: {}", hash, e);
warn!(target: "client", "Snapshots produced by this client may be incomplete");
Vec::new()
}
debug!(target: "client", "Block {} signals epoch end.", hash);
let pending = PendingTransition { proof: proof };
chain.insert_pending_transition(batch, hash, pending);
},
EpochChange::No => {},
EpochChange::Unsure(_) => {
warn!(target: "client", "Detected invalid engine implementation.");
warn!(target: "client", "Engine claims to require more block data, but everything provided.");
// check for ending of epoch and write transition if it occurs.
fn check_epoch_end<'a>(&self, header: &'a Header, chain: &BlockChain) {
let is_epoch_end = self.engine.is_epoch_end(
header,
&(|hash| chain.block_header(&hash)),
&(|hash| chain.get_pending_transition(hash)), // TODO: limit to current epoch.
);
if let Some(proof) = is_epoch_end {
debug!(target: "client", "Epoch transition at block {}", header.hash());
let mut batch = DBTransaction::new();
chain.insert_epoch_transition(&mut batch, header.number(), EpochTransition {
block_hash: header.hash(),
block_number: header.number(),
proof: proof,
});
// always write the batch directly since epoch transition proofs are
// fetched from a DB iterator and DB iterators are only available on
// flushed data.
self.db.read().write(batch).expect("DB flush failed");
}
}
// use a state-proving closure for the given block.
fn with_proving_caller<F, T>(&self, id: BlockId, with_call: F) -> T
where F: FnOnce(&::engines::Call) -> T
{
let call = |a, d| {
let tx = self.contract_call_tx(id, a, d);
let (result, items) = self.prove_transaction(tx, id)
.ok_or_else(|| format!("Unable to make call. State unavailable?"))?;
let items = items.into_iter().map(|x| x.to_vec()).collect();
Ok((result, items))
};
with_call(&call)
}
// prune ancient states until below the memory limit or only the minimum amount remain.
fn prune_ancient(&self, mut state_db: StateDB, chain: &BlockChain) -> Result<(), ClientError> {
let number = match state_db.journal_db().latest_era() {
Some(n) => n,
None => return Ok(()),
};
// prune all ancient eras until we're below the memory target,
// but have at least the minimum number of states.
loop {
let needs_pruning = state_db.journal_db().is_pruned() &&
state_db.journal_db().journal_size() >= self.config.history_mem;
if !needs_pruning { break }
match state_db.journal_db().earliest_era() {
Some(era) if era + self.history <= number => {
trace!(target: "client", "Pruning state for ancient era {}", era);
match chain.block_hash(era) {
Some(ancient_hash) => {
let mut batch = DBTransaction::new();
state_db.mark_canonical(&mut batch, era, &ancient_hash)?;
self.db.read().write_buffered(batch);
state_db.journal_db().flush();
}
None =>
debug!(target: "client", "Missing expected hash for block {}", era),
}
}
_ => break, // means that every era is kept, no pruning necessary.
}
}
Ok(())
}
fn update_last_hashes(&self, parent: &H256, hash: &H256) {
let mut hashes = self.last_hashes.write();
if hashes.front().map_or(false, |h| h == parent) {
if hashes.len() > 255 {
hashes.pop_back();
}
hashes.push_front(hash.clone());
}
}
pub fn import_queued_transactions(&self, transactions: &[Bytes], peer_id: usize) -> usize {
let _timer = PerfTimer::new("import_queued_transactions");
self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst);
let txs: Vec<UnverifiedTransaction> = transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect();
let hashes: Vec<_> = txs.iter().map(|tx| tx.hash()).collect();
self.notify(|notify| {
notify.transactions_received(hashes.clone(), peer_id);
let results = self.miner.import_external_transactions(self, txs);
/// Get shared miner reference.
pub fn miner(&self) -> Arc<Miner> {
self.miner.clone()
}
/// Replace io channel. Useful for testing.
pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) {
*self.io_channel.lock() = io_channel;
/// Attempt to get a copy of a specific block's final state.
/// This will not fail if given BlockId::Latest.
/// Otherwise, this can fail (but may not) if the DB prunes state or the block
/// is unknown.
pub fn state_at(&self, id: BlockId) -> Option<State<StateDB>> {
// fast path for latest state.
BlockId::Pending => return self.miner.pending_state(self.chain.read().best_block_number()).or_else(|| Some(self.state())),
BlockId::Latest => return Some(self.state()),
let block_number = match self.block_number(id.clone()) {
Some(num) => num,
None => return None,
};
asynchronous rob
committed
self.block_header(id).and_then(|header| {
let db = self.state_db.lock().boxed_clone();
if db.is_pruned() && self.pruning_info().earliest_state > block_number {
let root = header.state_root();
State::from_existing(db, root, self.engine.account_start_nonce(block_number), self.factories.clone()).ok()
/// Attempt to get a copy of a specific block's beginning state.
///
/// This will not fail if given BlockId::Latest.
/// Otherwise, this can fail (but may not) if the DB prunes state.
pub fn state_at_beginning(&self, id: BlockId) -> Option<State<StateDB>> {
// fast path for latest state.
match id {
BlockId::Pending => self.state_at(BlockId::Latest),
id => match self.block_number(id) {
None | Some(0) => None,
Some(n) => self.state_at(BlockId::Number(n - 1)),
/// Get a copy of the best block's state.
let header = self.best_block_header();
self.state_db.lock().boxed_clone_canon(&header.hash()),
header.state_root(),
self.engine.account_start_nonce(header.number()),
self.factories.clone())
.expect("State root of best block header always valid.")
pub fn blockchain_cache_info(&self) -> BlockChainCacheSize {
/// Get the report.
pub fn report(&self) -> ClientReport {
let mut report = self.report.read().clone();
report.state_db_mem = self.state_db.lock().mem_used();
self.check_garbage();
self.check_snooze();
}
fn check_garbage(&self) {
self.block_queue.collect_garbage();
let mode = self.mode.lock().clone();
match mode {
let mut ss = self.sleep_state.lock();
self.sleep();
ss.last_activity = None;
}
}
}
Mode::Passive(timeout, wakeup_after) => {
let mut ss = self.sleep_state.lock();
let now = Instant::now();
if let Some(t) = ss.last_activity {