diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index b2cea2cd5c82b6c87d0e2e33b3edd78f96a5893f..ef581a6e43c3d085212e56d35cbf59f08f807a07 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -1236,6 +1236,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { self.sync.request_justification(&hash, number) } + /// Request syncing for the given block from given set of peers. + /// Uses `protocol` to queue a new block download request and tries to dispatch all pending + /// requests. + pub fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) { + self.sync.set_sync_fork_request(peers, hash, number) + } + /// A batch of blocks have been processed, with or without errors. /// Call this when a batch of blocks have been processed by the importqueue, with or without /// errors. diff --git a/substrate/core/network/src/protocol/sync.rs b/substrate/core/network/src/protocol/sync.rs index 6a2a207abcdf8745bd8b1aa2f13578a55a8b9023..39927792beab8dc72dcc89fcd8b91d0f2e2a522b 100644 --- a/substrate/core/network/src/protocol/sync.rs +++ b/substrate/core/network/src/protocol/sync.rs @@ -123,7 +123,10 @@ pub struct ChainSync<B: BlockT> { queue_blocks: HashSet<B::Hash>, /// The best block number that we are currently importing. best_importing_number: NumberFor<B>, + /// Finality proof handler. request_builder: Option<BoxFinalityProofRequestBuilder<B>>, + /// Explicit sync requests. + sync_requests: HashMap<B::Hash, SyncRequest<B>>, /// A flag that caches idle state with no pending requests. is_idle: bool, /// A type to check incoming block announcements. @@ -157,6 +160,11 @@ pub struct PeerInfo<B: BlockT> { pub best_number: NumberFor<B> } +struct SyncRequest<B: BlockT> { + number: NumberFor<B>, + peers: HashSet<PeerId>, +} + /// The state of syncing between a Peer and ourselves. /// /// Generally two categories, "busy" or `Available`. If busy, the enum @@ -299,6 +307,7 @@ impl<B: BlockT> ChainSync<B> { queue_blocks: Default::default(), best_importing_number: Zero::zero(), request_builder, + sync_requests: Default::default(), is_idle: false, block_announce_validator, } @@ -449,6 +458,51 @@ impl<B: BlockT> ChainSync<B> { }) } + /// Request syncing for the given block from given set of peers. + // The implementation is similar to on_block_announce with unknown parent hash. + pub fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) { + if peers.is_empty() { + if let Some(_) = self.sync_requests.remove(hash) { + debug!(target: "sync", "Cleared sync request for block {:?} with {:?}", hash, peers); + } + return; + } + debug!(target: "sync", "Explicit sync request for block {:?} with {:?}", hash, peers); + if self.is_known(&hash) { + debug!(target: "sync", "Refusing to sync known hash {:?}", hash); + return; + } + + let block_status = self.client.block_status(&BlockId::Number(number - One::one())) + .unwrap_or(BlockStatus::Unknown); + if block_status == BlockStatus::InChainPruned { + trace!(target: "sync", "Refusing to sync ancient block {:?}", hash); + return; + } + + self.is_idle = false; + for peer_id in &peers { + if let Some(peer) = self.peers.get_mut(peer_id) { + if let PeerSyncState::AncestorSearch(_, _) = peer.state { + continue; + } + + if number > peer.best_number { + peer.best_number = number; + peer.best_hash = hash.clone(); + } + } + } + + self.sync_requests + .entry(hash.clone()) + .or_insert_with(|| SyncRequest { + number, + peers: Default::default(), + }) + .peers.extend(peers); + } + /// Get an iterator over all scheduled justification requests. pub fn justification_requests(&mut self) -> impl Iterator<Item = (PeerId, BlockRequest<B>)> + '_ { let peers = &mut self.peers; @@ -508,13 +562,21 @@ impl<B: BlockT> ChainSync<B> { } let blocks = &mut self.blocks; let attrs = &self.required_block_attributes; + let sync_requests = &self.sync_requests; let mut have_requests = false; + let last_finalized = self.client.info().chain.finalized_number; + let best_queued = self.best_queued_number; let iter = self.peers.iter_mut().filter_map(move |(id, peer)| { if !peer.state.is_available() { trace!(target: "sync", "Peer {} is busy", id); return None } - if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs) { + if let Some((hash, req)) = explicit_sync_request(id, sync_requests, best_queued, last_finalized, attrs) { + trace!(target: "sync", "Downloading explicitly requested block {:?} from {}", hash, id); + peer.state = PeerSyncState::DownloadingStale(hash); + have_requests = true; + Some((id.clone(), req)) + } else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs) { peer.state = PeerSyncState::DownloadingNew(range.start); trace!(target: "sync", "New block request for {}", id); have_requests = true; @@ -860,6 +922,9 @@ impl<B: BlockT> ChainSync<B> { self.best_queued_number = number; self.best_queued_hash = *hash; } + if let Some(_) = self.sync_requests.remove(&hash) { + trace!(target: "sync", "Completed explicit sync request {:?}", hash); + } // Update common blocks for (n, peer) in self.peers.iter_mut() { if let PeerSyncState::AncestorSearch(_, _) = peer.state { @@ -1232,3 +1297,32 @@ fn peer_block_request<B: BlockT>( None } } + +/// Get pending explicit sync request for a peer. +fn explicit_sync_request<B: BlockT>( + id: &PeerId, + requests: &HashMap<B::Hash, SyncRequest<B>>, + best_num: NumberFor<B>, + finalized: NumberFor<B>, + attributes: &message::BlockAttributes, +) -> Option<(B::Hash, BlockRequest<B>)> +{ + for (hash, r) in requests { + if !r.peers.contains(id) { + continue + } + if r.number <= best_num { + trace!(target: "sync", "Downloading requested fork {:?} from {}", hash, id); + return Some((hash.clone(), message::generic::BlockRequest { + id: 0, + fields: attributes.clone(), + from: message::FromBlock::Hash(hash.clone()), + to: None, + direction: message::Direction::Descending, + max: Some((r.number - finalized).saturated_into::<u32>()), // up to the last finalized block + })) + } + } + None +} + diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 3ca7bffdb45c58cbf8506928f359789806c1edf5..2cf949116f0129631ce7e133c6d997c6e5172144 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -496,6 +496,18 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic Ok(()) } + /// Configure an explicit fork sync request. + /// Note that this function should not be used for recent blocks. + /// Sync should be able to download all the recent forks normally. + /// `set_sync_fork_request` should only be used if external code detects that there's + /// a stale fork missing. + /// Passing empty `peers` set effectively removes the sync request. + pub fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) { + let _ = self + .to_worker + .unbounded_send(ServerToWorkerMsg::SyncFork(peers, hash, number)); + } + /// Modify a peerset priority group. pub fn set_priority_group(&self, group_id: String, peers: HashSet<Multiaddr>) -> Result<(), String> { let peers = peers.into_iter().map(|p| { @@ -586,6 +598,7 @@ enum ServerToWorkerMsg<B: BlockT, S: NetworkSpecialization<B>> { GetValue(record::Key), PutValue(record::Key, Vec<u8>), AddKnownAddress(PeerId, Multiaddr), + SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>), } /// Main network worker. Must be polled in order for the network to advance. @@ -664,6 +677,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Stream for Ne self.network_service.put_value(key, value), ServerToWorkerMsg::AddKnownAddress(peer_id, addr) => self.network_service.add_known_address(peer_id, addr), + ServerToWorkerMsg::SyncFork(peer_ids, hash, number) => + self.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number), } } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index ddc86b6e95e1826d0acd582a106cea59cc56b418..302ef78d7d43ddccc0c34c409891292882f6ac62 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -229,6 +229,11 @@ pub struct Peer<D, S: NetworkSpecialization<Block>> { } impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { + /// Get this peer ID. + pub fn id(&self) -> PeerId { + self.network.service().local_peer_id() + } + /// Returns true if we're major syncing. pub fn is_major_syncing(&self) -> bool { self.network.service().is_major_syncing() @@ -259,6 +264,11 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { self.network.service().announce_block(hash, data); } + /// Request explicit fork sync. + pub fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: <Block as BlockT>::Hash, number: NumberFor<Block>) { + self.network.service().set_sync_fork_request(peers, hash, number); + } + /// Add blocks to the peer -- edit the block before adding pub fn generate_blocks<F>(&mut self, count: usize, origin: BlockOrigin, edit_block: F) -> H256 where F: FnMut(BlockBuilder<Block, PeersFullClient>) -> Block diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index d50190f6573fff8bbc8f8b61ba06ac377d9b7c2b..3d8e57cad038b5fe03a75b2bcaf27928cf86b090 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -526,3 +526,59 @@ fn light_peer_imports_header_from_announce() { let known_stale_hash = net.peer(0).push_blocks_at(BlockId::Number(0), 1, true); import_with_announce(&mut net, &mut runtime, known_stale_hash); } + +#[test] +fn can_sync_explicit_forks() { + let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); + let mut net = TestNet::new(2); + net.peer(0).push_blocks(30, false); + net.peer(1).push_blocks(30, false); + + // small fork + reorg on peer 1. + net.peer(0).push_blocks_at(BlockId::Number(30), 2, true); + let small_hash = net.peer(0).client().info().chain.best_hash; + let small_number = net.peer(0).client().info().chain.best_number; + net.peer(0).push_blocks_at(BlockId::Number(30), 10, false); + assert_eq!(net.peer(0).client().info().chain.best_number, 40); + + // peer 1 only ever had the long fork. + net.peer(1).push_blocks(10, false); + assert_eq!(net.peer(1).client().info().chain.best_number, 40); + + assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); + assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none()); + + // poll until the two nodes connect, otherwise announcing the block will not work + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + })).unwrap(); + + // synchronization: 0 synced to longer chain and 1 didn't sync to small chain. + + assert_eq!(net.peer(0).client().info().chain.best_number, 40); + + assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); + assert!(!net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); + + // request explicit sync + let first_peer_id = net.peer(0).id(); + net.peer(1).set_sync_fork_request(vec![first_peer_id], small_hash, small_number); + + // peer 1 downloads the block. + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + + assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); + if net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none() { + return Ok(Async::NotReady) + } + Ok(Async::Ready(())) + })).unwrap(); +} +