From cca9ab436c2baa0da7bea5dcbf261b1b929cdf02 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan <arkady.paronyan@gmail.com> Date: Thu, 7 Nov 2019 15:25:41 +0100 Subject: [PATCH] Allow import withouth state verification (#4031) * Allow import without state verification * Explicit None Co-Authored-By: Robert Habermeier <rphmeier@gmail.com> --- substrate/core/client/db/src/lib.rs | 63 ++++++++++---- substrate/core/client/src/client.rs | 82 +++++++++++-------- substrate/core/consensus/aura/src/lib.rs | 2 + substrate/core/consensus/babe/src/lib.rs | 2 + substrate/core/consensus/babe/src/tests.rs | 1 + .../core/consensus/common/src/block_import.rs | 8 ++ .../core/consensus/common/src/import_queue.rs | 16 +++- substrate/core/consensus/pow/src/lib.rs | 2 + .../core/finality-grandpa/src/light_import.rs | 5 ++ substrate/core/finality-grandpa/src/tests.rs | 4 + substrate/core/network/src/protocol/sync.rs | 45 ++++++---- .../core/network/src/test/block_import.rs | 7 +- substrate/core/network/src/test/mod.rs | 26 +++--- substrate/core/network/src/test/sync.rs | 43 +++++++++- substrate/core/service/src/chain_ops.rs | 1 + substrate/core/test-client/src/client_ext.rs | 3 + substrate/core/test-client/src/lib.rs | 6 ++ substrate/node-template/build.rs | 2 - substrate/node/cli/src/service.rs | 1 + .../test-utils/transaction-factory/src/lib.rs | 1 + 20 files changed, 232 insertions(+), 88 deletions(-) diff --git a/substrate/core/client/db/src/lib.rs b/substrate/core/client/db/src/lib.rs index 8bd00019816..5902afff0de 100644 --- a/substrate/core/client/db/src/lib.rs +++ b/substrate/core/client/db/src/lib.rs @@ -457,6 +457,7 @@ pub struct BlockImportOperation<Block: BlockT, H: Hasher> { aux_ops: Vec<(Vec<u8>, Option<Vec<u8>>)>, finalized_blocks: Vec<(BlockId<Block>, Option<Justification>)>, set_head: Option<BlockId<Block>>, + commit_state: bool, } impl<Block: BlockT, H: Hasher> BlockImportOperation<Block, H> { @@ -531,6 +532,7 @@ impl<Block> client::backend::BlockImportOperation<Block, Blake2Hasher> ); self.db_updates = transaction; + self.commit_state = true; Ok(root) } @@ -783,6 +785,7 @@ pub struct Backend<Block: BlockT> { canonicalization_delay: u64, shared_cache: SharedCache<Block, Blake2Hasher>, import_lock: Mutex<()>, + is_archive: bool, } impl<Block: BlockT<Hash=H256>> Backend<Block> { @@ -843,6 +846,7 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> { config.state_cache_child_ratio.unwrap_or(DEFAULT_CHILD_RATIO), ), import_lock: Default::default(), + is_archive: is_archive_pruning, }) } @@ -894,6 +898,12 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> { inmem } + /// Returns total numbet of blocks (headers) in the block DB. + #[cfg(feature = "test-helpers")] + pub fn blocks_count(&self) -> u64 { + self.blockchain.db.iter(columns::HEADER).count() as u64 + } + /// Read (from storage or cache) changes trie config. /// /// Currently changes tries configuration is set up once (at genesis) and could not @@ -1115,7 +1125,7 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> { ); transaction.put(columns::HEADER, &lookup_key, &pending_block.header.encode()); - if let Some(body) = pending_block.body { + if let Some(body) = &pending_block.body { transaction.put(columns::BODY, &lookup_key, &body.encode()); } if let Some(justification) = pending_block.justification { @@ -1127,21 +1137,26 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> { transaction.put(columns::META, meta_keys::GENESIS_HASH, hash.as_ref()); } - let mut changeset: state_db::ChangeSet<Vec<u8>> = state_db::ChangeSet::default(); - for (key, (val, rc)) in operation.db_updates.drain() { - if rc > 0 { - changeset.inserted.push((key, val.to_vec())); - } else if rc < 0 { - changeset.deleted.push(key); + let finalized = if operation.commit_state { + let mut changeset: state_db::ChangeSet<Vec<u8>> = state_db::ChangeSet::default(); + for (key, (val, rc)) in operation.db_updates.drain() { + if rc > 0 { + changeset.inserted.push((key, val.to_vec())); + } else if rc < 0 { + changeset.deleted.push(key); + } } - } - let number_u64 = number.saturated_into::<u64>(); - let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset) - .map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?; - apply_state_commit(&mut transaction, commit); - - // Check if need to finalize. Genesis is always finalized instantly. - let finalized = number_u64 == 0 || pending_block.leaf_state.is_final(); + let number_u64 = number.saturated_into::<u64>(); + let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset) + .map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?; + apply_state_commit(&mut transaction, commit); + + // Check if need to finalize. Genesis is always finalized instantly. + let finalized = number_u64 == 0 || pending_block.leaf_state.is_final(); + finalized + } else { + false + }; let header = &pending_block.header; let is_best = pending_block.leaf_state.is_best(); @@ -1347,6 +1362,7 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe aux_ops: Vec::new(), finalized_blocks: Vec::new(), set_head: None, + commit_state: false, }) } @@ -1356,6 +1372,7 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe block: BlockId<Block>, ) -> ClientResult<()> { operation.old_state = self.state_at(block)?; + operation.commit_state = true; Ok(()) } @@ -1478,6 +1495,9 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe match self.blockchain.header(block) { Ok(Some(ref hdr)) => { let hash = hdr.hash(); + if !self.have_state_at(&hash, *hdr.number()) { + return Err(client::error::Error::UnknownBlock(format!("State already discarded for {:?}", block))) + } if let Ok(()) = self.storage.state_db.pin(&hash) { let root = H256::from_slice(hdr.state_root().as_ref()); let db_state = DbState::new(self.storage.clone(), root); @@ -1493,7 +1513,16 @@ impl<Block> client::backend::Backend<Block, Blake2Hasher> for Backend<Block> whe } fn have_state_at(&self, hash: &Block::Hash, number: NumberFor<Block>) -> bool { - !self.storage.state_db.is_pruned(hash, number.saturated_into::<u64>()) + if self.is_archive { + match self.blockchain.header(BlockId::Hash(hash.clone())) { + Ok(Some(header)) => { + state_machine::Storage::get(self.storage.as_ref(), &header.state_root(), (&[], None)).unwrap_or(None).is_some() + }, + _ => false, + } + } else { + !self.storage.state_db.is_pruned(hash, number.saturated_into::<u64>()) + } } fn destroy_state(&self, state: Self::State) -> ClientResult<()> { @@ -1581,7 +1610,7 @@ mod tests { }; let mut op = backend.begin_operation().unwrap(); backend.begin_state_operation(&mut op, block_id).unwrap(); - op.set_block_data(header, None, None, NewBlockState::Best).unwrap(); + op.set_block_data(header, Some(Vec::new()), None, NewBlockState::Best).unwrap(); op.update_changes_trie((changes_trie_update, ChangesTrieCacheAction::Clear)).unwrap(); backend.commit_operation(op).unwrap(); diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs index 71d6e4f01d6..24a352ef641 100644 --- a/substrate/core/client/src/client.rs +++ b/substrate/core/client/src/client.rs @@ -838,15 +838,22 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where finalized, auxiliary, fork_choice, + allow_missing_state, } = import_block; assert!(justification.is_some() && finalized || justification.is_none()); let parent_hash = header.parent_hash().clone(); + let mut enact_state = true; - match self.backend.blockchain().status(BlockId::Hash(parent_hash))? { - blockchain::BlockStatus::InChain => {}, - blockchain::BlockStatus::Unknown => return Ok(ImportResult::UnknownParent), + match self.block_status(&BlockId::Hash(parent_hash))? { + BlockStatus::Unknown => return Ok(ImportResult::UnknownParent), + BlockStatus::InChainWithState | BlockStatus::Queued => {}, + BlockStatus::InChainPruned if allow_missing_state => { + enact_state = false; + }, + BlockStatus::InChainPruned => return Ok(ImportResult::MissingState), + BlockStatus::KnownBad => return Ok(ImportResult::KnownBad), } let import_headers = if post_digests.is_empty() { @@ -875,6 +882,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where finalized, auxiliary, fork_choice, + enact_state, ); if let Ok(ImportResult::Imported(ref aux)) = result { @@ -902,6 +910,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where finalized: bool, aux: Vec<(Vec<u8>, Option<Vec<u8>>)>, fork_choice: ForkChoiceStrategy, + enact_state: bool, ) -> error::Result<ImportResult> where E: CallExecutor<Block, Blake2Hasher> + Send + Sync + Clone, { @@ -927,22 +936,39 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where BlockOrigin::Genesis | BlockOrigin::NetworkInitialSync | BlockOrigin::File => false, }; - self.backend.begin_state_operation(&mut operation.op, BlockId::Hash(parent_hash))?; + let storage_changes = match &body { + Some(body) if enact_state => { + self.backend.begin_state_operation(&mut operation.op, BlockId::Hash(parent_hash))?; - // ensure parent block is finalized to maintain invariant that - // finality is called sequentially. - if finalized { - self.apply_finality_with_block_hash(operation, parent_hash, None, info.best_hash, make_notifications)?; - } + // ensure parent block is finalized to maintain invariant that + // finality is called sequentially. + if finalized { + self.apply_finality_with_block_hash(operation, parent_hash, None, info.best_hash, make_notifications)?; + } - // FIXME #1232: correct path logic for when to execute this function - let (storage_update, changes_update, storage_changes) = self.block_execution( - &operation.op, - &import_headers, - origin, - hash, - body.clone(), - )?; + // FIXME #1232: correct path logic for when to execute this function + let (storage_update, changes_update, storage_changes) = self.block_execution( + &operation.op, + &import_headers, + origin, + hash, + &body, + )?; + + operation.op.update_cache(new_cache); + if let Some(storage_update) = storage_update { + operation.op.update_db_storage(storage_update)?; + } + if let Some(storage_changes) = storage_changes.clone() { + operation.op.update_storage(storage_changes.0, storage_changes.1)?; + } + if let Some(Some(changes_update)) = changes_update { + operation.op.update_changes_trie(changes_update)?; + } + storage_changes + }, + _ => None, + }; let is_new_best = finalized || match fork_choice { ForkChoiceStrategy::LongestChain => import_headers.post().number() > &info.best_number, @@ -977,17 +1003,6 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where leaf_state, )?; - operation.op.update_cache(new_cache); - if let Some(storage_update) = storage_update { - operation.op.update_db_storage(storage_update)?; - } - if let Some(storage_changes) = storage_changes.clone() { - operation.op.update_storage(storage_changes.0, storage_changes.1)?; - } - if let Some(Some(changes_update)) = changes_update { - operation.op.update_changes_trie(changes_update)?; - } - operation.op.insert_aux(aux)?; if make_notifications { @@ -1014,7 +1029,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where import_headers: &PrePostHeader<Block::Header>, origin: BlockOrigin, hash: Block::Hash, - body: Option<Vec<Block::Extrinsic>>, + body: &[Block::Extrinsic], ) -> error::Result<( Option<StorageUpdate<B, Block>>, Option<Option<ChangesUpdate<Block>>>, @@ -1052,7 +1067,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where let encoded_block = <Block as BlockT>::encode_from( import_headers.pre(), - &body.unwrap_or_default() + body, ); let (_, storage_update, changes_update) = self.executor @@ -1523,7 +1538,7 @@ impl<'a, B, E, Block, RA> consensus::BlockImport<Block> for &'a Client<B, E, Blo &mut self, block: BlockCheckParams<Block>, ) -> Result<ImportResult, Self::Error> { - let BlockCheckParams { hash, number, parent_hash } = block; + let BlockCheckParams { hash, number, parent_hash, allow_missing_state } = block; if let Some(h) = self.fork_blocks.as_ref().and_then(|x| x.get(&number)) { if &hash != h { @@ -1541,7 +1556,9 @@ impl<'a, B, E, Block, RA> consensus::BlockImport<Block> for &'a Client<B, E, Blo .map_err(|e| ConsensusError::ClientImport(e.to_string()))? { BlockStatus::InChainWithState | BlockStatus::Queued => {}, - BlockStatus::Unknown | BlockStatus::InChainPruned => return Ok(ImportResult::UnknownParent), + BlockStatus::Unknown => return Ok(ImportResult::UnknownParent), + BlockStatus::InChainPruned if allow_missing_state => {}, + BlockStatus::InChainPruned => return Ok(ImportResult::MissingState), BlockStatus::KnownBad => return Ok(ImportResult::KnownBad), } @@ -1553,7 +1570,6 @@ impl<'a, B, E, Block, RA> consensus::BlockImport<Block> for &'a Client<B, E, Blo BlockStatus::KnownBad => return Ok(ImportResult::KnownBad), } - Ok(ImportResult::imported(false)) } } diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs index eb2d1dba20f..008ad68f810 100644 --- a/substrate/core/consensus/aura/src/lib.rs +++ b/substrate/core/consensus/aura/src/lib.rs @@ -267,6 +267,7 @@ impl<H, B, C, E, I, P, Error, SO> slots::SimpleSlotWorker<B> for AuraWorker<C, E finalized: false, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, + allow_missing_state: false, } }) } @@ -570,6 +571,7 @@ impl<B: BlockT, C, P, T> Verifier<B> for AuraVerifier<C, P, T> where justification, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, + allow_missing_state: false, }; Ok((block_import_params, maybe_keys)) diff --git a/substrate/core/consensus/babe/src/lib.rs b/substrate/core/consensus/babe/src/lib.rs index 287b26a300f..801b47d0ec5 100644 --- a/substrate/core/consensus/babe/src/lib.rs +++ b/substrate/core/consensus/babe/src/lib.rs @@ -430,6 +430,7 @@ impl<B, C, E, I, Error, SO> slots::SimpleSlotWorker<B> for BabeWorker<B, C, E, I // option to specify one. // https://github.com/paritytech/substrate/issues/3623 fork_choice: ForkChoiceStrategy::LongestChain, + allow_missing_state: false, } }) } @@ -741,6 +742,7 @@ impl<B, E, Block, RA, PRA> Verifier<Block> for BabeVerifier<B, E, Block, RA, PRA // option to specify one. // https://github.com/paritytech/substrate/issues/3623 fork_choice: ForkChoiceStrategy::LongestChain, + allow_missing_state: false, }; Ok((block_import_params, Default::default())) diff --git a/substrate/core/consensus/babe/src/tests.rs b/substrate/core/consensus/babe/src/tests.rs index 2e236f190b6..6db4a7284e3 100644 --- a/substrate/core/consensus/babe/src/tests.rs +++ b/substrate/core/consensus/babe/src/tests.rs @@ -578,6 +578,7 @@ fn propose_and_import_block( finalized: false, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, + allow_missing_state: false, }, Default::default(), ).unwrap(); diff --git a/substrate/core/consensus/common/src/block_import.rs b/substrate/core/consensus/common/src/block_import.rs index 4342ee38df1..14450841a32 100644 --- a/substrate/core/consensus/common/src/block_import.rs +++ b/substrate/core/consensus/common/src/block_import.rs @@ -35,11 +35,15 @@ pub enum ImportResult { KnownBad, /// Block parent is not in the chain. UnknownParent, + /// Parent state is missing. + MissingState, } /// Auxiliary data associated with an imported block result. #[derive(Debug, Default, PartialEq, Eq)] pub struct ImportedAux { + /// Only the header has been imported. Block body verification was skipped. + pub header_only: bool, /// Clear all pending justification requests. pub clear_justification_requests: bool, /// Request a justification for the given block. @@ -98,6 +102,8 @@ pub struct BlockCheckParams<Block: BlockT> { pub number: NumberFor<Block>, /// Parent hash of the block that we verify. pub parent_hash: Block::Hash, + /// Allow importing the block skipping state verification if parent state is missing. + pub allow_missing_state: bool, } /// Data required to import a Block. @@ -133,6 +139,8 @@ pub struct BlockImportParams<Block: BlockT> { /// Fork choice strategy of this import. This should only be set by a /// synchronous import, otherwise it may race against other imports. pub fork_choice: ForkChoiceStrategy, + /// Allow importing the block skipping state verification if parent state is missing. + pub allow_missing_state: bool, } impl<Block: BlockT> BlockImportParams<Block> { diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index dc1678fcf18..15be6f230a1 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -63,6 +63,8 @@ pub struct IncomingBlock<B: BlockT> { pub justification: Option<Justification>, /// The peer, we received this from pub origin: Option<Origin>, + /// Allow importing the block skipping state verification if parent state is missing. + pub allow_missing_state: bool, } /// Type of keys in the blockchain cache that consensus module could use for its needs. @@ -203,6 +205,10 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>( Ok(BlockImportResult::ImportedKnown(number)) }, Ok(ImportResult::Imported(aux)) => Ok(BlockImportResult::ImportedUnknown(number, aux, peer.clone())), + Ok(ImportResult::MissingState) => { + debug!(target: "sync", "Parent state is missing for {}: {:?}, parent: {:?}", number, hash, parent_hash); + Err(BlockImportError::UnknownParent) + }, Ok(ImportResult::UnknownParent) => { debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent_hash); Err(BlockImportError::UnknownParent) @@ -217,12 +223,17 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>( } } }; - match import_error(import_handle.check_block(BlockCheckParams { hash, number, parent_hash }))? { + match import_error(import_handle.check_block(BlockCheckParams { + hash, + number, + parent_hash, + allow_missing_state: block.allow_missing_state, + }))? { BlockImportResult::ImportedUnknown { .. } => (), r => return Ok(r), // Any other successful result means that the block is already imported. } - let (import_block, maybe_keys) = verifier.verify(block_origin, header, justification, block.body) + let (mut import_block, maybe_keys) = verifier.verify(block_origin, header, justification, block.body) .map_err(|msg| { if let Some(ref peer) = peer { trace!(target: "sync", "Verifying {}({}) from {} failed: {}", number, hash, peer, msg); @@ -236,6 +247,7 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>( if let Some(keys) = maybe_keys { cache.extend(keys.into_iter()); } + import_block.allow_missing_state = block.allow_missing_state; import_error(import_handle.import_block(import_block, cache)) } diff --git a/substrate/core/consensus/pow/src/lib.rs b/substrate/core/consensus/pow/src/lib.rs index 040eb01d9c5..5c7716ff8bb 100644 --- a/substrate/core/consensus/pow/src/lib.rs +++ b/substrate/core/consensus/pow/src/lib.rs @@ -304,6 +304,7 @@ impl<B: BlockT<Hash=H256>, C, S, Algorithm> Verifier<B> for PowVerifier<B, C, S, justification, auxiliary: vec![(key, Some(aux.encode()))], fork_choice: ForkChoiceStrategy::Custom(aux.total_difficulty > best_aux.total_difficulty), + allow_missing_state: false, }; Ok((import_block, None)) @@ -531,6 +532,7 @@ fn mine_loop<B: BlockT<Hash=H256>, C, Algorithm, E, SO, S>( finalized: false, auxiliary: vec![(key, Some(aux.encode()))], fork_choice: ForkChoiceStrategy::Custom(true), + allow_missing_state: false, }; block_import.import_block(import_block, HashMap::default()) diff --git a/substrate/core/finality-grandpa/src/light_import.rs b/substrate/core/finality-grandpa/src/light_import.rs index 30008b51ece..d769b2fad9a 100644 --- a/substrate/core/finality-grandpa/src/light_import.rs +++ b/substrate/core/finality-grandpa/src/light_import.rs @@ -663,6 +663,7 @@ pub mod tests { finalized: false, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, + allow_missing_state: true, }; do_import_block::<_, _, _, TestJustification>( &client, @@ -680,6 +681,7 @@ pub mod tests { bad_justification: false, needs_finality_proof: false, is_new_best: true, + header_only: false, })); } @@ -692,6 +694,7 @@ pub mod tests { bad_justification: false, needs_finality_proof: false, is_new_best: true, + header_only: false, })); } @@ -705,6 +708,7 @@ pub mod tests { bad_justification: false, needs_finality_proof: true, is_new_best: true, + header_only: false, })); } @@ -721,6 +725,7 @@ pub mod tests { bad_justification: false, needs_finality_proof: true, is_new_best: false, + header_only: false, }, )); } diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index 34985949519..efc2d3700bf 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -974,6 +974,7 @@ fn allows_reimporting_change_blocks() { finalized: false, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, + allow_missing_state: false, } }; @@ -985,6 +986,7 @@ fn allows_reimporting_change_blocks() { bad_justification: false, needs_finality_proof: false, is_new_best: true, + header_only: false, }), ); @@ -1025,6 +1027,7 @@ fn test_bad_justification() { finalized: false, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, + allow_missing_state: false, } }; @@ -1721,6 +1724,7 @@ fn imports_justification_for_regular_blocks_on_import() { finalized: false, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, + allow_missing_state: false, }; assert_eq!( diff --git a/substrate/core/network/src/protocol/sync.rs b/substrate/core/network/src/protocol/sync.rs index 34bc68f9336..70fe0d942b8 100644 --- a/substrate/core/network/src/protocol/sync.rs +++ b/substrate/core/network/src/protocol/sync.rs @@ -480,13 +480,7 @@ impl<B: BlockT> ChainSync<B> { return; } - let block_status = self.client.block_status(&BlockId::Number(number - One::one())) - .unwrap_or(BlockStatus::Unknown); - if block_status == BlockStatus::InChainPruned { - trace!(target: "sync", "Refusing to sync ancient block {:?}", hash); - return; - } - + trace!(target: "sync", "Downloading requested old fork {:?}", hash); self.is_idle = false; for peer_id in &peers { if let Some(peer) = self.peers.get_mut(peer_id) { @@ -571,7 +565,7 @@ impl<B: BlockT> ChainSync<B> { let major_sync = self.status().state == SyncState::Downloading; let blocks = &mut self.blocks; let attrs = &self.required_block_attributes; - let fork_targets = &self.fork_targets; + let fork_targets = &mut self.fork_targets; let mut have_requests = false; let last_finalized = self.client.info().chain.finalized_number; let best_queued = self.best_queued_number; @@ -646,6 +640,7 @@ impl<B: BlockT> ChainSync<B> { body: block_data.block.body, justification: block_data.block.justification, origin: block_data.origin, + allow_missing_state: false, } }).collect() } @@ -658,14 +653,15 @@ impl<B: BlockT> ChainSync<B> { body: b.body, justification: b.justification, origin: Some(who.clone()), + allow_missing_state: true, } }).collect() } PeerSyncState::AncestorSearch(num, state) => { - let block_hash_match = match (blocks.get(0), self.client.block_hash(*num)) { + let matching_hash = match (blocks.get(0), self.client.block_hash(*num)) { (Some(block), Ok(maybe_our_block_hash)) => { trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", num, block.hash, who); - maybe_our_block_hash.map_or(false, |x| x == block.hash) + maybe_our_block_hash.filter(|x| x == &block.hash) }, (None, _) => { debug!(target: "sync", "Invalid response when searching for ancestor from {}", who); @@ -676,27 +672,34 @@ impl<B: BlockT> ChainSync<B> { return Err(BadPeer(who, ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE)) } }; - if block_hash_match && peer.common_number < *num { + if matching_hash.is_some() && peer.common_number < *num { peer.common_number = *num; } - if !block_hash_match && num.is_zero() { + if matching_hash.is_none() && num.is_zero() { trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); return Err(BadPeer(who, GENESIS_MISMATCH_REPUTATION_CHANGE)) } - if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *num, block_hash_match) { + if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *num, matching_hash.is_some()) { peer.state = PeerSyncState::AncestorSearch(next_num, next_state); return Ok(OnBlockData::Request(who, ancestry_request::<B>(next_num))) } else { // Ancestry search is complete. Check if peer is on a stale fork unknown to us and // add it to sync targets if necessary. - trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={}", + trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})", self.best_queued_hash, self.best_queued_number, peer.best_hash, peer.best_number, - peer.common_number + matching_hash, + peer.common_number, ); - if peer.common_number < peer.best_number && peer.best_number < self.best_queued_number { + let client = &self.client; + if peer.common_number < peer.best_number + && peer.best_number < self.best_queued_number + && matching_hash.and_then( + |h| client.block_status(&BlockId::Hash(h)).ok() + ).unwrap_or(BlockStatus::Unknown) != BlockStatus::InChainPruned + { trace!(target: "sync", "Added fork target {} for {}" , peer.best_hash, who); self.fork_targets .entry(peer.best_hash.clone()) @@ -1250,25 +1253,31 @@ fn peer_block_request<B: BlockT>( /// Get pending fork sync targets for a peer. fn fork_sync_request<B: BlockT>( id: &PeerId, - targets: &HashMap<B::Hash, ForkTarget<B>>, + targets: &mut HashMap<B::Hash, ForkTarget<B>>, best_num: NumberFor<B>, finalized: NumberFor<B>, attributes: &message::BlockAttributes, check_block: impl Fn(&B::Hash) -> BlockStatus, ) -> Option<(B::Hash, BlockRequest<B>)> { + targets.retain(|hash, r| if r.number > finalized { + true + } else { + trace!(target: "sync", "Removed expired fork sync request {:?} (#{})", hash, r.number); + false + }); for (hash, r) in targets { if !r.peers.contains(id) { continue } if r.number <= best_num { - trace!(target: "sync", "Downloading requested fork {:?} from {}", hash, id); let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block); let mut count = (r.number - finalized).saturated_into::<u32>(); // up to the last finalized block if parent_status != BlockStatus::Unknown { // request only single block count = 1; } + trace!(target: "sync", "Downloading requested fork {:?} from {}, {} blocks", hash, id, count); return Some((hash.clone(), message::generic::BlockRequest { id: 0, fields: attributes.clone(), diff --git a/substrate/core/network/src/test/block_import.rs b/substrate/core/network/src/test/block_import.rs index f2830548a50..6d077dcc6bb 100644 --- a/substrate/core/network/src/test/block_import.rs +++ b/substrate/core/network/src/test/block_import.rs @@ -37,9 +37,10 @@ fn prepare_good_block() -> (TestClient, Hash, u64, PeerId, IncomingBlock<Block>) (client, hash, number, peer_id.clone(), IncomingBlock { hash, header, - body: None, + body: Some(Vec::new()), justification, - origin: Some(peer_id.clone()) + origin: Some(peer_id.clone()), + allow_missing_state: false, }) } @@ -53,7 +54,7 @@ fn import_single_good_block_works() { match import_single_block(&mut test_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) { Ok(BlockImportResult::ImportedUnknown(ref num, ref aux, ref org)) if *num == number && *aux == expected_aux && *org == Some(peer_id) => {} - _ => panic!() + r @ _ => panic!("{:?}", r) } } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 330988169db..9365430cb0f 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -96,6 +96,7 @@ impl<B: BlockT> Verifier<B> for PassThroughVerifier { post_digests: vec![], auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, + allow_missing_state: false, }, maybe_keys)) } } @@ -374,16 +375,10 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { } } - /// Count the current number of known blocks. Note that: - /// 1. this might be expensive as it creates an in-memory-copy of the chain - /// to count the blocks, thus if you have a different way of testing this - /// (e.g. `info.best_hash`) - use that. - /// 2. This is not always increasing nor accurate, as the - /// orphaned and proven-to-never-finalized blocks may be pruned at any time. - /// Therefore, this number can drop again. - pub fn blocks_count(&self) -> usize { + /// Count the total number of imported blocks. + pub fn blocks_count(&self) -> u64 { self.backend.as_ref().map( - |backend| backend.as_in_memory().blockchain().blocks_count() + |backend| backend.blocks_count() ).unwrap_or(0) } } @@ -526,9 +521,16 @@ pub trait TestNetFactory: Sized { net } - /// Add a full peer. fn add_full_peer(&mut self, config: &ProtocolConfig) { - let test_client_builder = TestClientBuilder::with_default_backend(); + self.add_full_peer_with_states(config, None) + } + + /// Add a full peer. + fn add_full_peer_with_states(&mut self, config: &ProtocolConfig, keep_blocks: Option<u32>) { + let test_client_builder = match keep_blocks { + Some(keep_blocks) => TestClientBuilder::with_pruning_window(keep_blocks), + None => TestClientBuilder::with_default_backend(), + }; let backend = test_client_builder.backend(); let (c, longest_chain) = test_client_builder.build_with_longest_chain(); let client = Arc::new(c); @@ -686,7 +688,7 @@ pub trait TestNetFactory: Sized { if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 { return Async::NotReady } - match (highest, peer.client.info().chain.best_number) { + match (highest, peer.client.info().chain.best_hash) { (None, b) => highest = Some(b), (Some(ref a), ref b) if a == b => {}, (Some(_), _) => return Async::NotReady, diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index b1b2b9d4072..dd9185373f0 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -236,7 +236,14 @@ fn sync_no_common_longer_chain_fails() { let mut net = TestNet::new(3); net.peer(0).push_blocks(20, true); net.peer(1).push_blocks(20, false); - net.block_until_sync(&mut runtime); + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + if net.peer(0).is_major_syncing() { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + })).unwrap(); let peer1 = &net.peers()[1]; assert!(!net.peers()[0].blockchain_canon_equals(peer1)); } @@ -592,3 +599,37 @@ fn can_sync_explicit_forks() { Ok(Async::Ready(())) })).unwrap(); } + +#[test] +fn syncs_header_only_forks() { + let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); + let mut net = TestNet::new(0); + let config = ProtocolConfig::default(); + net.add_full_peer_with_states(&config, None); + net.add_full_peer_with_states(&config, Some(3)); + net.peer(0).push_blocks(2, false); + net.peer(1).push_blocks(2, false); + + net.peer(0).push_blocks(2, true); + let small_hash = net.peer(0).client().info().chain.best_hash; + let small_number = net.peer(0).client().info().chain.best_number; + net.peer(1).push_blocks(4, false); + + net.block_until_sync(&mut runtime); + // Peer 1 won't sync the small fork because common block state is missing + assert_eq!(9, net.peer(0).blocks_count()); + assert_eq!(7, net.peer(1).blocks_count()); + + // Request explicit header-only sync request for the ancient fork. + let first_peer_id = net.peer(0).id(); + net.peer(1).set_sync_fork_request(vec![first_peer_id], small_hash, small_number); + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + if net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none() { + return Ok(Async::NotReady) + } + Ok(Async::Ready(())) + })).unwrap(); +} + diff --git a/substrate/core/service/src/chain_ops.rs b/substrate/core/service/src/chain_ops.rs index e6d7df33c2a..15d01893813 100644 --- a/substrate/core/service/src/chain_ops.rs +++ b/substrate/core/service/src/chain_ops.rs @@ -156,6 +156,7 @@ macro_rules! import_blocks { body: block.body, justification: block.justification, origin: None, + allow_missing_state: false, } ]); } diff --git a/substrate/core/test-client/src/client_ext.rs b/substrate/core/test-client/src/client_ext.rs index 7d3d7301c55..b3bc702afb0 100644 --- a/substrate/core/test-client/src/client_ext.rs +++ b/substrate/core/test-client/src/client_ext.rs @@ -77,6 +77,7 @@ impl<B, E, RA, Block> ClientExt<Block> for Client<B, E, Block, RA> finalized: false, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, + allow_missing_state: false, }; BlockImport::import_block(&mut (&*self), import, HashMap::new()).map(|_| ()) @@ -95,6 +96,7 @@ impl<B, E, RA, Block> ClientExt<Block> for Client<B, E, Block, RA> finalized: false, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::Custom(true), + allow_missing_state: false, }; BlockImport::import_block(&mut (&*self), import, HashMap::new()).map(|_| ()) @@ -116,6 +118,7 @@ impl<B, E, RA, Block> ClientExt<Block> for Client<B, E, Block, RA> finalized: true, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, + allow_missing_state: false, }; BlockImport::import_block(&mut (&*self), import, HashMap::new()).map(|_| ()) diff --git a/substrate/core/test-client/src/lib.rs b/substrate/core/test-client/src/lib.rs index dbe4431456a..a075caec314 100644 --- a/substrate/core/test-client/src/lib.rs +++ b/substrate/core/test-client/src/lib.rs @@ -98,6 +98,12 @@ impl<Block, Executor, G: GenesisInit> TestClientBuilder< pub fn backend(&self) -> Arc<Backend<Block>> { self.backend.clone() } + + /// Create new `TestClientBuilder` with default backend and pruning window size + pub fn with_pruning_window(keep_blocks: u32) -> Self { + let backend = Arc::new(Backend::new_test(keep_blocks, 0)); + Self::with_backend(backend) + } } impl<Executor, Backend, G: GenesisInit> TestClientBuilder<Executor, Backend, G> { diff --git a/substrate/node-template/build.rs b/substrate/node-template/build.rs index a9550783c43..222cbb40928 100644 --- a/substrate/node-template/build.rs +++ b/substrate/node-template/build.rs @@ -1,5 +1,3 @@ -use std::{env, path::PathBuf}; - use vergen::{ConstantsFlags, generate_cargo_keys}; const ERROR_MSG: &str = "Failed to generate metadata files"; diff --git a/substrate/node/cli/src/service.rs b/substrate/node/cli/src/service.rs index 9f0726fc836..c5780d9f355 100644 --- a/substrate/node/cli/src/service.rs +++ b/substrate/node/cli/src/service.rs @@ -523,6 +523,7 @@ mod tests { finalized: true, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, + allow_missing_state: false, }; block_import.import_block(params, Default::default()) diff --git a/substrate/test-utils/transaction-factory/src/lib.rs b/substrate/test-utils/transaction-factory/src/lib.rs index 067c75c3fc3..d1526cc8bb9 100644 --- a/substrate/test-utils/transaction-factory/src/lib.rs +++ b/substrate/test-utils/transaction-factory/src/lib.rs @@ -194,6 +194,7 @@ fn import_block<Backend, Exec, Block, RtApi>( justification: None, auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, + allow_missing_state: false, }; (&**client).import_block(import, HashMap::new()).expect("Failed to import block"); } -- GitLab