// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Substrate Client
use std::{
marker::PhantomData, collections::{HashSet, BTreeMap, HashMap}, sync::Arc,
panic::UnwindSafe, result, cell::RefCell, rc::Rc,
};
use crate::error::Error;
use futures::sync::mpsc;
use parking_lot::{Mutex, RwLock};
use primitives::NativeOrEncoded;
use runtime_primitives::{
Justification,
generic::{BlockId, SignedBlock},
};
use consensus::{
Error as ConsensusError, ImportBlock,
ImportResult, BlockOrigin, ForkChoiceStrategy,
well_known_cache_keys::Id as CacheKeyId,
SelectChain, self,
};
use runtime_primitives::traits::{
Block as BlockT, Header as HeaderT, Zero, NumberFor, CurrentHeight,
BlockNumberToHash, ApiRef, ProvideRuntimeApi, Digest, DigestItem,
SaturatedConversion, One
};
use runtime_primitives::BuildStorage;
use crate::runtime_api::{
CallRuntimeAt, ConstructRuntimeApi, Core as CoreApi, ProofRecorder,
InitializeBlock,
};
use primitives::{
Blake2Hasher, H256, ChangesTrieConfiguration, convert_hash,
NeverNativeValue, ExecutionContext
};
use primitives::storage::{StorageKey, StorageData};
use primitives::storage::well_known_keys;
use parity_codec::{Encode, Decode};
use state_machine::{
DBValue, Backend as StateBackend, CodeExecutor, ChangesTrieAnchorBlockId,
ExecutionStrategy, ExecutionManager, prove_read, prove_child_read,
ChangesTrieRootsStorage, ChangesTrieStorage,
key_changes, key_changes_proof, OverlayedChanges, NeverOffchainExt,
};
use hash_db::Hasher;
use crate::backend::{self, BlockImportOperation, PrunableStateChangesTrieStorage};
use crate::blockchain::{
self, Info as ChainInfo, Backend as ChainBackend,
HeaderBackend as ChainHeaderBackend, ProvideCache, Cache,
};
use crate::call_executor::{CallExecutor, LocalCallExecutor};
use executor::{RuntimeVersion, RuntimeInfo};
use crate::notifications::{StorageNotifications, StorageEventStream};
use crate::light::{call_executor::prove_execution, fetcher::ChangesProof};
use crate::cht;
use crate::error;
use crate::in_mem;
use crate::block_builder::{self, api::BlockBuilder as BlockBuilderAPI};
use crate::genesis;
use substrate_telemetry::{telemetry, SUBSTRATE_INFO};
use log::{info, trace, warn};
/// Type that implements `futures::Stream` of block import events.
///
/// This is the receiving half of an unbounded `futures` mpsc channel.
pub type ImportNotifications = mpsc::UnboundedReceiver>;
/// A stream of block finality notifications.
///
/// This is the receiving half of an unbounded `futures` mpsc channel.
pub type FinalityNotifications = mpsc::UnboundedReceiver>;
// Transaction type of the state backend that belongs to a backend's
// block import operation.
type StorageUpdate = <
<
>::BlockImportOperation
as BlockImportOperation
>::State as state_machine::Backend>::Transaction;
// Alias for the in-memory trie database (`trie::MemoryDB`).
// NOTE(review): presumably the payload handed to `update_changes_trie` — confirm.
type ChangesUpdate = trie::MemoryDB;
/// Execution strategies settings.
///
/// One `ExecutionStrategy` per usage context. When a block is executed during
/// import, `syncing` is used for blocks with origin `NetworkInitialSync` and
/// `importing` for every other origin.
#[derive(Debug, Clone)]
pub struct ExecutionStrategies {
/// Execution strategy used when syncing.
pub syncing: ExecutionStrategy,
/// Execution strategy used when importing blocks.
pub importing: ExecutionStrategy,
/// Execution strategy used when constructing blocks.
pub block_construction: ExecutionStrategy,
/// Execution strategy used for offchain workers.
pub offchain_worker: ExecutionStrategy,
/// Execution strategy used in other cases.
pub other: ExecutionStrategy,
}
impl Default for ExecutionStrategies {
    /// Default strategy selection: `NativeElseWasm` for syncing, importing and
    /// the "other" case, `AlwaysWasm` for block construction and
    /// `NativeWhenPossible` for offchain workers.
    fn default() -> Self {
        Self {
            syncing: ExecutionStrategy::NativeElseWasm,
            importing: ExecutionStrategy::NativeElseWasm,
            block_construction: ExecutionStrategy::AlwaysWasm,
            offchain_worker: ExecutionStrategy::NativeWhenPossible,
            other: ExecutionStrategy::NativeElseWasm,
        }
    }
}
/// Substrate Client
pub struct Client where Block: BlockT {
// Shared handle to the storage backend.
backend: Arc,
// Call executor; used for runtime calls such as `runtime_version` and `call_at_state`.
executor: E,
// Storage change subscriptions; triggered from `notify_imported`.
storage_notifications: Mutex>,
// Senders for block import notifications; a sender is dropped once a send to it fails.
import_notification_sinks: Mutex>>>,
// Senders for finality notifications; a sender is dropped once a send to it fails.
finality_notification_sinks: Mutex>>>,
// Lock taken by `lock_import_and_run` while an import operation is built and committed.
import_lock: Arc>,
// holds the block hash currently being imported. TODO: replace this with block queue
importing_block: RwLock>,
// Execution strategies handed to `Client::new`.
execution_strategies: ExecutionStrategies,
_phantom: PhantomData,
}
/// Client import operation, a wrapper for the backend.
///
/// Besides the backend operation it collects the notifications that
/// `lock_import_and_run` sends once the operation has been committed.
pub struct ClientImportOperation, B: backend::Backend> {
// The wrapped backend import operation.
op: B::BlockImportOperation,
// Pending import notification: (hash, origin, header, is_new_best, storage changes).
notify_imported: Option<(Block::Hash, BlockOrigin, Block::Header, bool, Option, Option>)>>)>,
// Hashes of the blocks to announce as finalized.
notify_finalized: Vec,
}
/// A source of blockchain events.
pub trait BlockchainEvents {
/// Get block import event stream. Not guaranteed to be fired for every
/// imported block.
fn import_notification_stream(&self) -> ImportNotifications;
/// Get a stream of finality notifications. Not guaranteed to be fired for every
/// finalized block.
fn finality_notification_stream(&self) -> FinalityNotifications;
/// Get storage changes event stream.
///
/// Passing `None` as `filter_keys` subscribes to all storage changes.
///
/// Unlike the other two methods this one returns an `error::Result`, so
/// creating the subscription may fail.
fn storage_changes_notification_stream(&self,
filter_keys: Option<&[StorageKey]>
) -> error::Result>;
}
/// Fetch block body by ID.
pub trait BlockBody {
/// Get block body by ID. Returns `None` if the body is not stored.
///
/// `id` identifies the block whose body is requested; lookup failures are
/// reported through the `error::Result` wrapper.
fn block_body(&self,
id: &BlockId
) -> error::Result::Extrinsic>>>;
}
/// Client info
#[derive(Debug)]
pub struct ClientInfo {
/// Blockchain info (`ChainInfo` is this file's alias for `blockchain::Info`).
pub chain: ChainInfo,
/// Best block number in the queue.
pub best_queued_number: Option<<::Header as HeaderT>::Number>,
/// Best queued block hash.
pub best_queued_hash: Option,
}
/// Block status.
///
/// Derives `PartialEq`/`Eq`, so statuses can be compared directly.
#[derive(Debug, PartialEq, Eq)]
pub enum BlockStatus {
/// Added to the import queue.
Queued,
/// Already in the blockchain and the state is available.
InChainWithState,
/// In the blockchain, but the state is not available.
InChainPruned,
/// Block or parent is known to be bad.
KnownBad,
/// Not in the queue or the blockchain.
Unknown,
}
/// Summary of an imported block
///
/// Built in `notify_imported` and sent to every registered import
/// notification sink.
#[derive(Clone, Debug)]
pub struct BlockImportNotification {
/// Imported block header hash.
pub hash: Block::Hash,
/// Imported block origin.
pub origin: BlockOrigin,
/// Imported block header.
pub header: Block::Header,
/// Is this the new best block.
pub is_new_best: bool,
}
/// Summary of a finalized block.
///
/// Built in `notify_finalized` and sent to every registered finality
/// notification sink.
#[derive(Clone, Debug)]
pub struct FinalityNotification {
/// Finalized block header hash.
pub hash: Block::Hash,
/// Finalized block header.
pub header: Block::Header,
}
// used in importing a block, where additional changes are made after the runtime
// executed.
//
// `apply_block` builds `Same` when there are no post-runtime digests and
// `Different` (original header, header with the digests appended) otherwise.
enum PrePostHeader {
// they are the same: no post-runtime digest items.
Same(H),
// different headers (pre, post).
Different(H, H),
}
impl PrePostHeader {
    // borrow the "pre-header": the header exactly as the runtime is expected to produce it.
    fn pre(&self) -> &H {
        match self {
            PrePostHeader::Same(header) | PrePostHeader::Different(header, _) => header,
        }
    }
    // borrow the "post-header": the header once every post-runtime change has been applied.
    fn post(&self) -> &H {
        match self {
            PrePostHeader::Same(header) | PrePostHeader::Different(_, header) => header,
        }
    }
    // consume `self`, yielding the "post-header" by value.
    fn into_post(self) -> H {
        match self {
            PrePostHeader::Same(header) | PrePostHeader::Different(_, header) => header,
        }
    }
}
/// Create an instance of in-memory client.
///
/// Wraps a fresh `in_mem::Backend` in an `Arc` and forwards to
/// `new_with_backend`, so the default execution strategies are used.
pub fn new_in_mem(
executor: E,
genesis_storage: S,
) -> error::Result,
LocalCallExecutor, E>,
Block,
RA
>> where
E: CodeExecutor + RuntimeInfo,
S: BuildStorage,
Block: BlockT,
{
new_with_backend(Arc::new(in_mem::Backend::new()), executor, genesis_storage)
}
/// Create a client with the explicitly provided backend.
/// This is useful for testing backend implementations.
///
/// The executor is wrapped in a `LocalCallExecutor` that shares the backend,
/// and the client is created with `ExecutionStrategies::default()`.
pub fn new_with_backend(
backend: Arc,
executor: E,
build_genesis_storage: S,
) -> error::Result, Block, RA>>
where
E: CodeExecutor + RuntimeInfo,
S: BuildStorage,
Block: BlockT,
B: backend::LocalBackend
{
let call_executor = LocalCallExecutor::new(backend.clone(), executor);
Client::new(backend, call_executor, build_genesis_storage, Default::default())
}
impl Client where
B: backend::Backend,
E: CallExecutor,
Block: BlockT,
{
/// Creates new Substrate Client with given blockchain and code executor.
///
/// If the backend holds no header for block number zero, the genesis storage
/// is built, the genesis block is constructed from the resulting state root
/// and committed to the backend as a final block. Any error from the backend
/// or from building the storage is returned.
pub fn new(
backend: Arc,
executor: E,
build_genesis_storage: S,
execution_strategies: ExecutionStrategies
) -> error::Result {
// Only initialize genesis when the chain does not have a block #0 yet.
if backend.blockchain().header(BlockId::Number(Zero::zero()))?.is_none() {
let (genesis_storage, children_genesis_storage) = build_genesis_storage.build_storage()?;
let mut op = backend.begin_operation()?;
// The genesis state operation is started on top of the default (all-zero) hash.
backend.begin_state_operation(&mut op, BlockId::Hash(Default::default()))?;
let state_root = op.reset_storage(genesis_storage, children_genesis_storage)?;
let genesis_block = genesis::construct_genesis_block::(state_root.into());
info!("Initializing Genesis block/state (state: {}, header-hash: {})",
genesis_block.header().state_root(),
genesis_block.header().hash()
);
// Store the genesis header with an empty body, no justification, marked as final.
op.set_block_data(
genesis_block.deconstruct().0,
Some(vec![]),
None,
crate::backend::NewBlockState::Final
)?;
backend.commit_operation(op)?;
}
Ok(Client {
backend,
executor,
storage_notifications: Default::default(),
import_notification_sinks: Default::default(),
finality_notification_sinks: Default::default(),
import_lock: Default::default(),
importing_block: Default::default(),
execution_strategies,
_phantom: Default::default(),
})
}
/// Get a reference to the execution strategies.
///
/// These are the strategies that were passed to `Client::new`.
pub fn execution_strategies(&self) -> &ExecutionStrategies {
&self.execution_strategies
}
/// Get a reference to the state at a given block.
///
/// Delegates to the backend's `state_at`; its error is returned unchanged.
pub fn state_at(&self, block: &BlockId) -> error::Result {
self.backend.state_at(*block)
}
/// Expose backend reference. To be used in tests only
///
/// Deprecated: see the attribute note below for the intended replacement.
#[doc(hidden)]
#[deprecated(note="Rather than relying on `client` to provide this, access \
to the backend should be handled at setup only - see #1134. This function \
will be removed once that is in place.")]
pub fn backend(&self) -> &Arc {
&self.backend
}
/// Expose reference to import lock
///
/// Returns a clone of the `Arc` that holds the lock, not a borrowed reference.
#[doc(hidden)]
// NOTE(review): the deprecation note below talks about "the backend"; it looks
// copy-pasted from `backend()` — confirm whether it should mention the import lock.
#[deprecated(note="Rather than relying on `client` to provide this, access \
to the backend should be handled at setup only - see #1134. This function \
will be removed once that is in place.")]
pub fn import_lock(&self) -> Arc> {
self.import_lock.clone()
}
/// Given a `BlockId` and a key prefix, return the storage keys of that block
/// which match the prefix.
pub fn storage_keys(&self, id: &BlockId, key_prefix: &StorageKey) -> error::Result> {
    let state = self.state_at(id)?;
    let matching = state.keys(&key_prefix.0);
    Ok(matching.into_iter().map(StorageKey).collect())
}
/// Look up `key` in the state of block `id`.
///
/// Yields `None` when nothing is stored under the key; a state backend
/// failure is converted with `error::Error::from_state`.
pub fn storage(&self, id: &BlockId, key: &StorageKey) -> error::Result> {
    let state = self.state_at(id)?;
    let value = state
        .storage(&key.0)
        .map_err(|e| error::Error::from_state(Box::new(e)))?;
    Ok(value.map(StorageData))
}
/// Given a `BlockId`, a child storage key and a key prefix, return the keys
/// of that child storage which match the prefix.
pub fn child_storage_keys(
    &self,
    id: &BlockId,
    child_storage_key: &StorageKey,
    key_prefix: &StorageKey
) -> error::Result> {
    let state = self.state_at(id)?;
    let matching = state.child_keys(&child_storage_key.0, &key_prefix.0);
    Ok(matching.into_iter().map(StorageKey).collect())
}
/// Look up `key` inside the child storage `child_storage_key` in the state of block `id`.
///
/// Yields `None` when nothing is stored under the key; a state backend
/// failure is converted with `error::Error::from_state`.
pub fn child_storage(
    &self,
    id: &BlockId,
    child_storage_key: &StorageKey,
    key: &StorageKey
) -> error::Result> {
    let state = self.state_at(id)?;
    let value = state
        .child_storage(&child_storage_key.0, &key.0)
        .map_err(|e| error::Error::from_state(Box::new(e)))?;
    Ok(value.map(StorageData))
}
/// Get the code at a given block.
///
/// Reads the value stored under `well_known_keys::CODE` in the block's state.
///
/// # Panics
///
/// Panics if no value is stored under that key.
pub fn code_at(&self, id: &BlockId) -> error::Result> {
Ok(self.storage(id, &StorageKey(well_known_keys::CODE.to_vec()))?
.expect("None is returned if there's no value stored for the given key;\
':code' key is always defined; qed").0)
}
/// Get the RuntimeVersion at a given block.
///
/// Delegates to the call executor's `runtime_version`.
pub fn runtime_version_at(&self, id: &BlockId) -> error::Result {
self.executor.runtime_version(id)
}
/// Get call executor reference.
///
/// This is the executor the client was constructed with.
pub fn executor(&self) -> &E {
&self.executor
}
/// Reads storage value at a given block + key, returning read proof.
///
/// Only the proof is returned; the value produced by `prove_read` is discarded.
pub fn read_proof(&self, id: &BlockId, key: &[u8]) -> error::Result>> {
self.state_at(id)
.and_then(|state| prove_read(state, key)
.map(|(_, proof)| proof)
.map_err(Into::into))
}
/// Reads child storage value at a given block + storage_key + key, returning
/// read proof.
///
/// Only the proof is returned; the value produced by `prove_child_read` is discarded.
pub fn read_child_proof(
&self,
id: &BlockId,
storage_key: &[u8],
key: &[u8]
) -> error::Result>> {
self.state_at(id)
.and_then(|state| prove_child_read(state, storage_key, key)
.map(|(_, proof)| proof)
.map_err(Into::into))
}
/// Execute a call to a contract on top of state in a block of given hash
/// AND returning execution proof.
///
/// No changes are made.
///
/// The state of block `id` and the header from `prepare_environment_block`
/// are handed to `prove_execution` together with `method` and `call_data`.
pub fn execution_proof(&self,
id: &BlockId,
method: &str,
call_data: &[u8]
) -> error::Result<(Vec, Vec>)> {
let state = self.state_at(id)?;
let header = self.prepare_environment_block(id)?;
prove_execution(state, header, &self.executor, method, call_data)
}
/// Reads given header and generates CHT-based header proof.
///
/// Same as `header_proof_with_cht_size` with the CHT size from `cht::size()`.
pub fn header_proof(&self, id: &BlockId) -> error::Result<(Block::Header, Vec>)> {
self.header_proof_with_cht_size(id, cht::size())
}
/// Get block hash by number.
///
/// Delegates to the blockchain backend's `hash` lookup.
pub fn block_hash(&self,
block_number: <::Header as HeaderT>::Number
) -> error::Result> {
self.backend.blockchain().hash(block_number)
}
/// Reads given header and generates CHT-based header proof for CHT of given size.
///
/// Returns the header of block `id` together with the proof built by
/// `cht::build_proof`. Fails if the header is unknown, if no CHT number can
/// be derived for the block, or if building the proof fails.
pub fn header_proof_with_cht_size(
&self,
id: &BlockId,
cht_size: NumberFor,
) -> error::Result<(Block::Header, Vec>)> {
let proof_error = || error::Error::Backend(format!("Failed to generate header proof for {:?}", id));
let header = self.backend.blockchain().expect_header(*id)?;
let block_num = *header.number();
// `block_to_cht_number` yields `None` when there is no CHT number for this block.
let cht_num = cht::block_to_cht_number(cht_size, block_num).ok_or_else(proof_error)?;
let cht_start = cht::start_number(cht_size, cht_num);
// Unbounded iterator of consecutive block numbers starting at `cht_start`;
// the consumer decides how many it pulls.
let mut current_num = cht_start;
let cht_range = ::std::iter::from_fn(|| {
let old_current_num = current_num;
current_num = current_num + One::one();
Some(old_current_num)
});
// Lazily map each block number to its hash lookup.
let headers = cht_range.map(|num| self.block_hash(num));
let proof = cht::build_proof::(cht_size, cht_num, ::std::iter::once(block_num), headers)?;
Ok((header, proof))
}
/// Compute the longest sub-range of [first; last] that can be passed to
/// `key_changes` and `key_changes_proof`.
///
/// The start of the range is moved forward when older changes tries have
/// already been pruned. Yields `Ok(None)` when changes tries are not
/// supported, and an error when `first` lies beyond `last`.
pub fn max_key_changes_range(
    &self,
    first: NumberFor,
    last: BlockId,
) -> error::Result, BlockId)>> {
    // Any failure to obtain the changes trie setup is reported as "no range".
    let (config, storage) = match self.require_changes_trie() {
        Ok(pair) => pair,
        Err(_) => return Ok(None),
    };
    let chain = self.backend.blockchain();
    let last_num = chain.expect_block_number_from_id(&last)?;
    if first > last_num {
        return Err(error::Error::ChangesTrieAccessFailed("Invalid changes trie range".into()));
    }
    let finalized_number = chain.info()?.finalized_number;
    let oldest = storage.oldest_changes_trie_block(&config, finalized_number);
    // Never start before the oldest changes trie that is still available.
    let clamped_first = ::std::cmp::max(first, oldest);
    Ok(Some((clamped_first, last)))
}
/// Get pairs of (block, extrinsic) where key has been changed at given blocks range.
/// Works only for runtimes that are supporting changes tries.
///
/// `last` is resolved to both its number and hash and used as the anchor
/// block; the current best block number is passed as the upper bound.
/// Errors from the changes trie lookup are wrapped in
/// `Error::ChangesTrieAccessFailed`.
pub fn key_changes(
&self,
first: NumberFor,
last: BlockId,
key: &StorageKey
) -> error::Result, u32)>> {
let (config, storage) = self.require_changes_trie()?;
let last_number = self.backend.blockchain().expect_block_number_from_id(&last)?;
let last_hash = self.backend.blockchain().expect_block_hash_from_id(&last)?;
key_changes::<_, Blake2Hasher, _>(
&config,
&*storage,
first,
&ChangesTrieAnchorBlockId {
hash: convert_hash(&last_hash),
number: last_number,
},
self.backend.blockchain().info()?.best_number,
&key.0)
// NOTE(review): the innermost `map` returns its input pair unchanged.
.and_then(|r| r.map(|r| r.map(|(block, tx)| (block, tx))).collect::>())
.map_err(|err| error::Error::ChangesTrieAccessFailed(err))
}
/// Get proof for computation of (block, extrinsic) pairs where key has been changed at given blocks range.
/// `min` is the hash of the first block, which changes trie root is known to the requester - when we're using
/// changes tries from ascendants of this block, we should provide proofs for changes tries roots
/// `max` is the hash of the last block known to the requester - we can't use changes tries from descendants
/// of this block.
/// Works only for runtimes that are supporting changes tries.
///
/// Same as `key_changes_proof_with_cht_size` with the CHT size from `cht::size()`.
pub fn key_changes_proof(
&self,
first: Block::Hash,
last: Block::Hash,
min: Block::Hash,
max: Block::Hash,
key: &StorageKey
) -> error::Result> {
self.key_changes_proof_with_cht_size(
first,
last,
min,
max,
key,
cht::size(),
)
}
/// Does the same work as `key_changes_proof`, but assumes that CHTs are of passed size.
///
/// The returned `ChangesProof` carries the key changes proof, the block
/// number used as the upper bound, the changes trie roots that were read for
/// blocks below `min`, and a CHT-based proof for those roots.
pub fn key_changes_proof_with_cht_size(
&self,
first: Block::Hash,
last: Block::Hash,
min: Block::Hash,
max: Block::Hash,
key: &StorageKey,
cht_size: NumberFor,
) -> error::Result> {
// Storage wrapper that forwards every call to the real changes trie storage
// and remembers each root that was read for a block below `min`.
struct AccessedRootsRecorder<'a, Block: BlockT> {
storage: &'a ChangesTrieStorage>,
min: NumberFor,
required_roots_proofs: Mutex, H256>>,
};
impl<'a, Block: BlockT> ChangesTrieRootsStorage> for AccessedRootsRecorder<'a, Block> {
fn build_anchor(&self, hash: H256) -> Result>, String> {
self.storage.build_anchor(hash)
}
fn root(
&self,
anchor: &ChangesTrieAnchorBlockId>,
block: NumberFor,
) -> Result, String> {
let root = self.storage.root(anchor, block)?;
// Only roots of blocks below `min` are recorded, and only if they exist.
if block < self.min {
if let Some(ref root) = root {
self.required_roots_proofs.lock().insert(
block,
root.clone()
);
}
}
Ok(root)
}
}
impl<'a, Block: BlockT> ChangesTrieStorage> for AccessedRootsRecorder<'a, Block> {
fn get(&self, key: &H256, prefix: &[u8]) -> Result, String> {
self.storage.get(key, prefix)
}
}
let (config, storage) = self.require_changes_trie()?;
let min_number = self.backend.blockchain().expect_block_number_from_id(&BlockId::Hash(min))?;
let recording_storage = AccessedRootsRecorder:: {
storage,
min: min_number,
required_roots_proofs: Mutex::new(BTreeMap::new()),
};
// The upper bound is the smaller of our best block number and the number of `max`.
let max_number = ::std::cmp::min(
self.backend.blockchain().info()?.best_number,
self.backend.blockchain().expect_block_number_from_id(&BlockId::Hash(max))?,
);
// fetch key changes proof
let first_number = self.backend.blockchain()
.expect_block_number_from_id(&BlockId::Hash(first))?;
let last_number = self.backend.blockchain()
.expect_block_number_from_id(&BlockId::Hash(last))?;
let key_changes_proof = key_changes_proof::<_, Blake2Hasher, _>(
&config,
&recording_storage,
first_number,
&ChangesTrieAnchorBlockId {
hash: convert_hash(&last),
number: last_number,
},
max_number,
&key.0
)
.map_err(|err| error::Error::from(error::Error::ChangesTrieAccessFailed(err)))?;
// now gather proofs for all changes tries roots that were touched during key_changes_proof
// execution AND are unknown (i.e. replaced with CHT) to the requester
let roots = recording_storage.required_roots_proofs.into_inner();
let roots_proof = self.changes_trie_roots_proof(cht_size, roots.keys().cloned())?;
Ok(ChangesProof {
max_block: max_number,
proof: key_changes_proof,
roots: roots.into_iter().map(|(n, h)| (n, convert_hash(&h))).collect(),
roots_proof,
})
}
/// Generate CHT-based proof for roots of changes tries at given blocks.
///
/// The per-CHT proofs are merged through a `HashSet`, so identical proof
/// entries appear only once in the returned vector; the order of the
/// entries is therefore unspecified.
fn changes_trie_roots_proof>>(
&self,
cht_size: NumberFor,
blocks: I
) -> error::Result>> {
// most probably we have touched several changes tries that are parts of the single CHT
// => GroupBy changes tries by CHT number and then gather proof for the whole group at once
let mut proof = HashSet::new();
cht::for_each_cht_group::(cht_size, blocks, |_, cht_num, cht_blocks| {
let cht_proof = self.changes_trie_roots_proof_at_cht(cht_size, cht_num, cht_blocks)?;
proof.extend(cht_proof);
Ok(())
}, ())?;
Ok(proof.into_iter().collect())
}
/// Generates CHT-based proof for roots of changes tries at given blocks (that are part of single CHT).
///
/// For each block number of the CHT the header is loaded and the changes
/// trie root is taken from its digest log, if present.
fn changes_trie_roots_proof_at_cht(
&self,
cht_size: NumberFor,
cht_num: NumberFor,
blocks: Vec>
) -> error::Result>> {
let cht_start = cht::start_number(cht_size, cht_num);
// Unbounded iterator of consecutive block numbers starting at `cht_start`;
// the consumer decides how many it pulls.
let mut current_num = cht_start;
let cht_range = ::std::iter::from_fn(|| {
let old_current_num = current_num;
current_num = current_num + One::one();
Some(old_current_num)
});
// Lazily map each number to the changes trie root found in that block's
// digest; a missing header or a missing digest item yields `None`.
let roots = cht_range
.map(|num| self.header(&BlockId::Number(num))
.map(|block| block.and_then(|block| block.digest().log(DigestItem::as_changes_trie_root).cloned())));
let proof = cht::build_proof::(cht_size, cht_num, blocks, roots)?;
Ok(proof)
}
/// Returns changes trie configuration and storage or an error if it is not supported.
fn require_changes_trie(&self) -> error::Result<(ChangesTrieConfiguration, &B::ChangesTrieStorage)> {
let config = self.changes_trie_config()?;
let storage = self.backend.changes_trie_storage();
match (config, storage) {
(Some(config), Some(storage)) => Ok((config, storage)),
_ => Err(error::Error::ChangesTriesNotSupported.into()),
}
}
/// Create a new block, built on the head of the chain.
///
/// Delegates to `block_builder::BlockBuilder::new`.
pub fn new_block(
&self
) -> error::Result> where
E: Clone + Send + Sync,
RA: Send + Sync,
Self: ProvideRuntimeApi,
::Api: BlockBuilderAPI
{
block_builder::BlockBuilder::new(self)
}
/// Create a new block, built on top of `parent`.
///
/// Delegates to `block_builder::BlockBuilder::at_block` with proof
/// recording disabled (the final `false` argument).
pub fn new_block_at(
&self, parent: &BlockId
) -> error::Result> where
E: Clone + Send + Sync,
RA: Send + Sync,
Self: ProvideRuntimeApi,
::Api: BlockBuilderAPI
{
block_builder::BlockBuilder::at_block(parent, &self, false)
}
/// Create a new block, built on top of `parent` with proof recording enabled.
///
/// While proof recording is enabled, all accessed trie nodes are saved.
/// These recorded trie nodes can be used by a third party to prove the
/// output of this block builder without having access to the full storage.
///
/// Delegates to `block_builder::BlockBuilder::at_block` with the final
/// argument set to `true`.
pub fn new_block_at_with_proof_recording(
&self, parent: &BlockId
) -> error::Result> where
E: Clone + Send + Sync,
RA: Send + Sync,
Self: ProvideRuntimeApi,
::Api: BlockBuilderAPI
{
block_builder::BlockBuilder::at_block(parent, &self, true)
}
/// Lock the import lock, and run operations inside.
///
/// A fresh backend operation is started and handed to `f`. If `f` succeeds
/// the operation is committed, then finality notifications and (if set) the
/// import notification are sent, in that order. If `f` or the commit fails,
/// nothing is sent and the error is returned. In every case `importing_block`
/// is reset to `None` before returning.
pub fn lock_import_and_run(&self, f: F) -> Result where
F: FnOnce(&mut ClientImportOperation) -> Result,
Err: From,
{
let inner = || {
// The guard lives until the end of this closure, i.e. across commit and notifications.
let _import_lock = self.import_lock.lock();
let mut op = ClientImportOperation {
op: self.backend.begin_operation()?,
notify_imported: None,
notify_finalized: Vec::new(),
};
let r = f(&mut op)?;
let ClientImportOperation { op, notify_imported, notify_finalized } = op;
self.backend.commit_operation(op)?;
// Notifications are only sent after a successful commit.
self.notify_finalized(notify_finalized)?;
if let Some(notify_imported) = notify_imported {
self.notify_imported(notify_imported)?;
}
Ok(r)
};
let result = inner();
// Clear the "currently importing" marker regardless of the outcome.
*self.importing_block.write() = None;
result
}
/// Set a block as best block.
///
/// Runs `apply_head` inside `lock_import_and_run`, so the change is
/// committed under the import lock.
pub fn set_head(
&self,
id: BlockId
) -> error::Result<()> {
self.lock_import_and_run(|operation| {
self.apply_head(operation, id)
})
}
/// Set a block as best block, and apply it to an operation.
///
/// Only records the change on the operation via `mark_head`; nothing is
/// committed here.
pub fn apply_head(
&self,
operation: &mut ClientImportOperation,
id: BlockId,
) -> error::Result<()> {
operation.op.mark_head(id)
}
/// Apply a checked and validated block to an operation. If a justification is provided
/// then `finalized` *must* be true.
///
/// Returns `ImportResult::UnknownParent` when the parent block is not in the
/// chain; otherwise the result of `execute_and_import_block`.
///
/// # Panics
///
/// Panics if a justification is supplied while `finalized` is `false`.
pub fn apply_block(
&self,
operation: &mut ClientImportOperation,
import_block: ImportBlock,
new_cache: HashMap>,
) -> error::Result where
E: CallExecutor + Send + Sync + Clone,
{
let ImportBlock {
origin,
header,
justification,
post_digests,
body,
finalized,
auxiliary,
fork_choice,
} = import_block;
// A justification implies finality.
assert!(justification.is_some() && finalized || justification.is_none());
let parent_hash = header.parent_hash().clone();
match self.backend.blockchain().status(BlockId::Hash(parent_hash))? {
blockchain::BlockStatus::InChain => {},
blockchain::BlockStatus::Unknown => return Ok(ImportResult::UnknownParent),
}
// Keep the header as given ("pre") and, if there are post-runtime digests,
// a copy with those digests appended ("post").
let import_headers = if post_digests.is_empty() {
PrePostHeader::Same(header)
} else {
let mut post_header = header.clone();
for item in post_digests {
post_header.digest_mut().push(item);
}
PrePostHeader::Different(header, post_header)
};
// The block is identified by the hash of its post-header.
let hash = import_headers.post().hash();
let height = (*import_headers.post().number()).saturated_into::();
*self.importing_block.write() = Some(hash);
let result = self.execute_and_import_block(
operation,
origin,
hash,
import_headers,
justification,
body,
new_cache,
finalized,
auxiliary,
fork_choice,
);
// Telemetry is emitted whether or not the import succeeded.
telemetry!(SUBSTRATE_INFO; "block.import";
"height" => height,
"best" => ?hash,
"origin" => ?origin
);
result
}
// Executes the block (if state is available for the operation) and records
// everything needed to import it on `operation`: block data, cache updates,
// storage and changes trie updates, auxiliary data and pending notifications.
//
// Returns `ImportResult::AlreadyInChain` without touching the operation when
// the block hash is already known.
fn execute_and_import_block(
&self,
operation: &mut ClientImportOperation,
origin: BlockOrigin,
hash: Block::Hash,
import_headers: PrePostHeader,
justification: Option,
body: Option>,
new_cache: HashMap>,
finalized: bool,
aux: Vec<(Vec, Option>)>,
fork_choice: ForkChoiceStrategy,
) -> error::Result where
E: CallExecutor + Send + Sync + Clone,
{
let parent_hash = import_headers.post().parent_hash().clone();
match self.backend.blockchain().status(BlockId::Hash(hash))? {
blockchain::BlockStatus::InChain => return Ok(ImportResult::AlreadyInChain),
blockchain::BlockStatus::Unknown => {},
}
// Best block as it was before this import.
let (last_best, last_best_number) = {
let info = self.backend.blockchain().info()?;
(info.best_hash, info.best_number)
};
// this is a fairly arbitrary choice of where to draw the line on making notifications,
// but the general goal is to only make notifications when we are already fully synced
// and get a new chain head.
let make_notifications = match origin {
BlockOrigin::NetworkBroadcast | BlockOrigin::Own | BlockOrigin::ConsensusBroadcast => true,
BlockOrigin::Genesis | BlockOrigin::NetworkInitialSync | BlockOrigin::File => false,
};
self.backend.begin_state_operation(&mut operation.op, BlockId::Hash(parent_hash))?;
// ensure parent block is finalized to maintain invariant that
// finality is called sequentially.
if finalized {
self.apply_finality_with_block_hash(operation, parent_hash, None, last_best, make_notifications)?;
}
// FIXME #1232: correct path logic for when to execute this function
let (storage_update,changes_update,storage_changes) = self.block_execution(&operation.op, &import_headers, origin, hash, body.clone())?;
// A finalized block always becomes the new best; otherwise the fork choice
// strategy decides.
let is_new_best = finalized || match fork_choice {
ForkChoiceStrategy::LongestChain => import_headers.post().number() > &last_best_number,
ForkChoiceStrategy::Custom(v) => v,
};
let leaf_state = if finalized {
crate::backend::NewBlockState::Final
} else if is_new_best {
crate::backend::NewBlockState::Best
} else {
crate::backend::NewBlockState::Normal
};
trace!("Imported {}, (#{}), best={}, origin={:?}", hash, import_headers.post().number(), is_new_best, origin);
// The post-header is the one that gets stored.
operation.op.set_block_data(
import_headers.post().clone(),
body,
justification,
leaf_state,
)?;
operation.op.update_cache(new_cache);
if let Some(storage_update) = storage_update {
operation.op.update_db_storage(storage_update)?;
}
// Cloned because the changes are also needed for the import notification below.
if let Some(storage_changes) = storage_changes.clone() {
operation.op.update_storage(storage_changes)?;
}
if let Some(Some(changes_update)) = changes_update {
operation.op.update_changes_trie(changes_update)?;
}
operation.op.insert_aux(aux)?;
if make_notifications {
if finalized {
operation.notify_finalized.push(hash);
}
operation.notify_imported = Some((hash, origin, import_headers.into_post(), is_new_best, storage_changes));
}
Ok(ImportResult::imported())
}
// Runs `Core_execute_block` for the block on top of the operation's state.
//
// Returns (storage update, changes trie update, committed storage changes),
// or `(None, None, None)` when the operation has no state to execute on.
fn block_execution(
&self,
transaction: &B::BlockImportOperation,
import_headers: &PrePostHeader,
origin: BlockOrigin,
hash: Block::Hash,
body: Option>,
) -> error::Result<(
Option>,
Option>,
Option, Option>)>>,
)>
where
E: CallExecutor + Send + Sync + Clone,
{
match transaction.state()? {
Some(transaction_state) => {
let mut overlay = Default::default();
// Translate a configured strategy into an execution manager. For `Both`,
// the handler logs the disagreement and keeps the wasm result.
let get_execution_manager = |execution_strategy: ExecutionStrategy| {
match execution_strategy {
ExecutionStrategy::NativeElseWasm => ExecutionManager::NativeElseWasm,
ExecutionStrategy::AlwaysWasm => ExecutionManager::AlwaysWasm,
ExecutionStrategy::NativeWhenPossible => ExecutionManager::NativeWhenPossible,
ExecutionStrategy::Both => ExecutionManager::Both(|wasm_result, native_result| {
let header = import_headers.post();
warn!("Consensus error between wasm and native block execution at block {}", hash);
warn!(" Header {:?}", header);
warn!(" Native result {:?}", native_result);
warn!(" Wasm result {:?}", wasm_result);
telemetry!(SUBSTRATE_INFO; "block.execute.consensus_failure";
"hash" => ?hash,
"origin" => ?origin,
"header" => ?header
);
wasm_result
}),
}
};
// The runtime is given the pre-header; a missing body is replaced by the default value.
let (_, storage_update, changes_update) = self.executor.call_at_state::<_, _, _, NeverNativeValue, fn() -> _>(
transaction_state,
&mut overlay,
"Core_execute_block",
&::new(import_headers.pre().clone(), body.unwrap_or_default()).encode(),
// Initial sync uses the `syncing` strategy, everything else `importing`.
match origin {
BlockOrigin::NetworkInitialSync => get_execution_manager(self.execution_strategies().syncing),
_ => get_execution_manager(self.execution_strategies().importing),
},
None,
NeverOffchainExt::new(),
)?;
overlay.commit_prospective();
Ok((Some(storage_update), Some(changes_update), Some(overlay.into_committed().collect())))
},
None => Ok((None, None, None))
}
}
// Marks `block` and every not-yet-finalized block on the route from the last
// finalized block to it as finalized within `operation`.
//
// * `justification` is stored with `block` only; intermediate blocks are
//   finalized without one.
// * `best_block` is the current best block hash (only used for the route
//   check that backs the FIXME below).
// * `notify` queues finality notifications for at most the last
//   `MAX_TO_NOTIFY` newly finalized blocks.
//
// Returns `Ok(())` without doing anything when `block` already is the last
// finalized block, and `Error::NotInFinalizedChain` when finalizing `block`
// would require retracting a finalized block.
fn apply_finality_with_block_hash(
    &self,
    operation: &mut ClientImportOperation,
    block: Block::Hash,
    justification: Option,
    best_block: Block::Hash,
    notify: bool,
) -> error::Result<()> {
    // find tree route from last finalized to given block.
    let last_finalized = self.backend.blockchain().last_finalized()?;
    if block == last_finalized {
        warn!("Possible safety violation: attempted to re-finalize last finalized block {:?} ", last_finalized);
        return Ok(());
    }
    let route_from_finalized = crate::blockchain::tree_route(
        self.backend.blockchain(),
        BlockId::Hash(last_finalized),
        BlockId::Hash(block),
    )?;
    if let Some(retracted) = route_from_finalized.retracted().get(0) {
        warn!("Safety violation: attempted to revert finalized block {:?} which is not in the \
same chain as last finalized {:?}", retracted, last_finalized);
        return Err(error::Error::NotInFinalizedChain);
    }
    let route_from_best = crate::blockchain::tree_route(
        self.backend.blockchain(),
        BlockId::Hash(best_block),
        BlockId::Hash(block),
    )?;
    // if the block is not a direct ancestor of the current best chain,
    // then some other block is the common ancestor.
    if route_from_best.common_block().hash != block {
        // FIXME: #1442 reorganize best block to be the best chain containing
        // `block`.
    }
    let enacted = route_from_finalized.enacted();
    // `block` differs from the last finalized block and nothing is retracted,
    // so the route must enact at least `block` itself.
    assert!(
        !enacted.is_empty(),
        "route from last finalized block to a different block enacts at least one block; qed"
    );
    // Everything but the final entry is finalized without a justification.
    for finalize_new in &enacted[..enacted.len() - 1] {
        operation.op.mark_finalized(BlockId::Hash(finalize_new.hash), None)?;
    }
    assert_eq!(enacted.last().map(|e| e.hash), Some(block));
    operation.op.mark_finalized(BlockId::Hash(block), justification)?;
    if notify {
        // sometimes when syncing, tons of blocks can be finalized at once.
        // we'll send notifications spuriously in that case.
        const MAX_TO_NOTIFY: usize = 256;
        // Only the most recent `MAX_TO_NOTIFY` entries are announced.
        let start = enacted.len().saturating_sub(MAX_TO_NOTIFY);
        operation.notify_finalized.extend(enacted[start..].iter().map(|finalized| finalized.hash));
    }
    Ok(())
}
// Sends a `FinalityNotification` for every hash in `notify_finalized` to all
// registered finality sinks and emits a telemetry event per block.
//
// Panics if the header of one of the hashes cannot be found.
fn notify_finalized(
&self,
notify_finalized: Vec,
) -> error::Result<()> {
// The sinks lock is held for the whole loop.
let mut sinks = self.finality_notification_sinks.lock();
for finalized_hash in notify_finalized {
let header = self.header(&BlockId::Hash(finalized_hash))?
.expect("header already known to exist in DB because it is indicated in the tree route; qed");
telemetry!(SUBSTRATE_INFO; "notify.finalized";
"height" => format!("{}", header.number()),
"best" => ?finalized_hash,
);
let notification = FinalityNotification {
header,
hash: finalized_hash,
};
// Drop every sink whose send fails.
sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
}
Ok(())
}
// Triggers storage change notifications (when storage changes are present)
// and sends a `BlockImportNotification` to all registered import sinks.
//
// `notify_import` is (hash, origin, header, is_new_best, storage changes).
fn notify_imported(
&self,
notify_import: (Block::Hash, BlockOrigin, Block::Header, bool, Option, Option>)>>),
) -> error::Result<()> {
let (hash, origin, header, is_new_best, storage_changes) = notify_import;
if let Some(storage_changes) = storage_changes {
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
self.storage_notifications.lock()
.trigger(&hash, storage_changes.into_iter());
}
let notification = BlockImportNotification:: {
hash,
origin,
header,
is_new_best,
};
// Drop every sink whose send fails.
self.import_notification_sinks.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
Ok(())
}
/// Apply auxiliary data insertion into an operation.
///
/// Every `(key, value)` pair yielded by `insert` is queued as a write, and every key
/// yielded by `delete` is queued as a removal (encoded as a `None` value). Both sets
/// are handed to the operation's `insert_aux` in a single call, insertions first.
/// Keys and values are copied into owned vectors before being passed on.
pub fn apply_aux<
'a,
'b: 'a,
'c: 'a,
I: IntoIterator- ,
D: IntoIterator
- ,
>(
&self,
operation: &mut ClientImportOperation
,
insert: I,
delete: D
) -> error::Result<()> {
operation.op.insert_aux(
insert.into_iter()
// `Some(value)` marks an entry to be written.
.map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
// `None` marks an entry to be deleted.
.chain(delete.into_iter().map(|k| (k.to_vec(), None)))
)
}
/// Record, inside `operation`, that every block up to and including `id` is finalized.
///
/// When a justification is given it is attached to the block identified by `id` only;
/// blocks that are finalized implicitly on the way get none.
pub fn apply_finality(
    &self,
    operation: &mut ClientImportOperation,
    id: BlockId,
    justification: Option,
    notify: bool,
) -> error::Result<()> {
    let chain = self.backend.blockchain();
    // Read the current best block first, then resolve the finalization target.
    let best_hash = chain.info()?.best_hash;
    let target_hash = chain.expect_block_hash_from_id(&id)?;

    self.apply_finality_with_block_hash(operation, target_hash, justification, best_hash, notify)
}
/// Finalize the block identified by `id`, implicitly finalizing all of its ancestors
/// that are not finalized yet.
///
/// `notify` controls whether finality notifications are propagated. Callers usually
/// derive it from their synchronization state so that no notifications are sent while
/// a major sync is in progress.
///
/// The work runs under the import lock and is delegated to `apply_finality`.
pub fn finalize_block(&self, id: BlockId, justification: Option, notify: bool) -> error::Result<()> {
    self.lock_import_and_run(|operation| {
        self.apply_finality(operation, id, justification, notify)
    })
}
/// Try to roll the chain back by `n` blocks.
///
/// The request is forwarded to the backend; on success the number of blocks that were
/// actually reverted is returned.
pub fn revert(&self, n: NumberFor) -> error::Result> {
    let reverted = self.backend.revert(n)?;
    Ok(reverted)
}
/// Get blockchain info.
///
/// Wraps the backend's chain info in a `ClientInfo`; the "best queued" fields are
/// always reported as `None`. A failure to read the chain info is converted with
/// `Error::from_blockchain`.
pub fn info(&self) -> error::Result> {
    let chain = self.backend.blockchain()
        .info()
        .map_err(|e| error::Error::from_blockchain(Box::new(e)))?;

    Ok(ClientInfo {
        chain,
        best_queued_hash: None,
        best_queued_number: None,
    })
}
/// Get block status.
///
/// A hash that matches the block currently being imported is reported as `Queued`.
/// Otherwise the block is looked up in the backend: `Unknown` when it is absent,
/// `InChainWithState` when the backend still has its state, `InChainPruned` when not.
pub fn block_status(&self, id: &BlockId) -> error::Result {
    // this can probably be implemented more efficiently
    if let BlockId::Hash(ref wanted) = id {
        let is_queued = self.importing_block.read()
            .as_ref()
            .map_or(false, |in_flight| wanted == in_flight);
        if is_queued {
            return Ok(BlockStatus::Queued);
        }
    }

    // Resolve whichever half of the (hash, number) pair the caller did not supply.
    let chain = self.backend.blockchain();
    let located = match *id {
        BlockId::Hash(hash) => chain.number(hash)?.map(|number| (hash, number)),
        BlockId::Number(number) => chain.hash(number)?.map(|hash| (hash, number)),
    };

    let status = match located {
        None => BlockStatus::Unknown,
        Some((hash, number)) => {
            if self.backend.have_state_at(&hash, number) {
                BlockStatus::InChainWithState
            } else {
                BlockStatus::InChainPruned
            }
        }
    };
    Ok(status)
}
/// Look up the header of the block identified by `id` in the backend blockchain.
pub fn header(&self, id: &BlockId) -> error::Result::Header>> {
    let chain = self.backend.blockchain();
    chain.header(*id)
}
/// Look up the body of the block identified by `id` in the backend blockchain.
pub fn body(&self, id: &BlockId) -> error::Result::Extrinsic>>> {
    let chain = self.backend.blockchain();
    chain.body(*id)
}
/// Look up the justification stored for the block identified by `id` in the backend
/// blockchain.
pub fn justification(&self, id: &BlockId) -> error::Result> {
    let chain = self.backend.blockchain();
    chain.justification(*id)
}
/// Get full block by id.
///
/// Header, body and justification are all fetched (in that order) and any lookup error
/// is propagated. The result is `None` unless both the header and the body are present;
/// the justification is passed through as found.
pub fn block(&self, id: &BlockId)
-> error::Result>>
{
    let maybe_header = self.header(id)?;
    let maybe_body = self.body(id)?;
    let justification = self.justification(id)?;

    let signed_block = match (maybe_header, maybe_body) {
        (Some(header), Some(extrinsics)) => Some(SignedBlock {
            block: Block::new(header, extrinsics),
            justification,
        }),
        _ => None,
    };
    Ok(signed_block)
}
/// Gets the uncles of the block with `target_hash` going back `max_generation` ancestors.
pub fn uncles(&self, target_hash: Block::Hash, max_generation: NumberFor) -> error::Result> {
let load_header = |id: Block::Hash| -> error::Result {
match self.backend.blockchain().header(BlockId::Hash(id))? {
Some(hdr) => Ok(hdr),
None => Err(Error::UnknownBlock(format!("Unknown block {:?}", id))),
}
};
let genesis_hash = self.backend.blockchain().info()?.genesis_hash;
if genesis_hash == target_hash { return Ok(Vec::new()); }
let mut current_hash = target_hash;
let mut current = load_header(current_hash)?;
let mut ancestor_hash = *current.parent_hash();
let mut ancestor = load_header(ancestor_hash)?;
let mut uncles = Vec::new();
for _generation in 0..max_generation.saturated_into() {
let children = self.backend.blockchain().children(ancestor_hash)?;
uncles.extend(children.into_iter().filter(|h| h != ¤t_hash));
current_hash = ancestor_hash;
if genesis_hash == current_hash { break; }
current = ancestor;
ancestor_hash = *current.parent_hash();
ancestor = load_header(ancestor_hash)?;
}
Ok(uncles)
}
/// Read the changes trie configuration from the state of the current best block.
///
/// Yields `None` when nothing is stored under the well-known configuration key, and
/// also when the stored bytes cannot be decoded.
fn changes_trie_config(&self) -> Result, Error> {
    let best_number = self.backend.blockchain().info()?.best_number;
    let best_state = self.backend.state_at(BlockId::Number(best_number))?;

    let raw_config = best_state
        .storage(well_known_keys::CHANGES_TRIE_CONFIG)
        .map_err(|e| error::Error::from_state(Box::new(e)))?;

    Ok(raw_config.and_then(|c| Decode::decode(&mut &*c)))
}
/// Prepare in-memory header that is used in execution environment.
///
/// The header is numbered one above `parent` and carries `parent`'s hash; the three
/// remaining constructor arguments are left at their `Default` values. Fails when the
/// number or the hash of `parent` cannot be resolved by the backend blockchain.
fn prepare_environment_block(&self, parent: &BlockId) -> error::Result {
Ok(<::Header as HeaderT>::new(
// Block number: direct successor of `parent`.
self.backend.blockchain().expect_block_number_from_id(parent)? + One::one(),
Default::default(),
Default::default(),
// Hash of `parent`, resolved from the id.
self.backend.blockchain().expect_block_hash_from_id(&parent)?,
Default::default(),
))
}
}
/// Header-backend queries on the client; every method forwards unchanged to the
/// blockchain exposed by the client's backend.
impl ChainHeaderBackend for Client where
B: backend::Backend,
E: CallExecutor + Send + Sync,
Block: BlockT,
RA: Send + Sync
{
/// Header of the block identified by `id`, as returned by the backend blockchain.
fn header(&self, id: BlockId) -> error::Result> {
self.backend.blockchain().header(id)
}
/// Chain info as returned by the backend blockchain.
fn info(&self) -> error::Result> {
self.backend.blockchain().info()
}
/// Status of the block identified by `id`, as returned by the backend blockchain.
fn status(&self, id: BlockId) -> error::Result {
self.backend.blockchain().status(id)
}
/// Block number for `hash`, as returned by the backend blockchain.
fn number(&self, hash: Block::Hash) -> error::Result::Header as HeaderT>::Number>> {
self.backend.blockchain().number(hash)
}
/// Block hash for `number`, as returned by the backend blockchain.
fn hash(&self, number: NumberFor) -> error::Result> {
self.backend.blockchain().hash(number)
}
}
/// Cache access on the client; forwards to the blockchain exposed by the backend.
impl ProvideCache for Client where
B: backend::Backend,
Block: BlockT,
{
/// Whatever cache the backend blockchain offers, or `None` if it offers none.
fn cache(&self) -> Option>> {
self.backend.blockchain().cache()
}
}
/// Runtime API access on the client; construction is left entirely to `RA`.
impl ProvideRuntimeApi for Client where
B: backend::Backend,
E: CallExecutor + Clone + Send + Sync,
Block: BlockT,
RA: ConstructRuntimeApi
{
type Api = >::RuntimeApi;
/// Build a runtime API handle by passing this client to `RA::construct_runtime_api`.
fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> {
RA::construct_runtime_api(self)
}
}
/// Runtime-call entry points on the client, backed by the client's executor.
impl CallRuntimeAt for Client where
B: backend::Backend,
E: CallExecutor + Clone + Send + Sync,
Block: BlockT,
{
/// Execute the runtime function `function` with `args` at block `at`.
///
/// The execution manager is picked from the client's configured execution strategies
/// according to `context`. When `context` is `OffchainWorker`, the extensions it
/// carries are handed to the executor; for every other context none are passed.
/// All remaining arguments are forwarded to the executor's `contextual_call` as given.
fn call_api_at<
'a,
R: Encode + Decode + PartialEq,
NC: FnOnce() -> result::Result + UnwindSafe,
C: CoreApi,
>(
&self,
core_api: &C,
at: &BlockId,
function: &'static str,
args: Vec,
changes: &RefCell,
initialize_block: InitializeBlock<'a, Block>,
native_call: Option,
context: ExecutionContext,
recorder: &Option>>>,
) -> error::Result> {
// One strategy is configured per execution context. This match only inspects
// `context`; the offchain payload is taken out by the match that follows.
let manager = match context {
ExecutionContext::BlockConstruction =>
self.execution_strategies.block_construction.get_manager(),
ExecutionContext::Syncing =>
self.execution_strategies.syncing.get_manager(),
ExecutionContext::Importing =>
self.execution_strategies.importing.get_manager(),
ExecutionContext::OffchainWorker(_) =>
self.execution_strategies.offchain_worker.get_manager(),
ExecutionContext::Other =>
self.execution_strategies.other.get_manager(),
};
// Only the offchain-worker context carries extensions.
let mut offchain_extensions = match context {
ExecutionContext::OffchainWorker(ext) => Some(ext),
_ => None,
};
self.executor.contextual_call::<_, _, fn(_,_) -> _,_,_>(
// Initializes a block on top of `at` using a freshly prepared environment header.
// NOTE(review): presumably the executor invokes this only when `initialize_block`
// says initialization is still required; confirm against `contextual_call`.
|| core_api.initialize_block(at, &self.prepare_environment_block(at)?),
at,
function,
&args,
changes,
initialize_block,
manager,
native_call,
offchain_extensions.as_mut(),
recorder,
)
}
/// Runtime version at block `at`.
///
/// NOTE(review): this looks self-recursive, but presumably resolves to an inherent
/// `Client::runtime_version_at`, since inherent methods take precedence over trait
/// methods during method resolution. Confirm that inherent method exists; without it
/// this call would recurse without bound.
fn runtime_version_at(&self, at: &BlockId) -> error::Result {
self.runtime_version_at(at)
}
}
/// Block import entry points of the client, as used by consensus code.
impl consensus::BlockImport for Client where
    B: backend::Backend,
    E: CallExecutor + Clone + Send + Sync,
    Block: BlockT,
{
    type Error = ConsensusError;

    /// Import a checked and validated block. If a justification is provided in
    /// `ImportBlock` then `finalized` *must* be true.
    ///
    /// The import runs under the import lock; any client error is reported as
    /// `ConsensusError::ClientImport` carrying the error's string form.
    fn import_block(
        &self,
        import_block: ImportBlock,
        new_cache: HashMap>,
    ) -> Result {
        let outcome = self.lock_import_and_run(|operation| {
            self.apply_block(operation, import_block, new_cache)
        });
        outcome.map_err(|e| ConsensusError::ClientImport(e.to_string()).into())
    }

    /// Check block preconditions.
    ///
    /// The parent is examined first: a known-bad parent yields `KnownBad`, an unknown or
    /// pruned parent yields `UnknownParent`. Then the block itself is examined: known-bad
    /// yields `KnownBad`, a block that is in the chain with state or queued yields
    /// `AlreadyInChain`, and anything else is reported as importable.
    fn check_block(
        &self,
        hash: Block::Hash,
        parent_hash: Block::Hash,
    ) -> Result {
        // Status lookup with the client error mapped into a consensus error.
        let status_of = |block_hash: Block::Hash| {
            self.block_status(&BlockId::Hash(block_hash))
                .map_err(|e| ConsensusError::ClientImport(e.to_string()))
        };

        match status_of(parent_hash)? {
            BlockStatus::KnownBad => return Ok(ImportResult::KnownBad),
            BlockStatus::Unknown | BlockStatus::InChainPruned => return Ok(ImportResult::UnknownParent),
            BlockStatus::InChainWithState | BlockStatus::Queued => {},
        }

        match status_of(hash)? {
            BlockStatus::KnownBad => Ok(ImportResult::KnownBad),
            BlockStatus::InChainWithState | BlockStatus::Queued => Ok(ImportResult::AlreadyInChain),
            BlockStatus::Unknown | BlockStatus::InChainPruned => Ok(ImportResult::imported()),
        }
    }
}
impl CurrentHeight for Client where
B: backend::Backend,
E: CallExecutor,
Block: BlockT,
{
type BlockNumber =