Skip to content
Snippets Groups Projects
Unverified Commit 69c986f4 authored by Dmitry Markin's avatar Dmitry Markin Committed by GitHub
Browse files

Revert "Check for parent of first ready block being on chain (#1812)" (#1950)

This reverts https://github.com/paritytech/polkadot-sdk/pull/1812 until
we know why it causes syncing issues reported in
https://github.com/subspace/subspace/issues/2122.
parent f3bf5c1a
Branches
No related merge requests found
Pipeline #403123 passed with stages
in 46 minutes and 37 seconds
......@@ -212,31 +212,6 @@ impl<B: BlockT> BlockCollection<B> {
ready
}
/// Returns the block header of the first block that is ready for importing.
/// `from` is the maximum block number for the start of the range that we are interested in.
/// The function will return None if the first block ready is higher than `from`.
/// The logic is structured to be consistent with ready_blocks().
pub fn first_ready_block_header(&self, from: NumberFor<B>) -> Option<B::Header> {
let mut prev = from;
for (&start, range_data) in &self.blocks {
if start > prev {
break
}
match range_data {
BlockRangeState::Complete(blocks) => {
let len = (blocks.len() as u32).into();
prev = start + len;
if let Some(BlockData { block, .. }) = blocks.first() {
return block.header.clone()
}
},
_ => continue,
}
}
None
}
pub fn clear_queued(&mut self, hash: &B::Hash) {
if let Some((from, to)) = self.queued_blocks.remove(hash) {
let mut block_num = from;
......
......@@ -1405,27 +1405,8 @@ where
/// Get the set of downloaded blocks that are ready to be queued for import.
fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
let start_block = self.best_queued_number + One::one();
// Verify that the parent of the first available block is in the chain.
// If not, we are downloading from a fork. In this case, wait until
// the start block has a parent on chain.
let parent_on_chain =
self.blocks.first_ready_block_header(start_block).map_or(false, |hdr| {
std::matches!(
self.block_status(hdr.parent_hash()).unwrap_or(BlockStatus::Unknown),
BlockStatus::InChainWithState |
BlockStatus::InChainPruned |
BlockStatus::Queued
)
});
if !parent_on_chain {
return vec![]
}
self.blocks
.ready_blocks(start_block)
.ready_blocks(self.best_queued_number + One::one())
.into_iter()
.map(|block_data| {
let justifications = block_data
......@@ -3383,380 +3364,4 @@ mod test {
pending_responses.remove(&peers[1]);
assert_eq!(pending_responses.len(), 0);
}
#[test]
fn syncs_fork_with_partial_response_extends_tip() {
sp_tracing::try_init_simple();
// Set up: the two chains share the first 15 blocks before
// diverging. The other(canonical) chain fork is longer.
let max_blocks_per_request = 64;
let common_ancestor = 15;
let non_canonical_chain_length = common_ancestor + 3;
let canonical_chain_length = common_ancestor + max_blocks_per_request + 10;
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
let mut client = Arc::new(TestClientBuilder::new().build());
// Blocks on the non-canonical chain.
let non_canonical_blocks = (0..non_canonical_chain_length)
.map(|_| build_block(&mut client, None, false))
.collect::<Vec<_>>();
// Blocks on the canonical chain.
let canonical_blocks = {
let mut client = Arc::new(TestClientBuilder::new().build());
let common_blocks = non_canonical_blocks[..common_ancestor as usize]
.into_iter()
.inspect(|b| block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap())
.cloned()
.collect::<Vec<_>>();
common_blocks
.into_iter()
.chain(
(0..(canonical_chain_length - common_ancestor as u32))
.map(|_| build_block(&mut client, None, true)),
)
.collect::<Vec<_>>()
};
let mut sync = ChainSync::new(
SyncMode::Full,
client.clone(),
ProtocolName::from("test-block-announce-protocol"),
1,
max_blocks_per_request,
None,
chain_sync_network_handle,
)
.unwrap();
// Connect the node we will sync from
let peer_id = PeerId::random();
let canonical_tip = canonical_blocks.last().unwrap().clone();
let mut request = sync
.new_peer(peer_id, canonical_tip.hash(), *canonical_tip.header().number())
.unwrap()
.unwrap();
assert_eq!(FromBlock::Number(client.info().best_number), request.from);
assert_eq!(Some(1), request.max);
// Do the ancestor search
loop {
let block =
&canonical_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1];
let response = create_block_response(vec![block.clone()]);
let on_block_data = sync.on_block_data(&peer_id, Some(request), response).unwrap();
request = if let OnBlockData::Request(_peer, request) = on_block_data {
request
} else {
// We found the ancestor
break
};
log::trace!(target: LOG_TARGET, "Request: {request:?}");
}
// The response for the 64 blocks is returned in two parts:
// part 1: last 61 blocks [19..79], part 2: first 3 blocks [16-18].
// Even though the first part extends the current chain ending at 18,
// it should not result in an import yet.
let resp_1_from = common_ancestor as u64 + max_blocks_per_request as u64;
let resp_2_from = common_ancestor as u64 + 3;
// No import expected.
let request = get_block_request(
&mut sync,
FromBlock::Number(resp_1_from),
max_blocks_per_request as u32,
&peer_id,
);
let from = unwrap_from_block_number(request.from.clone());
let mut resp_blocks = canonical_blocks[18..from as usize].to_vec();
resp_blocks.reverse();
let response = create_block_response(resp_blocks.clone());
let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
assert!(matches!(
res,
OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty()
),);
// Gap filled, expect max_blocks_per_request being imported now.
let request = get_block_request(&mut sync, FromBlock::Number(resp_2_from), 3, &peer_id);
let mut resp_blocks = canonical_blocks[common_ancestor as usize..18].to_vec();
resp_blocks.reverse();
let response = create_block_response(resp_blocks.clone());
let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
let to_import: Vec<_> = match &res {
OnBlockData::Import(ImportBlocksAction { origin: _, blocks }) => {
assert_eq!(blocks.len(), sync.max_blocks_per_request as usize);
blocks
.iter()
.map(|b| {
let num = *b.header.as_ref().unwrap().number() as usize;
canonical_blocks[num - 1].clone()
})
.collect()
},
_ => {
panic!("Unexpected response: {res:?}");
},
};
let _ = sync.on_blocks_processed(
max_blocks_per_request as usize,
resp_blocks.len(),
resp_blocks
.iter()
.rev()
.map(|b| {
(
Ok(BlockImportStatus::ImportedUnknown(
*b.header().number(),
Default::default(),
Some(peer_id),
)),
b.hash(),
)
})
.collect(),
);
to_import.into_iter().for_each(|b| {
assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_))));
block_on(client.import(BlockOrigin::Own, b)).unwrap();
});
let expected_number = common_ancestor as u32 + max_blocks_per_request as u32;
assert_eq!(sync.best_queued_number as u32, expected_number);
assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash());
// Sync rest of the chain.
let request =
get_block_request(&mut sync, FromBlock::Hash(canonical_tip.hash()), 10_u32, &peer_id);
let mut resp_blocks = canonical_blocks
[(canonical_chain_length - 10) as usize..canonical_chain_length as usize]
.to_vec();
resp_blocks.reverse();
let response = create_block_response(resp_blocks.clone());
let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
assert!(matches!(
res,
OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 10 as usize
),);
let _ = sync.on_blocks_processed(
max_blocks_per_request as usize,
resp_blocks.len(),
resp_blocks
.iter()
.rev()
.map(|b| {
(
Ok(BlockImportStatus::ImportedUnknown(
*b.header().number(),
Default::default(),
Some(peer_id),
)),
b.hash(),
)
})
.collect(),
);
resp_blocks.into_iter().rev().for_each(|b| {
assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_))));
block_on(client.import(BlockOrigin::Own, b)).unwrap();
});
let expected_number = canonical_chain_length as u32;
assert_eq!(sync.best_queued_number as u32, expected_number);
assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash());
}
#[test]
fn syncs_fork_with_partial_response_does_not_extend_tip() {
sp_tracing::try_init_simple();
// Set up: the two chains share the first 15 blocks before
// diverging. The other(canonical) chain fork is longer.
let max_blocks_per_request = 64;
let common_ancestor = 15;
let non_canonical_chain_length = common_ancestor + 3;
let canonical_chain_length = common_ancestor + max_blocks_per_request + 10;
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
let mut client = Arc::new(TestClientBuilder::new().build());
// Blocks on the non-canonical chain.
let non_canonical_blocks = (0..non_canonical_chain_length)
.map(|_| build_block(&mut client, None, false))
.collect::<Vec<_>>();
// Blocks on the canonical chain.
let canonical_blocks = {
let mut client = Arc::new(TestClientBuilder::new().build());
let common_blocks = non_canonical_blocks[..common_ancestor as usize]
.into_iter()
.inspect(|b| block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap())
.cloned()
.collect::<Vec<_>>();
common_blocks
.into_iter()
.chain(
(0..(canonical_chain_length - common_ancestor as u32))
.map(|_| build_block(&mut client, None, true)),
)
.collect::<Vec<_>>()
};
let mut sync = ChainSync::new(
SyncMode::Full,
client.clone(),
ProtocolName::from("test-block-announce-protocol"),
1,
max_blocks_per_request,
None,
chain_sync_network_handle,
)
.unwrap();
// Connect the node we will sync from
let peer_id = PeerId::random();
let canonical_tip = canonical_blocks.last().unwrap().clone();
let mut request = sync
.new_peer(peer_id, canonical_tip.hash(), *canonical_tip.header().number())
.unwrap()
.unwrap();
assert_eq!(FromBlock::Number(client.info().best_number), request.from);
assert_eq!(Some(1), request.max);
// Do the ancestor search
loop {
let block =
&canonical_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1];
let response = create_block_response(vec![block.clone()]);
let on_block_data = sync.on_block_data(&peer_id, Some(request), response).unwrap();
request = if let OnBlockData::Request(_peer, request) = on_block_data {
request
} else {
// We found the ancestor
break
};
log::trace!(target: LOG_TARGET, "Request: {request:?}");
}
// The response for the 64 blocks is returned in two parts:
// part 1: last 62 blocks [18..79], part 2: first 2 blocks [16-17].
// Even though the first part extends the current chain ending at 18,
// it should not result in an import yet.
let resp_1_from = common_ancestor as u64 + max_blocks_per_request as u64;
let resp_2_from = common_ancestor as u64 + 2;
// No import expected.
let request = get_block_request(
&mut sync,
FromBlock::Number(resp_1_from),
max_blocks_per_request as u32,
&peer_id,
);
let from = unwrap_from_block_number(request.from.clone());
let mut resp_blocks = canonical_blocks[17..from as usize].to_vec();
resp_blocks.reverse();
let response = create_block_response(resp_blocks.clone());
let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
assert!(matches!(
res,
OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty()
),);
// Gap filled, expect max_blocks_per_request being imported now.
let request = get_block_request(&mut sync, FromBlock::Number(resp_2_from), 2, &peer_id);
let mut resp_blocks = canonical_blocks[common_ancestor as usize..17].to_vec();
resp_blocks.reverse();
let response = create_block_response(resp_blocks.clone());
let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
let to_import: Vec<_> = match &res {
OnBlockData::Import(ImportBlocksAction { origin: _, blocks }) => {
assert_eq!(blocks.len(), sync.max_blocks_per_request as usize);
blocks
.iter()
.map(|b| {
let num = *b.header.as_ref().unwrap().number() as usize;
canonical_blocks[num - 1].clone()
})
.collect()
},
_ => {
panic!("Unexpected response: {res:?}");
},
};
let _ = sync.on_blocks_processed(
max_blocks_per_request as usize,
resp_blocks.len(),
resp_blocks
.iter()
.rev()
.map(|b| {
(
Ok(BlockImportStatus::ImportedUnknown(
*b.header().number(),
Default::default(),
Some(peer_id),
)),
b.hash(),
)
})
.collect(),
);
to_import.into_iter().for_each(|b| {
assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_))));
block_on(client.import(BlockOrigin::Own, b)).unwrap();
});
let expected_number = common_ancestor as u32 + max_blocks_per_request as u32;
assert_eq!(sync.best_queued_number as u32, expected_number);
assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash());
// Sync rest of the chain.
let request =
get_block_request(&mut sync, FromBlock::Hash(canonical_tip.hash()), 10_u32, &peer_id);
let mut resp_blocks = canonical_blocks
[(canonical_chain_length - 10) as usize..canonical_chain_length as usize]
.to_vec();
resp_blocks.reverse();
let response = create_block_response(resp_blocks.clone());
let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
assert!(matches!(
res,
OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 10 as usize
),);
let _ = sync.on_blocks_processed(
max_blocks_per_request as usize,
resp_blocks.len(),
resp_blocks
.iter()
.rev()
.map(|b| {
(
Ok(BlockImportStatus::ImportedUnknown(
*b.header().number(),
Default::default(),
Some(peer_id),
)),
b.hash(),
)
})
.collect(),
);
resp_blocks.into_iter().rev().for_each(|b| {
assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_))));
block_on(client.import(BlockOrigin::Own, b)).unwrap();
});
let expected_number = canonical_chain_length as u32;
assert_eq!(sync.best_queued_number as u32, expected_number);
assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash());
}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment