diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index 2adc6d4234159c25cf40bd6f4072a9f22e16d5fa..3825cfa33f73bd4a77f51521d048ae4d512e621e 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -191,6 +191,10 @@ pub enum ChainSyncAction<B: BlockT> { SendBlockRequest { peer_id: PeerId, request: BlockRequest<B> }, /// Drop stale block request. CancelBlockRequest { peer_id: PeerId }, + /// Send state request to peer. + SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest }, + /// Send warp proof request to peer. + SendWarpProofRequest { peer_id: PeerId, request: WarpProofRequest<B> }, /// Peer misbehaved. Disconnect, report it and cancel the block request to it. DropPeer(BadPeer), /// Import blocks. @@ -1420,11 +1424,6 @@ where .any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) } - /// Check if the peer is known to the sync state machine. Used for sanity checks. - pub fn is_peer_known(&self, peer_id: &PeerId) -> bool { - self.peers.contains_key(peer_id) - } - /// Get the set of downloaded blocks that are ready to be queued for import. fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> { self.blocks @@ -1537,7 +1536,7 @@ where } /// Get justification requests scheduled by sync to be sent out. - pub fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> { + fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> { let peers = &mut self.peers; let mut matcher = self.extra_justifications.matcher(); std::iter::from_fn(move || { @@ -1564,7 +1563,7 @@ where } /// Get block requests scheduled by sync to be sent out. - pub fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> { + fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> { if self.mode == SyncMode::Warp { return self .warp_target_block_request() @@ -1691,7 +1690,7 @@ where } /// Get a state request scheduled by sync to be sent out (if any). - pub fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> { + fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> { if self.allowed_requests.is_empty() { return None } @@ -1737,7 +1736,7 @@ where } /// Get a warp proof request scheduled by sync to be sent out (if any). - pub fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<B>)> { + fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<B>)> { if let Some(sync) = &self.warp_sync { if self.allowed_requests.is_empty() || sync.is_complete() || @@ -2025,7 +2024,38 @@ where /// Get pending actions to perform. #[must_use] - pub fn take_actions(&mut self) -> impl Iterator<Item = ChainSyncAction<B>> { + pub fn actions(&mut self) -> impl Iterator<Item = ChainSyncAction<B>> { + let block_requests = self + .block_requests() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); + self.actions.extend(block_requests); + + let justification_requests = self + .justification_requests() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); + self.actions.extend(justification_requests); + + let state_request = self + .state_request() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendStateRequest { peer_id, request }); + self.actions.extend(state_request); + + let warp_proof_request = self + .warp_sync_request() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendWarpProofRequest { peer_id, request }); + self.actions.extend(warp_proof_request); + + std::mem::take(&mut self.actions).into_iter() + } + + /// A version of `actions()` that doesn't schedule extra requests. For testing only. + #[cfg(test)] + #[must_use] + fn take_actions(&mut self) -> impl Iterator<Item = ChainSyncAction<B>> { std::mem::take(&mut self.actions).into_iter() } } diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 58a9fdc49f2095a6cf7067701056a86ac025f645..2cb8eab22f7acc08efb437ca40922acb0678fe8c 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -30,7 +30,7 @@ use crate::{ schema::v1::{StateRequest, StateResponse}, service::{ self, - chain_sync::{SyncingService, ToServiceCommand}, + syncing_service::{SyncingService, ToServiceCommand}, }, types::{ BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent, @@ -713,16 +713,13 @@ where self.is_major_syncing .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed); - // Process actions requested by `ChainSync` during `select!`. + // Process actions requested by `ChainSync`. self.process_chain_sync_actions(); - - // Send outbound requests on `ChanSync`'s behalf. - self.send_chain_sync_requests(); } } fn process_chain_sync_actions(&mut self) { - self.chain_sync.take_actions().for_each(|action| match action { + self.chain_sync.actions().for_each(|action| match action { ChainSyncAction::SendBlockRequest { peer_id, request } => { // Sending block request implies dropping obsolete pending response as we are not // interested in it anymore (see [`ChainSyncAction::SendBlockRequest`]). @@ -741,7 +738,25 @@ where ChainSyncAction::CancelBlockRequest { peer_id } => { let removed = self.pending_responses.remove(&peer_id); - trace!(target: LOG_TARGET, "Processed {action:?}., response removed: {removed}."); + trace!(target: LOG_TARGET, "Processed {action:?}, response removed: {removed}."); + }, + ChainSyncAction::SendStateRequest { peer_id, request } => { + self.send_state_request(peer_id, request); + + trace!( + target: LOG_TARGET, + "Processed `ChainSyncAction::SendBlockRequest` to {peer_id}.", + ); + }, + ChainSyncAction::SendWarpProofRequest { peer_id, request } => { + self.send_warp_proof_request(peer_id, request.clone()); + + trace!( + target: LOG_TARGET, + "Processed `ChainSyncAction::SendWarpProofRequest` to {}, request: {:?}.", + peer_id, + request, + ); }, ChainSyncAction::DropPeer(BadPeer(peer_id, rep)) => { self.pending_responses.remove(&peer_id); @@ -1104,26 +1119,8 @@ where Ok(()) } - fn send_chain_sync_requests(&mut self) { - for (peer_id, request) in self.chain_sync.block_requests() { - self.send_block_request(peer_id, request); - } - - if let Some((peer_id, request)) = self.chain_sync.state_request() { - self.send_state_request(peer_id, request); - } - - for (peer_id, request) in self.chain_sync.justification_requests() { - self.send_block_request(peer_id, request); - } - - if let Some((peer_id, request)) = self.chain_sync.warp_sync_request() { - self.send_warp_sync_request(peer_id, request); - } - } - fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest<B>) { - if !self.chain_sync.is_peer_known(&peer_id) { + if !self.peers.contains_key(&peer_id) { trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}"); debug_assert!(false); return @@ -1139,7 +1136,7 @@ where } fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) { - if !self.chain_sync.is_peer_known(&peer_id) { + if !self.peers.contains_key(&peer_id) { trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}"); debug_assert!(false); return @@ -1168,8 +1165,8 @@ where } } - fn send_warp_sync_request(&mut self, peer_id: PeerId, request: WarpProofRequest<B>) { - if !self.chain_sync.is_peer_known(&peer_id) { + fn send_warp_proof_request(&mut self, peer_id: PeerId, request: WarpProofRequest<B>) { + if !self.peers.contains_key(&peer_id) { trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}"); debug_assert!(false); return diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index c42b0601e659fb0d4a32f49cbb8fbcfe30a97dbe..1a7e773c95f7ad68f0831b4aeaf489f685996d7f 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -18,7 +18,7 @@ //! Blockchain syncing implementation in Substrate. -pub use service::chain_sync::SyncingService; +pub use service::syncing_service::SyncingService; pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider}; mod block_announce_validator; diff --git a/substrate/client/network/sync/src/service/mod.rs b/substrate/client/network/sync/src/service/mod.rs index 18331d63ed29f76f383c928107b6753d7ca25d16..d045af26e70dea6da2d33ee6dcf10a6e50d44b55 100644 --- a/substrate/client/network/sync/src/service/mod.rs +++ b/substrate/client/network/sync/src/service/mod.rs @@ -16,8 +16,8 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -//! `ChainSync`-related service code +//! `SyncingEngine`-related service code -pub mod chain_sync; pub mod mock; pub mod network; +pub mod syncing_service; diff --git a/substrate/client/network/sync/src/service/chain_sync.rs b/substrate/client/network/sync/src/service/syncing_service.rs similarity index 98% rename from substrate/client/network/sync/src/service/chain_sync.rs rename to substrate/client/network/sync/src/service/syncing_service.rs index 3d11880c511c215dd9a5b46dce10810ba4a4d69d..92d649d65dc3a5e5fac70c5a1cc16ed4c825d400 100644 --- a/substrate/client/network/sync/src/service/chain_sync.rs +++ b/substrate/client/network/sync/src/service/syncing_service.rs @@ -34,7 +34,7 @@ use std::{ }, }; -/// Commands send to `ChainSync` +/// Commands send to `SyncingEngine` pub enum ToServiceCommand<B: BlockT> { SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>), RequestJustification(B::Hash, NumberFor<B>), @@ -63,7 +63,7 @@ pub enum ToServiceCommand<B: BlockT> { // }, } -/// Handle for communicating with `ChainSync` asynchronously +/// Handle for communicating with `SyncingEngine` asynchronously #[derive(Clone)] pub struct SyncingService<B: BlockT> { tx: TracingUnboundedSender<ToServiceCommand<B>>, @@ -148,7 +148,7 @@ impl<B: BlockT> SyncingService<B> { /// Get sync status /// - /// Returns an error if `ChainSync` has terminated. + /// Returns an error if `SyncingEngine` has terminated. pub async fn status(&self) -> Result<SyncStatus<B>, ()> { let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx)); diff --git a/substrate/client/network/sync/src/warp.rs b/substrate/client/network/sync/src/warp.rs index 2c0adc856c126568ca80181e6807090f35544341..169b3de35aa1bfe5f11098680b9243bc3213d263 100644 --- a/substrate/client/network/sync/src/warp.rs +++ b/substrate/client/network/sync/src/warp.rs @@ -42,7 +42,7 @@ const LOG_TARGET: &'static str = "sync"; pub struct EncodedProof(pub Vec<u8>); /// Warp sync request -#[derive(Encode, Decode, Debug)] +#[derive(Encode, Decode, Debug, Clone)] pub struct WarpProofRequest<B: BlockT> { /// Start collecting proofs from this block. pub begin: B::Hash, diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index f869e3a171a3c23979b17767903fb748a9c5f467..cfc3cb7af3fcb9580d08ce6a6762e68485bb8ae7 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -64,7 +64,7 @@ use sc_network_common::role::Roles; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ block_request_handler::BlockRequestHandler, - service::{chain_sync::SyncingService, network::NetworkServiceProvider}, + service::{network::NetworkServiceProvider, syncing_service::SyncingService}, state_request_handler::StateRequestHandler, warp::{ AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncParams, WarpSyncProvider,