From 5896072b8630440a260f7cbe1bcf537fc47a95bf Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi <nazar@mokrynskyi.com> Date: Tue, 12 Jul 2022 23:34:17 +0300 Subject: [PATCH] Network sync refactoring (part 4) (#11412) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Remove direct dependency of `sc-network` on `sc-network-light` * Move `WarpSyncProvider` trait and surrounding data structures into `sc-network-common` * Move `WarpSyncProvider` trait and surrounding data structures into `sc-network-common` * Create `sync` module in `sc-network-common`, create `ChainSync` trait there (not used yet), move a bunch of associated data structures from `sc-network-sync` * Switch from concrete implementation to `ChainSync` trait from `sc-network-common` * Introduce `OpaqueStateRequest`/`OpaqueStateResponse` to remove generics from `StateSync` trait * Introduce `OpaqueBlockRequest`/`OpaqueBlockResponse`, make `scheme` module of `sc-network-sync` private * Surface `sc-network-sync` into `sc-service` and make `sc-network` not depend on it anymore * Remove now unnecessary dependency from `sc-network` * Replace crate links with just text since dependencies are gone now * Remove `warp_sync` re-export from `sc-network-common` * Update copyright in network-related files * Address review comments about documentation * Apply review suggestion * Rename `extra_requests` module to `metrics` Co-authored-by: Bastian Köcher <info@kchr.de> --- substrate/Cargo.lock | 13 +- substrate/client/beefy/src/tests.rs | 14 +- substrate/client/consensus/aura/src/lib.rs | 14 +- substrate/client/consensus/babe/src/tests.rs | 15 +- substrate/client/finality-grandpa/Cargo.toml | 1 + .../client/finality-grandpa/src/tests.rs | 20 +- .../client/finality-grandpa/src/warp_proof.rs | 2 +- substrate/client/network/Cargo.toml | 5 +- substrate/client/network/common/Cargo.toml | 5 + substrate/client/network/common/src/lib.rs | 1 + substrate/client/network/common/src/sync.rs | 394 ++++++ .../{sync/src => common/src/sync}/message.rs | 2 +- .../client/network/common/src/sync/metrics.rs | 25 + .../client/network/common/src/sync/warp.rs | 94 ++ substrate/client/network/src/behaviour.rs | 68 +- substrate/client/network/src/bitswap.rs | 5 +- substrate/client/network/src/config.rs | 33 +- substrate/client/network/src/lib.rs | 8 +- substrate/client/network/src/protocol.rs | 310 ++--- .../client/network/src/protocol/message.rs | 10 +- substrate/client/network/src/service.rs | 27 +- substrate/client/network/src/service/tests.rs | 24 +- substrate/client/network/sync/Cargo.toml | 2 - .../network/sync/src/block_request_handler.rs | 6 +- substrate/client/network/sync/src/blocks.rs | 4 +- .../client/network/sync/src/extra_requests.rs | 11 +- substrate/client/network/sync/src/lib.rs | 1112 ++++++++--------- substrate/client/network/sync/src/schema.rs | 2 +- substrate/client/network/sync/src/state.rs | 10 +- substrate/client/network/sync/src/warp.rs | 45 +- .../network/sync/src/warp_request_handler.rs | 47 +- substrate/client/network/test/Cargo.toml | 2 + substrate/client/network/test/src/lib.rs | 110 +- substrate/client/service/Cargo.toml | 4 +- substrate/client/service/src/builder.rs | 28 +- 35 files changed, 1359 insertions(+), 1114 deletions(-) create mode 100644 substrate/client/network/common/src/sync.rs rename substrate/client/network/{sync/src => common/src/sync}/message.rs (99%) create mode 100644 substrate/client/network/common/src/sync/metrics.rs create mode 100644 substrate/client/network/common/src/sync/warp.rs diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index f98d98a3c57..3b7c315be43 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -8429,6 +8429,7 @@ dependencies = [ "sc-consensus", "sc-keystore", "sc-network", + "sc-network-common", "sc-network-gossip", "sc-network-test", "sc-telemetry", @@ -8553,7 +8554,6 @@ dependencies = [ "sp-blockchain", "sp-consensus", "sp-core", - "sp-finality-grandpa", "sp-runtime", "sp-test-primitives", "sp-tracing", @@ -8571,12 +8571,17 @@ dependencies = [ name = "sc-network-common" version = "0.10.0-dev" dependencies = [ + "bitflags", "futures", "libp2p", "parity-scale-codec", "prost-build", + "sc-consensus", "sc-peerset", "smallvec", + "sp-consensus", + "sp-finality-grandpa", + "sp-runtime", ] [[package]] @@ -8621,8 +8626,6 @@ dependencies = [ name = "sc-network-sync" version = "0.10.0-dev" dependencies = [ - "bitflags", - "either", "fork-tree", "futures", "libp2p", @@ -8667,6 +8670,8 @@ dependencies = [ "sc-consensus", "sc-network", "sc-network-common", + "sc-network-light", + "sc-network-sync", "sc-service", "sp-blockchain", "sp-consensus", @@ -8849,6 +8854,8 @@ dependencies = [ "sc-keystore", "sc-network", "sc-network-common", + "sc-network-light", + "sc-network-sync", "sc-offchain", "sc-rpc", "sc-rpc-server", diff --git a/substrate/client/beefy/src/tests.rs b/substrate/client/beefy/src/tests.rs index b5ff27c8089..8090c425e71 100644 --- a/substrate/client/beefy/src/tests.rs +++ b/substrate/client/beefy/src/tests.rs @@ -28,7 +28,6 @@ use sc_chain_spec::{ChainSpec, GenericChainSpec}; use sc_client_api::HeaderBackend; use sc_consensus::BoxJustificationImport; use sc_keystore::LocalKeystore; -use sc_network::config::ProtocolConfig; use sc_network_test::{ Block, BlockImportAdapter, FullPeerConfig, PassThroughVerifier, Peer, PeersClient, TestNetFactory, @@ -111,6 +110,7 @@ pub(crate) struct PeerData { pub(crate) beefy_link_half: Mutex<Option<BeefyLinkHalf>>, } +#[derive(Default)] pub(crate) struct BeefyTestNet { peers: Vec<BeefyPeer>, } @@ -166,17 +166,7 @@ impl TestNetFactory for BeefyTestNet { type BlockImport = PeersClient; type PeerData = PeerData; - /// Create new test network with peers and given config. - fn from_config(_config: &ProtocolConfig) -> Self { - BeefyTestNet { peers: Vec::new() } - } - - fn make_verifier( - &self, - _client: PeersClient, - _cfg: &ProtocolConfig, - _: &PeerData, - ) -> Self::Verifier { + fn make_verifier(&self, _client: PeersClient, _: &PeerData) -> Self::Verifier { PassThroughVerifier::new(false) // use non-instant finality. } diff --git a/substrate/client/consensus/aura/src/lib.rs b/substrate/client/consensus/aura/src/lib.rs index ac3b89f2ff9..ee8be727dcd 100644 --- a/substrate/client/consensus/aura/src/lib.rs +++ b/substrate/client/consensus/aura/src/lib.rs @@ -566,7 +566,6 @@ mod tests { use sc_consensus::BoxJustificationImport; use sc_consensus_slots::{BackoffAuthoringOnFinalizedHeadLagging, SimpleSlotWorker}; use sc_keystore::LocalKeystore; - use sc_network::config::ProtocolConfig; use sc_network_test::{Block as TestBlock, *}; use sp_application_crypto::key_types::AURA; use sp_consensus::{ @@ -645,6 +644,7 @@ mod tests { >; type AuraPeer = Peer<(), PeersClient>; + #[derive(Default)] pub struct AuraTestNet { peers: Vec<AuraPeer>, } @@ -654,17 +654,7 @@ mod tests { type PeerData = (); type BlockImport = PeersClient; - /// Create new test network with peers and given config. - fn from_config(_config: &ProtocolConfig) -> Self { - AuraTestNet { peers: Vec::new() } - } - - fn make_verifier( - &self, - client: PeersClient, - _cfg: &ProtocolConfig, - _peer_data: &(), - ) -> Self::Verifier { + fn make_verifier(&self, client: PeersClient, _peer_data: &()) -> Self::Verifier { let client = client.as_client(); let slot_duration = slot_duration(&*client).expect("slot duration available"); diff --git a/substrate/client/consensus/babe/src/tests.rs b/substrate/client/consensus/babe/src/tests.rs index e0590fc0cd8..c0a7a8c6c01 100644 --- a/substrate/client/consensus/babe/src/tests.rs +++ b/substrate/client/consensus/babe/src/tests.rs @@ -29,7 +29,6 @@ use sc_client_api::{backend::TransactionFor, BlockchainEvents, Finalizer}; use sc_consensus::{BoxBlockImport, BoxJustificationImport}; use sc_consensus_slots::BackoffAuthoringOnFinalizedHeadLagging; use sc_keystore::LocalKeystore; -use sc_network::config::ProtocolConfig; use sc_network_test::{Block as TestBlock, *}; use sp_application_crypto::key_types::BABE; use sp_consensus::{AlwaysCanAuthor, DisableProofRecording, NoNetwork as DummyOracle, Proposal}; @@ -220,6 +219,7 @@ where type BabePeer = Peer<Option<PeerData>, BabeBlockImport>; +#[derive(Default)] pub struct BabeTestNet { peers: Vec<BabePeer>, } @@ -278,12 +278,6 @@ impl TestNetFactory for BabeTestNet { type PeerData = Option<PeerData>; type BlockImport = BabeBlockImport; - /// Create new test network with peers and given config. - fn from_config(_config: &ProtocolConfig) -> Self { - debug!(target: "babe", "Creating test network from config"); - BabeTestNet { peers: Vec::new() } - } - fn make_block_import( &self, client: PeersClient, @@ -309,12 +303,7 @@ impl TestNetFactory for BabeTestNet { ) } - fn make_verifier( - &self, - client: PeersClient, - _cfg: &ProtocolConfig, - maybe_link: &Option<PeerData>, - ) -> Self::Verifier { + fn make_verifier(&self, client: PeersClient, maybe_link: &Option<PeerData>) -> Self::Verifier { use substrate_test_runtime_client::DefaultTestClientBuilderExt; let client = client.as_client(); diff --git a/substrate/client/finality-grandpa/Cargo.toml b/substrate/client/finality-grandpa/Cargo.toml index 77cd847d481..a5f20b9f326 100644 --- a/substrate/client/finality-grandpa/Cargo.toml +++ b/substrate/client/finality-grandpa/Cargo.toml @@ -36,6 +36,7 @@ sc-consensus = { version = "0.10.0-dev", path = "../consensus/common" } sc-keystore = { version = "4.0.0-dev", path = "../keystore" } sc-network = { version = "0.10.0-dev", path = "../network" } sc-network-gossip = { version = "0.10.0-dev", path = "../network-gossip" } +sc-network-common = { version = "0.10.0-dev", path = "../network/common" } sc-telemetry = { version = "4.0.0-dev", path = "../telemetry" } sc-utils = { version = "4.0.0-dev", path = "../utils" } sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } diff --git a/substrate/client/finality-grandpa/src/tests.rs b/substrate/client/finality-grandpa/src/tests.rs index 2d12232b04f..623ac577c55 100644 --- a/substrate/client/finality-grandpa/src/tests.rs +++ b/substrate/client/finality-grandpa/src/tests.rs @@ -28,7 +28,7 @@ use sc_consensus::{ BlockImport, BlockImportParams, BoxJustificationImport, ForkChoiceStrategy, ImportResult, ImportedAux, }; -use sc_network::config::{ProtocolConfig, Role}; +use sc_network::config::Role; use sc_network_test::{ Block, BlockImportAdapter, FullPeerConfig, Hash, PassThroughVerifier, Peer, PeersClient, PeersFullClient, TestClient, TestNetFactory, @@ -73,6 +73,7 @@ type GrandpaBlockImport = crate::GrandpaBlockImport< LongestChain<substrate_test_runtime_client::Backend, Block>, >; +#[derive(Default)] struct GrandpaTestNet { peers: Vec<GrandpaPeer>, test_config: TestApi, @@ -110,16 +111,6 @@ impl TestNetFactory for GrandpaTestNet { type PeerData = PeerData; type BlockImport = GrandpaBlockImport; - /// Create new test network with peers and given config. - fn from_config(_config: &ProtocolConfig) -> Self { - GrandpaTestNet { peers: Vec::new(), test_config: Default::default() } - } - - fn default_config() -> ProtocolConfig { - // This is unused. - ProtocolConfig::default() - } - fn add_full_peer(&mut self) { self.add_full_peer_with_config(FullPeerConfig { notifications_protocols: vec![grandpa_protocol_name::NAME.into()], @@ -128,12 +119,7 @@ impl TestNetFactory for GrandpaTestNet { }) } - fn make_verifier( - &self, - _client: PeersClient, - _cfg: &ProtocolConfig, - _: &PeerData, - ) -> Self::Verifier { + fn make_verifier(&self, _client: PeersClient, _: &PeerData) -> Self::Verifier { PassThroughVerifier::new(false) // use non-instant finality. } diff --git a/substrate/client/finality-grandpa/src/warp_proof.rs b/substrate/client/finality-grandpa/src/warp_proof.rs index 90f6828a110..a31a0a8b919 100644 --- a/substrate/client/finality-grandpa/src/warp_proof.rs +++ b/substrate/client/finality-grandpa/src/warp_proof.rs @@ -23,7 +23,7 @@ use crate::{ BlockNumberOps, GrandpaJustification, SharedAuthoritySet, }; use sc_client_api::Backend as ClientBackend; -use sc_network::warp_request_handler::{EncodedProof, VerificationResult, WarpSyncProvider}; +use sc_network_common::sync::warp::{EncodedProof, VerificationResult, WarpSyncProvider}; use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend}; use sp_finality_grandpa::{AuthorityList, SetId, GRANDPA_ENGINE_ID}; use sp_runtime::{ diff --git a/substrate/client/network/Cargo.toml b/substrate/client/network/Cargo.toml index dfd2db2e6b8..2742262b57e 100644 --- a/substrate/client/network/Cargo.toml +++ b/substrate/client/network/Cargo.toml @@ -51,15 +51,12 @@ sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" } sc-client-api = { version = "4.0.0-dev", path = "../api" } sc-consensus = { version = "0.10.0-dev", path = "../consensus/common" } sc-network-common = { version = "0.10.0-dev", path = "./common" } -sc-network-light = { version = "0.10.0-dev", path = "./light" } -sc-network-sync = { version = "0.10.0-dev", path = "./sync" } sc-peerset = { version = "4.0.0-dev", path = "../peerset" } sc-utils = { version = "4.0.0-dev", path = "../utils" } sp-arithmetic = { version = "5.0.0", path = "../../primitives/arithmetic" } sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" } sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" } sp-core = { version = "6.0.0", path = "../../primitives/core" } -sp-finality-grandpa = { version = "4.0.0-dev", path = "../../primitives/finality-grandpa" } sp-runtime = { version = "6.0.0", path = "../../primitives/runtime" } [dev-dependencies] @@ -67,6 +64,8 @@ assert_matches = "1.3" async-std = "1.11.0" rand = "0.7.2" tempfile = "3.1.0" +sc-network-light = { version = "0.10.0-dev", path = "./light" } +sc-network-sync = { version = "0.10.0-dev", path = "./sync" } sp-test-primitives = { version = "2.0.0", path = "../../primitives/test-primitives" } sp-tracing = { version = "5.0.0", path = "../../primitives/tracing" } substrate-test-runtime = { version = "2.0.0", path = "../../test-utils/runtime" } diff --git a/substrate/client/network/common/Cargo.toml b/substrate/client/network/common/Cargo.toml index e69787d7aff..b0e3a8fe42a 100644 --- a/substrate/client/network/common/Cargo.toml +++ b/substrate/client/network/common/Cargo.toml @@ -17,10 +17,15 @@ targets = ["x86_64-unknown-linux-gnu"] prost-build = "0.10" [dependencies] +bitflags = "1.3.2" codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive", ] } futures = "0.3.21" libp2p = "0.46.1" smallvec = "1.8.0" +sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } sc-peerset = { version = "4.0.0-dev", path = "../../peerset" } +sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } +sp-finality-grandpa = { version = "4.0.0-dev", path = "../../../primitives/finality-grandpa" } +sp-runtime = { version = "6.0.0", path = "../../../primitives/runtime" } diff --git a/substrate/client/network/common/src/lib.rs b/substrate/client/network/common/src/lib.rs index 81769e23deb..9fbedc542c1 100644 --- a/substrate/client/network/common/src/lib.rs +++ b/substrate/client/network/common/src/lib.rs @@ -21,3 +21,4 @@ pub mod config; pub mod message; pub mod request_responses; +pub mod sync; diff --git a/substrate/client/network/common/src/sync.rs b/substrate/client/network/common/src/sync.rs new file mode 100644 index 00000000000..2ee8f8c5181 --- /dev/null +++ b/substrate/client/network/common/src/sync.rs @@ -0,0 +1,394 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Abstract interfaces and data structures related to network sync. + +pub mod message; +pub mod metrics; +pub mod warp; + +use libp2p::PeerId; +use message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse}; +use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; +use sp_consensus::BlockOrigin; +use sp_runtime::{ + traits::{Block as BlockT, NumberFor}, + Justifications, +}; +use std::{any::Any, fmt, fmt::Formatter, task::Poll}; +use warp::{EncodedProof, WarpProofRequest, WarpSyncProgress}; + +/// The sync status of a peer we are trying to sync with +#[derive(Debug)] +pub struct PeerInfo<Block: BlockT> { + /// Their best block hash. + pub best_hash: Block::Hash, + /// Their best block number. + pub best_number: NumberFor<Block>, +} + +/// Reported sync state. +#[derive(Clone, Eq, PartialEq, Debug)] +pub enum SyncState { + /// Initial sync is complete, keep-up sync is active. + Idle, + /// Actively catching up with the chain. + Downloading, +} + +/// Reported state download progress. +#[derive(Clone, Eq, PartialEq, Debug)] +pub struct StateDownloadProgress { + /// Estimated download percentage. + pub percentage: u32, + /// Total state size in bytes downloaded so far. + pub size: u64, +} + +/// Syncing status and statistics. +#[derive(Clone)] +pub struct SyncStatus<Block: BlockT> { + /// Current global sync state. + pub state: SyncState, + /// Target sync block number. + pub best_seen_block: Option<NumberFor<Block>>, + /// Number of peers participating in syncing. + pub num_peers: u32, + /// Number of blocks queued for import + pub queued_blocks: u32, + /// State sync status in progress, if any. + pub state_sync: Option<StateDownloadProgress>, + /// Warp sync in progress, if any. + pub warp_sync: Option<WarpSyncProgress<Block>>, +} + +/// A peer did not behave as expected and should be reported. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BadPeer(pub PeerId, pub sc_peerset::ReputationChange); + +impl fmt::Display for BadPeer { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Bad peer {}; Reputation change: {:?}", self.0, self.1) + } +} + +impl std::error::Error for BadPeer {} + +/// Result of [`ChainSync::on_block_data`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OnBlockData<Block: BlockT> { + /// The block should be imported. + Import(BlockOrigin, Vec<IncomingBlock<Block>>), + /// A new block request needs to be made to the given peer. + Request(PeerId, BlockRequest<Block>), +} + +/// Result of [`ChainSync::on_block_justification`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OnBlockJustification<Block: BlockT> { + /// The justification needs no further handling. + Nothing, + /// The justification should be imported. + Import { + peer: PeerId, + hash: Block::Hash, + number: NumberFor<Block>, + justifications: Justifications, + }, +} + +/// Result of [`ChainSync::on_state_data`]. +#[derive(Debug)] +pub enum OnStateData<Block: BlockT> { + /// The block and state that should be imported. + Import(BlockOrigin, IncomingBlock<Block>), + /// A new state request needs to be made to the given peer. + Continue, +} + +/// Result of [`ChainSync::poll_block_announce_validation`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum PollBlockAnnounceValidation<H> { + /// The announcement failed at validation. + /// + /// The peer reputation should be decreased. + Failure { + /// Who sent the processed block announcement? + who: PeerId, + /// Should the peer be disconnected? + disconnect: bool, + }, + /// The announcement does not require further handling. + Nothing { + /// Who sent the processed block announcement? + who: PeerId, + /// Was this their new best block? + is_best: bool, + /// The announcement. + announce: BlockAnnounce<H>, + }, + /// The announcement header should be imported. + ImportHeader { + /// Who sent the processed block announcement? + who: PeerId, + /// Was this their new best block? + is_best: bool, + /// The announcement. + announce: BlockAnnounce<H>, + }, + /// The block announcement should be skipped. + Skip, +} + +/// Operation mode. +#[derive(Debug, PartialEq, Eq)] +pub enum SyncMode { + // Sync headers only + Light, + // Sync headers and block bodies + Full, + // Sync headers and the last finalied state + LightState { storage_chain_mode: bool, skip_proofs: bool }, + // Warp sync mode. + Warp, +} + +#[derive(Debug)] +pub struct Metrics { + pub queued_blocks: u32, + pub fork_targets: u32, + pub justifications: metrics::Metrics, +} + +/// Wrapper for implementation-specific state request. +/// +/// NOTE: Implementation must be able to encode and decode it for network purposes. +pub struct OpaqueStateRequest(pub Box<dyn Any + Send>); + +impl fmt::Debug for OpaqueStateRequest { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("OpaqueStateRequest").finish() + } +} + +/// Wrapper for implementation-specific state response. +/// +/// NOTE: Implementation must be able to encode and decode it for network purposes. +pub struct OpaqueStateResponse(pub Box<dyn Any + Send>); + +impl fmt::Debug for OpaqueStateResponse { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("OpaqueStateResponse").finish() + } +} + +/// Wrapper for implementation-specific block request. +/// +/// NOTE: Implementation must be able to encode and decode it for network purposes. +pub struct OpaqueBlockRequest(pub Box<dyn Any + Send>); + +impl fmt::Debug for OpaqueBlockRequest { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("OpaqueBlockRequest").finish() + } +} + +/// Wrapper for implementation-specific block response. +/// +/// NOTE: Implementation must be able to encode and decode it for network purposes. +pub struct OpaqueBlockResponse(pub Box<dyn Any + Send>); + +impl fmt::Debug for OpaqueBlockResponse { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("OpaqueBlockResponse").finish() + } +} + +/// Something that represents the syncing strategy to download past and future blocks of the chain. +pub trait ChainSync<Block: BlockT>: Send { + /// Returns the state of the sync of the given peer. + /// + /// Returns `None` if the peer is unknown. + fn peer_info(&self, who: &PeerId) -> Option<PeerInfo<Block>>; + + /// Returns the current sync status. + fn status(&self) -> SyncStatus<Block>; + + /// Number of active forks requests. This includes + /// requests that are pending or could be issued right away. + fn num_sync_requests(&self) -> usize; + + /// Number of downloaded blocks. + fn num_downloaded_blocks(&self) -> usize; + + /// Returns the current number of peers stored within this state machine. + fn num_peers(&self) -> usize; + + /// Handle a new connected peer. + /// + /// Call this method whenever we connect to a new peer. + fn new_peer( + &mut self, + who: PeerId, + best_hash: Block::Hash, + best_number: NumberFor<Block>, + ) -> Result<Option<BlockRequest<Block>>, BadPeer>; + + /// Signal that a new best block has been imported. + fn update_chain_info(&mut self, best_hash: &Block::Hash, best_number: NumberFor<Block>); + + /// Schedule a justification request for the given block. + fn request_justification(&mut self, hash: &Block::Hash, number: NumberFor<Block>); + + /// Clear all pending justification requests. + fn clear_justification_requests(&mut self); + + /// Request syncing for the given block from given set of peers. + fn set_sync_fork_request( + &mut self, + peers: Vec<PeerId>, + hash: &Block::Hash, + number: NumberFor<Block>, + ); + + /// Get an iterator over all scheduled justification requests. + fn justification_requests( + &mut self, + ) -> Box<dyn Iterator<Item = (PeerId, BlockRequest<Block>)> + '_>; + + /// Get an iterator over all block requests of all peers. + fn block_requests(&mut self) -> Box<dyn Iterator<Item = (&PeerId, BlockRequest<Block>)> + '_>; + + /// Get a state request, if any. + fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)>; + + /// Get a warp sync request, if any. + fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<Block>)>; + + /// Handle a response from the remote to a block request that we made. + /// + /// `request` must be the original request that triggered `response`. + /// or `None` if data comes from the block announcement. + /// + /// If this corresponds to a valid block, this outputs the block that + /// must be imported in the import queue. + fn on_block_data( + &mut self, + who: &PeerId, + request: Option<BlockRequest<Block>>, + response: BlockResponse<Block>, + ) -> Result<OnBlockData<Block>, BadPeer>; + + /// Handle a response from the remote to a state request that we made. + fn on_state_data( + &mut self, + who: &PeerId, + response: OpaqueStateResponse, + ) -> Result<OnStateData<Block>, BadPeer>; + + /// Handle a response from the remote to a warp proof request that we made. + fn on_warp_sync_data(&mut self, who: &PeerId, response: EncodedProof) -> Result<(), BadPeer>; + + /// Handle a response from the remote to a justification request that we made. + /// + /// `request` must be the original request that triggered `response`. + fn on_block_justification( + &mut self, + who: PeerId, + response: BlockResponse<Block>, + ) -> Result<OnBlockJustification<Block>, BadPeer>; + + /// A batch of blocks have been processed, with or without errors. + /// + /// Call this when a batch of blocks have been processed by the import + /// queue, with or without errors. + fn on_blocks_processed( + &mut self, + imported: usize, + count: usize, + results: Vec<(Result<BlockImportStatus<NumberFor<Block>>, BlockImportError>, Block::Hash)>, + ) -> Box<dyn Iterator<Item = Result<(PeerId, BlockRequest<Block>), BadPeer>>>; + + /// Call this when a justification has been processed by the import queue, + /// with or without errors. + fn on_justification_import( + &mut self, + hash: Block::Hash, + number: NumberFor<Block>, + success: bool, + ); + + /// Notify about finalization of the given block. + fn on_block_finalized(&mut self, hash: &Block::Hash, number: NumberFor<Block>); + + /// Push a block announce validation. + /// + /// It is required that [`ChainSync::poll_block_announce_validation`] is called + /// to check for finished block announce validations. + fn push_block_announce_validation( + &mut self, + who: PeerId, + hash: Block::Hash, + announce: BlockAnnounce<Block::Header>, + is_best: bool, + ); + + /// Poll block announce validation. + /// + /// Block announce validations can be pushed by using + /// [`ChainSync::push_block_announce_validation`]. + /// + /// This should be polled until it returns [`Poll::Pending`]. + /// + /// If [`PollBlockAnnounceValidation::ImportHeader`] is returned, then the caller MUST try to + /// import passed header (call `on_block_data`). The network request isn't sent in this case. + fn poll_block_announce_validation( + &mut self, + cx: &mut std::task::Context, + ) -> Poll<PollBlockAnnounceValidation<Block::Header>>; + + /// Call when a peer has disconnected. + /// Canceled obsolete block request may result in some blocks being ready for + /// import, so this functions checks for such blocks and returns them. + fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<Block>>; + + /// Return some key metrics. + fn metrics(&self) -> Metrics; + + /// Create implementation-specific block request. + fn create_opaque_block_request(&self, request: &BlockRequest<Block>) -> OpaqueBlockRequest; + + /// Encode implementation-specific block request. + fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result<Vec<u8>, String>; + + /// Decode implementation-specific block response. + fn decode_block_response(&self, response: &[u8]) -> Result<OpaqueBlockResponse, String>; + + /// Access blocks from implementation-specific block response. + fn block_response_into_blocks( + &self, + request: &BlockRequest<Block>, + response: OpaqueBlockResponse, + ) -> Result<Vec<BlockData<Block>>, String>; + + /// Encode implementation-specific state request. + fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result<Vec<u8>, String>; + + /// Decode implementation-specific state response. + fn decode_state_response(&self, response: &[u8]) -> Result<OpaqueStateResponse, String>; +} diff --git a/substrate/client/network/sync/src/message.rs b/substrate/client/network/common/src/sync/message.rs similarity index 99% rename from substrate/client/network/sync/src/message.rs rename to substrate/client/network/common/src/sync/message.rs index 996ee5231cf..27ab2704e64 100644 --- a/substrate/client/network/sync/src/message.rs +++ b/substrate/client/network/common/src/sync/message.rs @@ -124,8 +124,8 @@ impl<H: HeaderT> BlockAnnounce<H> { /// Generic types. pub mod generic { use super::{BlockAttributes, BlockState, Direction}; + use crate::message::RequestId; use codec::{Decode, Encode, Input, Output}; - use sc_network_common::message::RequestId; use sp_runtime::{EncodedJustification, Justifications}; /// Block data sent in the response. diff --git a/substrate/client/network/common/src/sync/metrics.rs b/substrate/client/network/common/src/sync/metrics.rs new file mode 100644 index 00000000000..15ff090a8cc --- /dev/null +++ b/substrate/client/network/common/src/sync/metrics.rs @@ -0,0 +1,25 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +#[derive(Debug)] +pub struct Metrics { + pub pending_requests: u32, + pub active_requests: u32, + pub importing_requests: u32, + pub failed_requests: u32, +} diff --git a/substrate/client/network/common/src/sync/warp.rs b/substrate/client/network/common/src/sync/warp.rs new file mode 100644 index 00000000000..339a4c33a7e --- /dev/null +++ b/substrate/client/network/common/src/sync/warp.rs @@ -0,0 +1,94 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see <http://www.gnu.org/licenses/>. + +use codec::{Decode, Encode}; +pub use sp_finality_grandpa::{AuthorityList, SetId}; +use sp_runtime::traits::{Block as BlockT, NumberFor}; +use std::fmt; + +/// Scale-encoded warp sync proof response. +pub struct EncodedProof(pub Vec<u8>); + +/// Warp sync request +#[derive(Encode, Decode, Debug)] +pub struct WarpProofRequest<B: BlockT> { + /// Start collecting proofs from this block. + pub begin: B::Hash, +} + +/// Proof verification result. +pub enum VerificationResult<Block: BlockT> { + /// Proof is valid, but the target was not reached. + Partial(SetId, AuthorityList, Block::Hash), + /// Target finality is proved. + Complete(SetId, AuthorityList, Block::Header), +} + +/// Warp sync backend. Handles retrieveing and verifying warp sync proofs. +pub trait WarpSyncProvider<Block: BlockT>: Send + Sync { + /// Generate proof starting at given block hash. The proof is accumulated until maximum proof + /// size is reached. + fn generate( + &self, + start: Block::Hash, + ) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>; + /// Verify warp proof against current set of authorities. + fn verify( + &self, + proof: &EncodedProof, + set_id: SetId, + authorities: AuthorityList, + ) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>; + /// Get current list of authorities. This is supposed to be genesis authorities when starting + /// sync. + fn current_authorities(&self) -> AuthorityList; +} + +/// Reported warp sync phase. +#[derive(Clone, Eq, PartialEq, Debug)] +pub enum WarpSyncPhase<Block: BlockT> { + /// Waiting for peers to connect. + AwaitingPeers, + /// Downloading and verifying grandpa warp proofs. + DownloadingWarpProofs, + /// Downloading state data. + DownloadingState, + /// Importing state. + ImportingState, + /// Downloading block history. + DownloadingBlocks(NumberFor<Block>), +} + +impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::AwaitingPeers => write!(f, "Waiting for peers"), + Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"), + Self::DownloadingState => write!(f, "Downloading state"), + Self::ImportingState => write!(f, "Importing state"), + Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n), + } + } +} + +/// Reported warp sync progress. +#[derive(Clone, Eq, PartialEq, Debug)] +pub struct WarpSyncProgress<Block: BlockT> { + /// Estimated download percentage. + pub phase: WarpSyncPhase<Block>, + /// Total bytes downloaded so far. + pub total_bytes: u64, +} diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index 091dd116e4c..515608df13d 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -38,7 +38,7 @@ use libp2p::{ NetworkBehaviour, }; use log::debug; -use prost::Message; + use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::import_queue::{IncomingBlock, Origin}; use sc_network_common::{config::ProtocolId, request_responses::ProtocolConfig}; @@ -382,42 +382,44 @@ where .events .push_back(BehaviourOut::JustificationImport(origin, hash, nb, justification)), CustomMessageOutcome::BlockRequest { target, request, pending_response } => { - let mut buf = Vec::with_capacity(request.encoded_len()); - if let Err(err) = request.encode(&mut buf) { - log::warn!( - target: "sync", - "Failed to encode block request {:?}: {:?}", - request, err - ); - return + match self.substrate.encode_block_request(&request) { + Ok(data) => { + self.request_responses.send_request( + &target, + &self.block_request_protocol_name, + data, + pending_response, + IfDisconnected::ImmediateError, + ); + }, + Err(err) => { + log::warn!( + target: "sync", + "Failed to encode block request {:?}: {:?}", + request, err + ); + }, } - - self.request_responses.send_request( - &target, - &self.block_request_protocol_name, - buf, - pending_response, - IfDisconnected::ImmediateError, - ); }, CustomMessageOutcome::StateRequest { target, request, pending_response } => { - let mut buf = Vec::with_capacity(request.encoded_len()); - if let Err(err) = request.encode(&mut buf) { - log::warn!( - target: "sync", - "Failed to encode state request {:?}: {:?}", - request, err - ); - return + match self.substrate.encode_state_request(&request) { + Ok(data) => { + self.request_responses.send_request( + &target, + &self.state_request_protocol_name, + data, + pending_response, + IfDisconnected::ImmediateError, + ); + }, + Err(err) => { + log::warn!( + target: "sync", + "Failed to encode state request {:?}: {:?}", + request, err + ); + }, } - - self.request_responses.send_request( - &target, - &self.state_request_protocol_name, - buf, - pending_response, - IfDisconnected::ImmediateError, - ); }, CustomMessageOutcome::WarpSyncRequest { target, request, pending_response } => match &self.warp_sync_protocol_name { diff --git a/substrate/client/network/src/bitswap.rs b/substrate/client/network/src/bitswap.rs index d5039faaca1..2dab45adc56 100644 --- a/substrate/client/network/src/bitswap.rs +++ b/substrate/client/network/src/bitswap.rs @@ -1,4 +1,4 @@ -// Copyright 2021 Parity Technologies (UK) Ltd. +// Copyright 2022 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify @@ -118,8 +118,7 @@ where fn upgrade_outbound(self, mut socket: TSocket, _info: Self::Info) -> Self::Future { Box::pin(async move { - let mut data = Vec::with_capacity(self.encoded_len()); - self.encode(&mut data)?; + let data = self.encode_to_vec(); upgrade::write_length_prefixed(&mut socket, data).await }) } diff --git a/substrate/client/network/src/config.rs b/substrate/client/network/src/config.rs index e44977e5be6..430efd697a1 100644 --- a/substrate/client/network/src/config.rs +++ b/substrate/client/network/src/config.rs @@ -26,16 +26,11 @@ pub use sc_network_common::{ request_responses::{ IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig, }, + sync::warp::WarpSyncProvider, }; -pub use sc_network_sync::warp_request_handler::WarpSyncProvider; pub use libp2p::{build_multiaddr, core::PublicKey, identity}; -// Note: this re-export shouldn't be part of the public API of the crate and will be removed in -// the future. -#[doc(hidden)] -pub use crate::protocol::ProtocolConfig; - use crate::ExHashT; use core::{fmt, iter}; @@ -46,7 +41,7 @@ use libp2p::{ }; use prometheus_endpoint::Registry; use sc_consensus::ImportQueue; -use sp_consensus::block_validation::BlockAnnounceValidator; +use sc_network_common::sync::ChainSync; use sp_runtime::traits::Block as BlockT; use std::{ borrow::Cow, @@ -101,8 +96,14 @@ where /// valid. pub import_queue: Box<dyn ImportQueue<B>>, - /// Type to check incoming block announcements. - pub block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>, + /// Factory function that creates a new instance of chain sync. + pub create_chain_sync: Box< + dyn FnOnce( + sc_network_common::sync::SyncMode, + Arc<Client>, + Option<Arc<dyn WarpSyncProvider<B>>>, + ) -> crate::error::Result<Box<dyn ChainSync<B>>>, + >, /// Registry for recording prometheus metrics to. pub metrics_registry: Option<Registry>, @@ -114,26 +115,26 @@ where /// block requests, if enabled. /// /// Can be constructed either via - /// [`sc_network_sync::block_request_handler::generate_protocol_config`] allowing outgoing but - /// not incoming requests, or constructed via [`sc_network_sync::block_request_handler:: - /// BlockRequestHandler::new`] allowing both outgoing and incoming requests. + /// `sc_network_sync::block_request_handler::generate_protocol_config` allowing outgoing but + /// not incoming requests, or constructed via `sc_network_sync::block_request_handler:: + /// BlockRequestHandler::new` allowing both outgoing and incoming requests. pub block_request_protocol_config: RequestResponseConfig, /// Request response configuration for the light client request protocol. /// /// Can be constructed either via - /// [`sc_network_light::light_client_requests::generate_protocol_config`] allowing outgoing but + /// `sc_network_light::light_client_requests::generate_protocol_config` allowing outgoing but /// not incoming requests, or constructed via - /// [`sc_network_light::light_client_requests::handler::LightClientRequestHandler::new`] + /// `sc_network_light::light_client_requests::handler::LightClientRequestHandler::new` /// allowing both outgoing and incoming requests. pub light_client_request_protocol_config: RequestResponseConfig, /// Request response configuration for the state request protocol. /// /// Can be constructed either via - /// [`sc_network_sync::block_request_handler::generate_protocol_config`] allowing outgoing but + /// `sc_network_sync::state_request_handler::generate_protocol_config` allowing outgoing but /// not incoming requests, or constructed via - /// [`crate::state_request_handler::StateRequestHandler::new`] allowing + /// `sc_network_sync::state_request_handler::StateRequestHandler::new` allowing /// both outgoing and incoming requests. pub state_request_protocol_config: RequestResponseConfig, diff --git a/substrate/client/network/src/lib.rs b/substrate/client/network/src/lib.rs index fff30550eb8..83bc1075b8b 100644 --- a/substrate/client/network/src/lib.rs +++ b/substrate/client/network/src/lib.rs @@ -266,13 +266,9 @@ pub use protocol::{ event::{DhtEvent, Event, ObservedRole}, PeerInfo, }; -pub use sc_network_light::light_client_requests; -pub use sc_network_sync::{ - block_request_handler, - state::StateDownloadProgress, - state_request_handler, +pub use sc_network_common::sync::{ warp::{WarpSyncPhase, WarpSyncProgress}, - warp_request_handler, SyncState, + StateDownloadProgress, SyncState, }; pub use service::{ DecodingError, IfDisconnected, KademliaKey, Keypair, NetworkService, NetworkWorker, diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index 348d2d0bf88..3698a6b936e 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -20,7 +20,6 @@ use crate::{ config, error, request_responses::RequestFailure, utils::{interval, LruHashSet}, - warp_request_handler::{EncodedProof, WarpSyncProvider}, }; use bytes::Bytes; @@ -42,21 +41,23 @@ use message::{ }; use notifications::{Notifications, NotificationsOut}; use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64}; -use prost::Message as _; use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider}; use sc_consensus::import_queue::{BlockImportError, BlockImportStatus, IncomingBlock, Origin}; -use sc_network_common::config::ProtocolId; -use sc_network_sync::{ - message::{ - BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, BlockState, - FromBlock, +use sc_network_common::{ + config::ProtocolId, + sync::{ + message::{ + BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, BlockState, + }, + warp::{EncodedProof, WarpProofRequest}, + BadPeer, ChainSync, OnBlockData, OnBlockJustification, OnStateData, OpaqueBlockRequest, + OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PollBlockAnnounceValidation, + SyncStatus, }, - schema::v1::StateResponse, - BadPeer, ChainSync, OnBlockData, OnBlockJustification, OnStateData, - PollBlockAnnounceValidation, Status as SyncStatus, }; use sp_arithmetic::traits::SaturatedConversion; -use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin}; +use sp_blockchain::HeaderMetadata; +use sp_consensus::BlockOrigin; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, Zero}, @@ -79,7 +80,6 @@ pub mod event; pub mod message; pub use notifications::{NotificationsSink, NotifsHandlerError, Ready}; -use sp_blockchain::HeaderMetadata; /// Interval at which we perform time based maintenance const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100); @@ -167,11 +167,12 @@ pub struct Protocol<B: BlockT, Client> { tick_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>, /// Pending list of messages to return from `poll` as a priority. pending_messages: VecDeque<CustomMessageOutcome<B>>, - config: ProtocolConfig, + /// Assigned roles. + roles: Roles, genesis_hash: B::Hash, /// State machine that handles the list of in-progress requests. Only full node peers are /// registered. - sync: ChainSync<B, Client>, + chain_sync: Box<dyn ChainSync<B>>, // All connected peers. Contains both full and light node peers. peers: HashMap<PeerId, Peer<B>>, chain: Arc<Client>, @@ -231,38 +232,6 @@ pub struct PeerInfo<B: BlockT> { pub best_number: <B::Header as HeaderT>::Number, } -/// Configuration for the Substrate-specific part of the networking layer. -#[derive(Clone)] -pub struct ProtocolConfig { - /// Assigned roles. - pub roles: Roles, - /// Maximum number of peers to ask the same blocks in parallel. - pub max_parallel_downloads: u32, - /// Enable state sync. - pub sync_mode: config::SyncMode, -} - -impl ProtocolConfig { - fn sync_mode(&self) -> sc_network_sync::SyncMode { - if self.roles.is_light() { - sc_network_sync::SyncMode::Light - } else { - match self.sync_mode { - config::SyncMode::Full => sc_network_sync::SyncMode::Full, - config::SyncMode::Fast { skip_proofs, storage_chain_mode } => - sc_network_sync::SyncMode::LightState { skip_proofs, storage_chain_mode }, - config::SyncMode::Warp => sc_network_sync::SyncMode::Warp, - } - } - } -} - -impl Default for ProtocolConfig { - fn default() -> ProtocolConfig { - Self { roles: Roles::FULL, max_parallel_downloads: 5, sync_mode: config::SyncMode::Full } - } -} - /// Handshake sent when we open a block announces substream. #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] struct BlockAnnouncesHandshake<B: BlockT> { @@ -278,12 +247,12 @@ struct BlockAnnouncesHandshake<B: BlockT> { impl<B: BlockT> BlockAnnouncesHandshake<B> { fn build( - protocol_config: &ProtocolConfig, + roles: Roles, best_number: NumberFor<B>, best_hash: B::Hash, genesis_hash: B::Hash, ) -> Self { - Self { genesis_hash, roles: protocol_config.roles, best_number, best_hash } + Self { genesis_hash, roles, best_number, best_hash } } } @@ -300,24 +269,15 @@ where { /// Create a new instance. pub fn new( - config: ProtocolConfig, + roles: Roles, chain: Arc<Client>, protocol_id: ProtocolId, network_config: &config::NetworkConfiguration, notifications_protocols_handshakes: Vec<Vec<u8>>, - block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>, metrics_registry: Option<&Registry>, - warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>, - ) -> error::Result<(Protocol<B, Client>, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { + chain_sync: Box<dyn ChainSync<B>>, + ) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { let info = chain.info(); - let sync = ChainSync::new( - config.sync_mode(), - chain.clone(), - block_announce_validator, - config.max_parallel_downloads, - warp_sync_provider, - ) - .map_err(Box::new)?; let boot_node_ids = { let mut list = HashSet::new(); @@ -405,7 +365,7 @@ where let genesis_hash = info.genesis_hash; let block_announces_handshake = - BlockAnnouncesHandshake::<B>::build(&config, best_number, best_hash, genesis_hash) + BlockAnnouncesHandshake::<B>::build(roles, best_number, best_hash, genesis_hash) .encode(); let sync_protocol_config = notifications::ProtocolConfig { @@ -438,11 +398,11 @@ where let protocol = Self { tick_timeout: Box::pin(interval(TICK_TIMEOUT)), pending_messages: VecDeque::new(), - config, + roles, peers: HashMap::new(), chain, genesis_hash: info.genesis_hash, - sync, + chain_sync, important_peers, default_peers_set_num_full: network_config.default_peers_set_num_full as usize, default_peers_set_num_light: { @@ -510,49 +470,49 @@ where /// Current global sync state. pub fn sync_state(&self) -> SyncStatus<B> { - self.sync.status() + self.chain_sync.status() } /// Target sync block number. pub fn best_seen_block(&self) -> Option<NumberFor<B>> { - self.sync.status().best_seen_block + self.chain_sync.status().best_seen_block } /// Number of peers participating in syncing. pub fn num_sync_peers(&self) -> u32 { - self.sync.status().num_peers + self.chain_sync.status().num_peers } /// Number of blocks in the import queue. pub fn num_queued_blocks(&self) -> u32 { - self.sync.status().queued_blocks + self.chain_sync.status().queued_blocks } /// Number of downloaded blocks. pub fn num_downloaded_blocks(&self) -> usize { - self.sync.num_downloaded_blocks() + self.chain_sync.num_downloaded_blocks() } /// Number of active sync requests. pub fn num_sync_requests(&self) -> usize { - self.sync.num_sync_requests() + self.chain_sync.num_sync_requests() } /// Inform sync about new best imported block. pub fn new_best_block_imported(&mut self, hash: B::Hash, number: NumberFor<B>) { debug!(target: "sync", "New best block imported {:?}/#{}", hash, number); - self.sync.update_chain_info(&hash, number); + self.chain_sync.update_chain_info(&hash, number); self.behaviour.set_notif_protocol_handshake( HARDCODED_PEERSETS_SYNC, - BlockAnnouncesHandshake::<B>::build(&self.config, number, hash, self.genesis_hash) + BlockAnnouncesHandshake::<B>::build(self.roles, number, hash, self.genesis_hash) .encode(), ); } fn update_peer_info(&mut self, who: &PeerId) { - if let Some(info) = self.sync.peer_info(who) { + if let Some(info) = self.chain_sync.peer_info(who) { if let Some(ref mut peer) = self.peers.get_mut(who) { peer.info.best_hash = info.best_hash; peer.info.best_number = info.best_number; @@ -565,14 +525,6 @@ where self.peers.iter().map(|(id, peer)| (id, &peer.info)) } - fn prepare_block_request( - &mut self, - who: PeerId, - request: BlockRequest<B>, - ) -> CustomMessageOutcome<B> { - prepare_block_request::<B>(&mut self.peers, who, request) - } - /// Called by peer when it is disconnecting. /// /// Returns a result if the handshake of this peer was indeed accepted. @@ -584,7 +536,9 @@ where } if let Some(_peer_data) = self.peers.remove(&peer) { - if let Some(OnBlockData::Import(origin, blocks)) = self.sync.peer_disconnected(&peer) { + if let Some(OnBlockData::Import(origin, blocks)) = + self.chain_sync.peer_disconnected(&peer) + { self.pending_messages .push_back(CustomMessageOutcome::BlockImport(origin, blocks)); } @@ -605,62 +559,9 @@ where &mut self, peer_id: PeerId, request: BlockRequest<B>, - response: sc_network_sync::schema::v1::BlockResponse, + response: OpaqueBlockResponse, ) -> CustomMessageOutcome<B> { - let blocks = response - .blocks - .into_iter() - .map(|block_data| { - Ok(BlockData::<B> { - hash: Decode::decode(&mut block_data.hash.as_ref())?, - header: if !block_data.header.is_empty() { - Some(Decode::decode(&mut block_data.header.as_ref())?) - } else { - None - }, - body: if request.fields.contains(BlockAttributes::BODY) { - Some( - block_data - .body - .iter() - .map(|body| Decode::decode(&mut body.as_ref())) - .collect::<Result<Vec<_>, _>>()?, - ) - } else { - None - }, - indexed_body: if request.fields.contains(BlockAttributes::INDEXED_BODY) { - Some(block_data.indexed_body) - } else { - None - }, - receipt: if !block_data.receipt.is_empty() { - Some(block_data.receipt) - } else { - None - }, - message_queue: if !block_data.message_queue.is_empty() { - Some(block_data.message_queue) - } else { - None - }, - justification: if !block_data.justification.is_empty() { - Some(block_data.justification) - } else if block_data.is_empty_justification { - Some(Vec::new()) - } else { - None - }, - justifications: if !block_data.justifications.is_empty() { - Some(DecodeAll::decode_all(&mut block_data.justifications.as_ref())?) - } else { - None - }, - }) - }) - .collect::<Result<Vec<_>, codec::Error>>(); - - let blocks = match blocks { + let blocks = match self.chain_sync.block_response_into_blocks(&request, response) { Ok(blocks) => blocks, Err(err) => { debug!(target: "sync", "Failed to decode block response from {}: {}", peer_id, err); @@ -690,7 +591,7 @@ where ); if request.fields == BlockAttributes::JUSTIFICATION { - match self.sync.on_block_justification(peer_id, block_response) { + match self.chain_sync.on_block_justification(peer_id, block_response) { Ok(OnBlockJustification::Nothing) => CustomMessageOutcome::None, Ok(OnBlockJustification::Import { peer, hash, number, justifications }) => CustomMessageOutcome::JustificationImport(peer, hash, number, justifications), @@ -701,10 +602,11 @@ where }, } } else { - match self.sync.on_block_data(&peer_id, Some(request), block_response) { + match self.chain_sync.on_block_data(&peer_id, Some(request), block_response) { Ok(OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), - Ok(OnBlockData::Request(peer, req)) => self.prepare_block_request(peer, req), + Ok(OnBlockData::Request(peer, req)) => + prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, peer, req), Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); self.peerset_handle.report_peer(id, repu); @@ -719,9 +621,9 @@ where pub fn on_state_response( &mut self, peer_id: PeerId, - response: StateResponse, + response: OpaqueStateResponse, ) -> CustomMessageOutcome<B> { - match self.sync.on_state_data(&peer_id, response) { + match self.chain_sync.on_state_data(&peer_id, response) { Ok(OnStateData::Import(origin, block)) => CustomMessageOutcome::BlockImport(origin, vec![block]), Ok(OnStateData::Continue) => CustomMessageOutcome::None, @@ -738,9 +640,9 @@ where pub fn on_warp_sync_response( &mut self, peer_id: PeerId, - response: crate::warp_request_handler::EncodedProof, + response: EncodedProof, ) -> CustomMessageOutcome<B> { - match self.sync.on_warp_sync_data(&peer_id, response) { + match self.chain_sync.on_warp_sync_data(&peer_id, response) { Ok(()) => CustomMessageOutcome::None, Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); @@ -798,7 +700,7 @@ where return Err(()) } - if self.config.roles.is_light() { + if self.roles.is_light() { // we're not interested in light peers if status.roles.is_light() { debug!(target: "sync", "Peer {} is unable to serve light requests", who); @@ -821,14 +723,15 @@ where } } - if status.roles.is_full() && self.sync.num_peers() >= self.default_peers_set_num_full { + if status.roles.is_full() && self.chain_sync.num_peers() >= self.default_peers_set_num_full + { debug!(target: "sync", "Too many full nodes, rejecting {}", who); self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC); return Err(()) } if status.roles.is_light() && - (self.peers.len() - self.sync.num_peers()) >= self.default_peers_set_num_light + (self.peers.len() - self.chain_sync.num_peers()) >= self.default_peers_set_num_light { // Make sure that not all slots are occupied by light clients. debug!(target: "sync", "Too many light nodes, rejecting {}", who); @@ -849,7 +752,7 @@ where }; let req = if peer.info.roles.is_full() { - match self.sync.new_peer(who, peer.info.best_hash, peer.info.best_number) { + match self.chain_sync.new_peer(who, peer.info.best_hash, peer.info.best_number) { Ok(req) => req, Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); @@ -868,8 +771,12 @@ where .push_back(CustomMessageOutcome::PeerNewBest(who, status.best_number)); if let Some(req) = req { - let event = self.prepare_block_request(who, req); - self.pending_messages.push_back(event); + self.pending_messages.push_back(prepare_block_request( + self.chain_sync.as_ref(), + &mut self.peers, + who, + req, + )); } Ok(()) @@ -953,7 +860,7 @@ where }; if peer.info.roles.is_full() { - self.sync.push_block_announce_validation(who, hash, announce, is_best); + self.chain_sync.push_block_announce_validation(who, hash, announce, is_best); } } @@ -1010,7 +917,7 @@ where // to import header from announced block let's construct response to request that normally // would have been sent over network (but it is not in our case) - let blocks_to_import = self.sync.on_block_data( + let blocks_to_import = self.chain_sync.on_block_data( &who, None, BlockResponse::<B> { @@ -1035,7 +942,8 @@ where match blocks_to_import { Ok(OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), - Ok(OnBlockData::Request(peer, req)) => self.prepare_block_request(peer, req), + Ok(OnBlockData::Request(peer, req)) => + prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, peer, req), Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); self.peerset_handle.report_peer(id, repu); @@ -1047,7 +955,7 @@ where /// Call this when a block has been finalized. The sync layer may have some additional /// requesting to perform. pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { - self.sync.on_block_finalized(&hash, *header.number()) + self.chain_sync.on_block_finalized(&hash, *header.number()) } /// Request a justification for the given block. @@ -1055,12 +963,12 @@ where /// Uses `protocol` to queue a new justification request and tries to dispatch all pending /// requests. pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) { - self.sync.request_justification(hash, number) + self.chain_sync.request_justification(hash, number) } /// Clear all pending justification requests. pub fn clear_justification_requests(&mut self) { - self.sync.clear_justification_requests(); + self.chain_sync.clear_justification_requests(); } /// Request syncing for the given block from given set of peers. @@ -1072,7 +980,7 @@ where hash: &B::Hash, number: NumberFor<B>, ) { - self.sync.set_sync_fork_request(peers, hash, number) + self.chain_sync.set_sync_fork_request(peers, hash, number) } /// A batch of blocks have been processed, with or without errors. @@ -1084,11 +992,12 @@ where count: usize, results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>, ) { - let results = self.sync.on_blocks_processed(imported, count, results); + let results = self.chain_sync.on_blocks_processed(imported, count, results); for result in results { match result { Ok((id, req)) => { self.pending_messages.push_back(prepare_block_request( + self.chain_sync.as_ref(), &mut self.peers, id, req, @@ -1111,7 +1020,7 @@ where number: NumberFor<B>, success: bool, ) { - self.sync.on_justification_import(hash, number, success); + self.chain_sync.on_justification_import(hash, number, success); if !success { info!("💔 Invalid justification provided by {} for #{}", who, hash); self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC); @@ -1228,12 +1137,22 @@ where } } + /// Encode implementation-specific block request. + pub fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result<Vec<u8>, String> { + self.chain_sync.encode_block_request(request) + } + + /// Encode implementation-specific state request. + pub fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result<Vec<u8>, String> { + self.chain_sync.encode_state_request(request) + } + fn report_metrics(&self) { if let Some(metrics) = &self.metrics { let n = u64::try_from(self.peers.len()).unwrap_or(std::u64::MAX); metrics.peers.set(n); - let m = self.sync.metrics(); + let m = self.chain_sync.metrics(); metrics.fork_targets.set(m.fork_targets.into()); metrics.queued_blocks.set(m.queued_blocks.into()); @@ -1259,6 +1178,7 @@ where } fn prepare_block_request<B: BlockT>( + chain_sync: &dyn ChainSync<B>, peers: &mut HashMap<PeerId, Peer<B>>, who: PeerId, request: BlockRequest<B>, @@ -1269,19 +1189,7 @@ fn prepare_block_request<B: BlockT>( peer.request = Some((PeerRequest::Block(request.clone()), rx)); } - let request = sc_network_sync::schema::v1::BlockRequest { - fields: request.fields.to_be_u32(), - from_block: match request.from { - FromBlock::Hash(h) => - Some(sc_network_sync::schema::v1::block_request::FromBlock::Hash(h.encode())), - FromBlock::Number(n) => - Some(sc_network_sync::schema::v1::block_request::FromBlock::Number(n.encode())), - }, - to_block: request.to.map(|h| h.encode()).unwrap_or_default(), - direction: request.direction as i32, - max_blocks: request.max.unwrap_or(0), - support_multiple_justifications: true, - }; + let request = chain_sync.create_opaque_block_request(&request); CustomMessageOutcome::BlockRequest { target: who, request, pending_response: tx } } @@ -1289,7 +1197,7 @@ fn prepare_block_request<B: BlockT>( fn prepare_state_request<B: BlockT>( peers: &mut HashMap<PeerId, Peer<B>>, who: PeerId, - request: sc_network_sync::schema::v1::StateRequest, + request: OpaqueStateRequest, ) -> CustomMessageOutcome<B> { let (tx, rx) = oneshot::channel(); @@ -1302,7 +1210,7 @@ fn prepare_state_request<B: BlockT>( fn prepare_warp_sync_request<B: BlockT>( peers: &mut HashMap<PeerId, Peer<B>>, who: PeerId, - request: crate::warp_request_handler::Request<B>, + request: WarpProofRequest<B>, ) -> CustomMessageOutcome<B> { let (tx, rx) = oneshot::channel(); @@ -1346,19 +1254,19 @@ pub enum CustomMessageOutcome<B: BlockT> { /// A new block request must be emitted. BlockRequest { target: PeerId, - request: sc_network_sync::schema::v1::BlockRequest, + request: OpaqueBlockRequest, pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>, }, /// A new storage request must be emitted. StateRequest { target: PeerId, - request: sc_network_sync::schema::v1::StateRequest, + request: OpaqueStateRequest, pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>, }, /// A new warp sync request must be emitted. WarpSyncRequest { target: PeerId, - request: crate::warp_request_handler::Request<B>, + request: WarpProofRequest<B>, pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>, }, /// Peer has a reported a new head of chain. @@ -1455,10 +1363,8 @@ where let (req, _) = peer.request.take().unwrap(); match req { PeerRequest::Block(req) => { - let protobuf_response = - match sc_network_sync::schema::v1::BlockResponse::decode( - &resp[..], - ) { + let response = + match self.chain_sync.decode_block_response(&resp[..]) { Ok(proto) => proto, Err(e) => { debug!( @@ -1474,13 +1380,11 @@ where }, }; - finished_block_requests.push((*id, req, protobuf_response)); + finished_block_requests.push((*id, req, response)); }, PeerRequest::State => { - let protobuf_response = - match sc_network_sync::schema::v1::StateResponse::decode( - &resp[..], - ) { + let response = + match self.chain_sync.decode_state_response(&resp[..]) { Ok(proto) => proto, Err(e) => { debug!( @@ -1496,7 +1400,7 @@ where }, }; - finished_state_requests.push((*id, protobuf_response)); + finished_state_requests.push((*id, response)); }, PeerRequest::WarpProof => { finished_warp_sync_requests.push((*id, resp)); @@ -1555,12 +1459,12 @@ where } } } - for (id, req, protobuf_response) in finished_block_requests { - let ev = self.on_block_response(id, req, protobuf_response); + for (id, req, response) in finished_block_requests { + let ev = self.on_block_response(id, req, response); self.pending_messages.push_back(ev); } - for (id, protobuf_response) in finished_state_requests { - let ev = self.on_state_response(id, protobuf_response); + for (id, response) in finished_state_requests { + let ev = self.on_state_response(id, response); self.pending_messages.push_back(ev); } for (id, response) in finished_warp_sync_requests { @@ -1572,25 +1476,32 @@ where self.tick(); } - for (id, request) in self.sync.block_requests() { - let event = prepare_block_request(&mut self.peers, *id, request); + for (id, request) in self + .chain_sync + .block_requests() + .map(|(peer_id, request)| (*peer_id, request)) + .collect::<Vec<_>>() + { + let event = + prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, id, request); self.pending_messages.push_back(event); } - if let Some((id, request)) = self.sync.state_request() { + if let Some((id, request)) = self.chain_sync.state_request() { let event = prepare_state_request(&mut self.peers, id, request); self.pending_messages.push_back(event); } - for (id, request) in self.sync.justification_requests() { - let event = prepare_block_request(&mut self.peers, id, request); + for (id, request) in self.chain_sync.justification_requests().collect::<Vec<_>>() { + let event = + prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, id, request); self.pending_messages.push_back(event); } - if let Some((id, request)) = self.sync.warp_sync_request() { + if let Some((id, request)) = self.chain_sync.warp_sync_request() { let event = prepare_warp_sync_request(&mut self.peers, id, request); self.pending_messages.push_back(event); } // Check if there is any block announcement validation finished. - while let Poll::Ready(result) = self.sync.poll_block_announce_validation(cx) { + while let Poll::Ready(result) = self.chain_sync.poll_block_announce_validation(cx) { match self.process_block_announce_validation_result(result) { CustomMessageOutcome::None => {}, outcome => self.pending_messages.push_back(outcome), @@ -1771,7 +1682,8 @@ where // Make sure that the newly added block announce validation future was // polled once to be registered in the task. - if let Poll::Ready(res) = self.sync.poll_block_announce_validation(cx) { + if let Poll::Ready(res) = self.chain_sync.poll_block_announce_validation(cx) + { self.process_block_announce_validation_result(res) } else { CustomMessageOutcome::None diff --git a/substrate/client/network/src/protocol/message.rs b/substrate/client/network/src/protocol/message.rs index a57740ec274..c9512f82e23 100644 --- a/substrate/client/network/src/protocol/message.rs +++ b/substrate/client/network/src/protocol/message.rs @@ -63,10 +63,12 @@ pub mod generic { use bitflags::bitflags; use codec::{Decode, Encode, Input, Output}; use sc_client_api::StorageProof; - use sc_network_common::message::RequestId; - use sc_network_sync::message::{ - generic::{BlockRequest, BlockResponse}, - BlockAnnounce, + use sc_network_common::{ + message::RequestId, + sync::message::{ + generic::{BlockRequest, BlockResponse}, + BlockAnnounce, + }, }; use sp_runtime::ConsensusEngineId; diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 1cc717a50d0..ef7ef2f5a2d 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -30,7 +30,7 @@ use crate::{ behaviour::{self, Behaviour, BehaviourOut}, bitswap::Bitswap, - config::{parse_str_addr, Params, TransportConfig}, + config::{self, parse_str_addr, Params, TransportConfig}, discovery::DiscoveryConfig, error::Error, network_state::{ @@ -60,7 +60,7 @@ use metrics::{Histogram, HistogramVec, MetricSources, Metrics}; use parking_lot::Mutex; use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, ImportQueue, Link}; -use sc_network_sync::{Status as SyncStatus, SyncState}; +use sc_network_common::sync::{SyncMode, SyncState, SyncStatus}; use sc_peerset::PeersetHandle; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_blockchain::{HeaderBackend, HeaderMetadata}; @@ -207,13 +207,23 @@ where None => (None, None), }; - let (protocol, peerset_handle, mut known_addresses) = Protocol::new( - protocol::ProtocolConfig { - roles: From::from(¶ms.role), - max_parallel_downloads: params.network_config.max_parallel_downloads, - sync_mode: params.network_config.sync_mode.clone(), + let chain_sync = (params.create_chain_sync)( + if params.role.is_light() { + SyncMode::Light + } else { + match params.network_config.sync_mode { + config::SyncMode::Full => SyncMode::Full, + config::SyncMode::Fast { skip_proofs, storage_chain_mode } => + SyncMode::LightState { skip_proofs, storage_chain_mode }, + config::SyncMode::Warp => SyncMode::Warp, + } }, params.chain.clone(), + warp_sync_provider, + )?; + let (protocol, peerset_handle, mut known_addresses) = Protocol::new( + From::from(¶ms.role), + params.chain.clone(), params.protocol_id.clone(), ¶ms.network_config, iter::once(Vec::new()) @@ -222,9 +232,8 @@ where .map(|_| default_notif_handshake_message.clone()), ) .collect(), - params.block_announce_validator, params.metrics_registry.as_ref(), - warp_sync_provider, + chain_sync, )?; // List of multiaddresses that we know in the network. diff --git a/substrate/client/network/src/service/tests.rs b/substrate/client/network/src/service/tests.rs index 808546d67fc..181d58130aa 100644 --- a/substrate/client/network/src/service/tests.rs +++ b/substrate/client/network/src/service/tests.rs @@ -16,15 +16,17 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -use crate::{ - config, state_request_handler::StateRequestHandler, Event, NetworkService, NetworkWorker, -}; +use crate::{config, Event, NetworkService, NetworkWorker}; use futures::prelude::*; use libp2p::PeerId; use sc_network_common::config::ProtocolId; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; -use sc_network_sync::block_request_handler::BlockRequestHandler; +use sc_network_sync::{ + block_request_handler::BlockRequestHandler, state_request_handler::StateRequestHandler, + ChainSync, +}; +use sp_consensus::block_validation::DefaultBlockAnnounceValidator; use sp_runtime::traits::{Block as BlockT, Header as _}; use std::{borrow::Cow, sync::Arc, time::Duration}; use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _}; @@ -109,6 +111,7 @@ fn build_test_full_node( protocol_config }; + let max_parallel_downloads = config.max_parallel_downloads; let worker = NetworkWorker::new(config::Params { role: config::Role::Full, executor: None, @@ -120,8 +123,17 @@ fn build_test_full_node( transaction_pool: Arc::new(crate::config::EmptyTransactionPool), protocol_id, import_queue, - block_announce_validator: Box::new( - sp_consensus::block_validation::DefaultBlockAnnounceValidator, + create_chain_sync: Box::new( + move |sync_mode, chain, warp_sync_provider| match ChainSync::new( + sync_mode, + chain, + Box::new(DefaultBlockAnnounceValidator), + max_parallel_downloads, + warp_sync_provider, + ) { + Ok(chain_sync) => Ok(Box::new(chain_sync)), + Err(error) => Err(Box::new(error).into()), + }, ), metrics_registry: None, block_request_protocol_config, diff --git a/substrate/client/network/sync/Cargo.toml b/substrate/client/network/sync/Cargo.toml index a61f780fd56..3e352614640 100644 --- a/substrate/client/network/sync/Cargo.toml +++ b/substrate/client/network/sync/Cargo.toml @@ -17,11 +17,9 @@ targets = ["x86_64-unknown-linux-gnu"] prost-build = "0.10" [dependencies] -bitflags = "1.3.2" codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive", ] } -either = "1.5.3" futures = "0.3.21" libp2p = "0.46.1" log = "0.4.17" diff --git a/substrate/client/network/sync/src/block_request_handler.rs b/substrate/client/network/sync/src/block_request_handler.rs index 4cd69d1fac4..2a847a8bf36 100644 --- a/substrate/client/network/sync/src/block_request_handler.rs +++ b/substrate/client/network/sync/src/block_request_handler.rs @@ -17,10 +17,7 @@ //! Helper for handling (i.e. answering) block requests from a remote peer via the //! `crate::request_responses::RequestResponsesBehaviour`. -use crate::{ - message::BlockAttributes, - schema::v1::{block_request::FromBlock, BlockResponse, Direction}, -}; +use crate::schema::v1::{block_request::FromBlock, BlockResponse, Direction}; use codec::{Decode, Encode}; use futures::{ channel::{mpsc, oneshot}, @@ -34,6 +31,7 @@ use sc_client_api::BlockBackend; use sc_network_common::{ config::ProtocolId, request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}, + sync::message::BlockAttributes, }; use sp_blockchain::HeaderBackend; use sp_runtime::{ diff --git a/substrate/client/network/sync/src/blocks.rs b/substrate/client/network/sync/src/blocks.rs index 26753f120a1..5fb14846750 100644 --- a/substrate/client/network/sync/src/blocks.rs +++ b/substrate/client/network/sync/src/blocks.rs @@ -16,9 +16,9 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -use crate::message; use libp2p::PeerId; use log::trace; +use sc_network_common::sync::message; use sp_runtime::traits::{Block as BlockT, NumberFor, One}; use std::{ cmp, @@ -245,8 +245,8 @@ impl<B: BlockT> BlockCollection<B> { #[cfg(test)] mod test { use super::{BlockCollection, BlockData, BlockRangeState}; - use crate::message; use libp2p::PeerId; + use sc_network_common::sync::message; use sp_core::H256; use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper}; diff --git a/substrate/client/network/sync/src/extra_requests.rs b/substrate/client/network/sync/src/extra_requests.rs index c684d8e7278..6206f8a61bc 100644 --- a/substrate/client/network/sync/src/extra_requests.rs +++ b/substrate/client/network/sync/src/extra_requests.rs @@ -20,6 +20,7 @@ use crate::{PeerSync, PeerSyncState}; use fork_tree::ForkTree; use libp2p::PeerId; use log::{debug, trace, warn}; +use sc_network_common::sync::metrics::Metrics; use sp_blockchain::Error as ClientError; use sp_runtime::traits::{Block as BlockT, NumberFor, Zero}; use std::{ @@ -56,15 +57,6 @@ pub(crate) struct ExtraRequests<B: BlockT> { request_type_name: &'static str, } -#[derive(Debug)] -pub struct Metrics { - pub pending_requests: u32, - pub active_requests: u32, - pub importing_requests: u32, - pub failed_requests: u32, - _priv: (), -} - impl<B: BlockT> ExtraRequests<B> { pub(crate) fn new(request_type_name: &'static str) -> Self { Self { @@ -258,7 +250,6 @@ impl<B: BlockT> ExtraRequests<B> { active_requests: self.active_requests.len().try_into().unwrap_or(std::u32::MAX), failed_requests: self.failed_requests.len().try_into().unwrap_or(std::u32::MAX), importing_requests: self.importing_requests.len().try_into().unwrap_or(std::u32::MAX), - _priv: (), } } } diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 1ce69b6dc81..aff773bd12e 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -30,8 +30,7 @@ pub mod block_request_handler; pub mod blocks; -pub mod message; -pub mod schema; +mod schema; pub mod state; pub mod state_request_handler; pub mod warp; @@ -39,22 +38,28 @@ pub mod warp_request_handler; use crate::{ blocks::BlockCollection, - message::{BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse}, schema::v1::{StateRequest, StateResponse}, - state::{StateDownloadProgress, StateSync}, - warp::{ - EncodedProof, WarpProofImportResult, WarpProofRequest, WarpSync, WarpSyncPhase, - WarpSyncProgress, WarpSyncProvider, - }, + state::StateSync, + warp::{WarpProofImportResult, WarpSync}, }; -use codec::Encode; -use either::Either; +use codec::{Decode, DecodeAll, Encode}; use extra_requests::ExtraRequests; use futures::{stream::FuturesUnordered, task::Poll, Future, FutureExt, StreamExt}; use libp2p::PeerId; use log::{debug, error, info, trace, warn}; +use prost::Message; use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; +use sc_network_common::sync::{ + message::{ + BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, + FromBlock, + }, + warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress, WarpSyncProvider}, + BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, OnStateData, + OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PeerInfo, + PollBlockAnnounceValidation, SyncMode, SyncState, SyncStatus, +}; use sp_arithmetic::traits::Saturating; use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata}; use sp_consensus::{ @@ -71,7 +76,6 @@ use sp_runtime::{ }; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, - fmt, ops::Range, pin::Pin, sync::Arc, @@ -283,15 +287,6 @@ impl<B: BlockT> PeerSync<B> { } } -/// The sync status of a peer we are trying to sync with -#[derive(Debug)] -pub struct PeerInfo<B: BlockT> { - /// Their best block hash. - pub best_hash: B::Hash, - /// Their best block number. - pub best_number: NumberFor<B>, -} - struct ForkTarget<B: BlockT> { number: NumberFor<B>, parent_hash: Option<B::Hash>, @@ -330,108 +325,6 @@ impl<B: BlockT> PeerSyncState<B> { } } -/// Reported sync state. -#[derive(Clone, Eq, PartialEq, Debug)] -pub enum SyncState { - /// Initial sync is complete, keep-up sync is active. - Idle, - /// Actively catching up with the chain. - Downloading, -} - -/// Syncing status and statistics. -#[derive(Clone)] -pub struct Status<B: BlockT> { - /// Current global sync state. - pub state: SyncState, - /// Target sync block number. - pub best_seen_block: Option<NumberFor<B>>, - /// Number of peers participating in syncing. - pub num_peers: u32, - /// Number of blocks queued for import - pub queued_blocks: u32, - /// State sync status in progress, if any. - pub state_sync: Option<StateDownloadProgress>, - /// Warp sync in progress, if any. - pub warp_sync: Option<WarpSyncProgress<B>>, -} - -/// A peer did not behave as expected and should be reported. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct BadPeer(pub PeerId, pub sc_peerset::ReputationChange); - -impl fmt::Display for BadPeer { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Bad peer {}; Reputation change: {:?}", self.0, self.1) - } -} - -impl std::error::Error for BadPeer {} - -/// Result of [`ChainSync::on_block_data`]. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum OnBlockData<B: BlockT> { - /// The block should be imported. - Import(BlockOrigin, Vec<IncomingBlock<B>>), - /// A new block request needs to be made to the given peer. - Request(PeerId, BlockRequest<B>), -} - -impl<B: BlockT> OnBlockData<B> { - /// Returns `self` as request. - #[cfg(test)] - fn into_request(self) -> Option<(PeerId, BlockRequest<B>)> { - if let Self::Request(peer, req) = self { - Some((peer, req)) - } else { - None - } - } -} - -/// Result of [`ChainSync::on_state_data`]. -#[derive(Debug)] -pub enum OnStateData<B: BlockT> { - /// The block and state that should be imported. - Import(BlockOrigin, IncomingBlock<B>), - /// A new state request needs to be made to the given peer. - Continue, -} - -/// Result of [`ChainSync::poll_block_announce_validation`]. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum PollBlockAnnounceValidation<H> { - /// The announcement failed at validation. - /// - /// The peer reputation should be decreased. - Failure { - /// Who sent the processed block announcement? - who: PeerId, - /// Should the peer be disconnected? - disconnect: bool, - }, - /// The announcement does not require further handling. - Nothing { - /// Who sent the processed block announcement? - who: PeerId, - /// Was this their new best block? - is_best: bool, - /// The announcement. - announce: BlockAnnounce<H>, - }, - /// The announcement header should be imported. - ImportHeader { - /// Who sent the processed block announcement? - who: PeerId, - /// Was this their new best block? - is_best: bool, - /// The announcement. - announce: BlockAnnounce<H>, - }, - /// The block announcement should be skipped. - Skip, -} - /// Result of [`ChainSync::block_announce_validation`]. #[derive(Debug, Clone, PartialEq, Eq)] enum PreValidateBlockAnnounce<H> { @@ -467,28 +360,6 @@ enum PreValidateBlockAnnounce<H> { Skip, } -/// Result of [`ChainSync::on_block_justification`]. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum OnBlockJustification<B: BlockT> { - /// The justification needs no further handling. - Nothing, - /// The justification should be imported. - Import { peer: PeerId, hash: B::Hash, number: NumberFor<B>, justifications: Justifications }, -} - -/// Operation mode. -#[derive(Debug, PartialEq, Eq)] -pub enum SyncMode { - // Sync headers only - Light, - // Sync headers and block bodies - Full, - // Sync headers and the last finalied state - LightState { storage_chain_mode: bool, skip_proofs: bool }, - // Warp sync mode. - Warp, -} - /// Result of [`ChainSync::has_slot_for_block_announce_validation`]. enum HasSlotForBlockAnnounceValidation { /// Yes, there is a slot for the block announce validation. @@ -499,7 +370,7 @@ enum HasSlotForBlockAnnounceValidation { MaximumPeerSlotsReached, } -impl<B, Client> ChainSync<B, Client> +impl<B, Client> ChainSyncT<B> for ChainSync<B, Client> where B: BlockT, Client: HeaderBackend<B> @@ -510,88 +381,14 @@ where + Sync + 'static, { - /// Create a new instance. - pub fn new( - mode: SyncMode, - client: Arc<Client>, - block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>, - max_parallel_downloads: u32, - warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>, - ) -> Result<Self, ClientError> { - let mut sync = Self { - client, - peers: HashMap::new(), - blocks: BlockCollection::new(), - best_queued_hash: Default::default(), - best_queued_number: Zero::zero(), - extra_justifications: ExtraRequests::new("justification"), - mode, - queue_blocks: Default::default(), - fork_targets: Default::default(), - allowed_requests: Default::default(), - block_announce_validator, - max_parallel_downloads, - downloaded_blocks: 0, - block_announce_validation: Default::default(), - block_announce_validation_per_peer_stats: Default::default(), - state_sync: None, - warp_sync: None, - warp_sync_provider, - import_existing: false, - gap_sync: None, - }; - sync.reset_sync_start_point()?; - Ok(sync) - } - - fn required_block_attributes(&self) -> BlockAttributes { - match self.mode { - SyncMode::Full => - BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY, - SyncMode::Light => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION, - SyncMode::LightState { storage_chain_mode: false, .. } | SyncMode::Warp => - BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY, - SyncMode::LightState { storage_chain_mode: true, .. } => - BlockAttributes::HEADER | - BlockAttributes::JUSTIFICATION | - BlockAttributes::INDEXED_BODY, - } - } - - fn skip_execution(&self) -> bool { - match self.mode { - SyncMode::Full => false, - SyncMode::Light => true, - SyncMode::LightState { .. } => true, - SyncMode::Warp => true, - } - } - - /// Returns the state of the sync of the given peer. - /// - /// Returns `None` if the peer is unknown. - pub fn peer_info(&self, who: &PeerId) -> Option<PeerInfo<B>> { + fn peer_info(&self, who: &PeerId) -> Option<PeerInfo<B>> { self.peers .get(who) .map(|p| PeerInfo { best_hash: p.best_hash, best_number: p.best_number }) } - /// Returns the best seen block. - fn best_seen(&self) -> Option<NumberFor<B>> { - let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>(); - - if best_seens.is_empty() { - None - } else { - let middle = best_seens.len() / 2; - - // Not the "perfect median" when we have an even number of peers. - Some(*best_seens.select_nth_unstable(middle).1) - } - } - /// Returns the current sync status. - pub fn status(&self) -> Status<B> { + fn status(&self) -> SyncStatus<B> { let best_seen = self.best_seen(); let sync_state = if let Some(n) = best_seen { // A chain is classified as downloading if the provided best block is @@ -617,7 +414,7 @@ where _ => None, }; - Status { + SyncStatus { state: sync_state, best_seen_block: best_seen, num_peers: self.peers.len() as u32, @@ -627,29 +424,22 @@ where } } - /// Number of active forks requests. This includes - /// requests that are pending or could be issued right away. - pub fn num_sync_requests(&self) -> usize { + fn num_sync_requests(&self) -> usize { self.fork_targets .values() .filter(|f| f.number <= self.best_queued_number) .count() } - /// Number of downloaded blocks. - pub fn num_downloaded_blocks(&self) -> usize { + fn num_downloaded_blocks(&self) -> usize { self.downloaded_blocks } - /// Returns the current number of peers stored within this state machine. - pub fn num_peers(&self) -> usize { + fn num_peers(&self) -> usize { self.peers.len() } - /// Handle a new connected peer. - /// - /// Call this method whenever we connect to a new peer. - pub fn new_peer( + fn new_peer( &mut self, who: PeerId, best_hash: B::Hash, @@ -773,27 +563,22 @@ where } } - /// Signal that a new best block has been imported. - /// `ChainSync` state with that information. - pub fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) { + fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) { self.on_block_queued(best_hash, best_number); } - /// Schedule a justification request for the given block. - pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) { + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) { let client = &self.client; self.extra_justifications .schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block)) } - /// Clear all pending justification requests. - pub fn clear_justification_requests(&mut self) { + fn clear_justification_requests(&mut self) { self.extra_justifications.reset(); } - /// Request syncing for the given block from given set of peers. // The implementation is similar to on_block_announce with unknown parent hash. - pub fn set_sync_fork_request( + fn set_sync_fork_request( &mut self, mut peers: Vec<PeerId>, hash: &B::Hash, @@ -845,13 +630,12 @@ where .extend(peers); } - /// Get an iterator over all scheduled justification requests. - pub fn justification_requests( + fn justification_requests( &mut self, - ) -> impl Iterator<Item = (PeerId, BlockRequest<B>)> + '_ { + ) -> Box<dyn Iterator<Item = (PeerId, BlockRequest<B>)> + '_> { let peers = &mut self.peers; let mut matcher = self.extra_justifications.matcher(); - std::iter::from_fn(move || { + Box::new(std::iter::from_fn(move || { if let Some((peer, request)) = matcher.next(peers) { peers .get_mut(&peer) @@ -859,33 +643,32 @@ where "`Matcher::next` guarantees the `PeerId` comes from the given peers; qed", ) .state = PeerSyncState::DownloadingJustification(request.0); - let req = message::generic::BlockRequest { + let req = BlockRequest::<B> { id: 0, fields: BlockAttributes::JUSTIFICATION, - from: message::FromBlock::Hash(request.0), + from: FromBlock::Hash(request.0), to: None, - direction: message::Direction::Ascending, + direction: Direction::Ascending, max: Some(1), }; Some((peer, req)) } else { None } - }) + })) } - /// Get an iterator over all block requests of all peers. - pub fn block_requests(&mut self) -> impl Iterator<Item = (&PeerId, BlockRequest<B>)> + '_ { + fn block_requests(&mut self) -> Box<dyn Iterator<Item = (&PeerId, BlockRequest<B>)> + '_> { if self.allowed_requests.is_empty() || self.state_sync.is_some() || self.mode == SyncMode::Warp { - return Either::Left(std::iter::empty()) + return Box::new(std::iter::empty()) } if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { trace!(target: "sync", "Too many blocks in the queue."); - return Either::Left(std::iter::empty()) + return Box::new(std::iter::empty()) } let major_sync = self.status().state == SyncState::Downloading; let attrs = self.required_block_attributes(); @@ -982,11 +765,10 @@ where None } }); - Either::Right(iter) + Box::new(iter) } - /// Get a state request, if any. - pub fn state_request(&mut self) -> Option<(PeerId, StateRequest)> { + fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> { if self.allowed_requests.is_empty() { return None } @@ -1007,7 +789,7 @@ where let request = sync.next_request(); trace!(target: "sync", "New StateRequest for {}: {:?}", id, request); self.allowed_requests.clear(); - return Some((*id, request)) + return Some((*id, OpaqueStateRequest(Box::new(request)))) } } } @@ -1023,7 +805,7 @@ where trace!(target: "sync", "New StateRequest for {}: {:?}", id, request); peer.state = PeerSyncState::DownloadingState; self.allowed_requests.clear(); - return Some((*id, request)) + return Some((*id, OpaqueStateRequest(Box::new(request)))) } } } @@ -1031,8 +813,7 @@ where None } - /// Get a warp sync request, if any. - pub fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<B>)> { + fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest<B>)> { if let Some(sync) = &self.warp_sync { if self.allowed_requests.is_empty() || sync.is_complete() || @@ -1063,14 +844,7 @@ where None } - /// Handle a response from the remote to a block request that we made. - /// - /// `request` must be the original request that triggered `response`. - /// or `None` if data comes from the block announcement. - /// - /// If this corresponds to a valid block, this outputs the block that - /// must be imported in the import queue. - pub fn on_block_data( + fn on_block_data( &mut self, who: &PeerId, request: Option<BlockRequest<B>>, @@ -1080,10 +854,7 @@ where let mut gap = false; let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(who) { let mut blocks = response.blocks; - if request - .as_ref() - .map_or(false, |r| r.direction == message::Direction::Descending) - { + if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) { trace!(target: "sync", "Reversing incoming block list"); blocks.reverse() } @@ -1297,14 +1068,20 @@ where Ok(self.validate_and_queue_blocks(new_blocks, gap)) } - /// Handle a response from the remote to a state request that we made. - /// - /// Returns next request if any. - pub fn on_state_data( + fn on_state_data( &mut self, who: &PeerId, - response: StateResponse, + response: OpaqueStateResponse, ) -> Result<OnStateData<B>, BadPeer> { + let response: Box<StateResponse> = response.0.downcast().map_err(|_error| { + error!( + target: "sync", + "Failed to downcast opaque state response, this is an implementation bug." + ); + + BadPeer(*who, rep::BAD_RESPONSE) + })?; + if let Some(peer) = self.peers.get_mut(who) { if let PeerSyncState::DownloadingState = peer.state { peer.state = PeerSyncState::Available; @@ -1319,7 +1096,7 @@ where response.entries.len(), response.proof.len(), ); - sync.import(response) + sync.import(*response) } else if let Some(sync) = &mut self.warp_sync { debug!( target: "sync", @@ -1328,7 +1105,7 @@ where response.entries.len(), response.proof.len(), ); - sync.import_state(response) + sync.import_state(*response) } else { debug!(target: "sync", "Ignored obsolete state response from {}", who); return Err(BadPeer(*who, rep::NOT_REQUESTED)) @@ -1360,14 +1137,7 @@ where } } - /// Handle a response from the remote to a warp proof request that we made. - /// - /// Returns next request. - pub fn on_warp_sync_data( - &mut self, - who: &PeerId, - response: EncodedProof, - ) -> Result<(), BadPeer> { + fn on_warp_sync_data(&mut self, who: &PeerId, response: EncodedProof) -> Result<(), BadPeer> { if let Some(peer) = self.peers.get_mut(who) { if let PeerSyncState::DownloadingWarpProof = peer.state { peer.state = PeerSyncState::Available; @@ -1396,57 +1166,7 @@ where } } - fn validate_and_queue_blocks( - &mut self, - mut new_blocks: Vec<IncomingBlock<B>>, - gap: bool, - ) -> OnBlockData<B> { - let orig_len = new_blocks.len(); - new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash)); - if new_blocks.len() != orig_len { - debug!( - target: "sync", - "Ignoring {} blocks that are already queued", - orig_len - new_blocks.len(), - ); - } - - let origin = if !gap && self.status().state != SyncState::Downloading { - BlockOrigin::NetworkBroadcast - } else { - BlockOrigin::NetworkInitialSync - }; - - if let Some((h, n)) = new_blocks - .last() - .and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number()))) - { - trace!( - target:"sync", - "Accepted {} blocks ({:?}) with origin {:?}", - new_blocks.len(), - h, - origin, - ); - self.on_block_queued(h, n) - } - self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash)); - OnBlockData::Import(origin, new_blocks) - } - - fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) { - if let Some(peer) = self.peers.get_mut(peer_id) { - peer.update_common_number(new_common); - } - } - - /// Handle a response from the remote to a justification request that we made. - /// - /// `request` must be the original request that triggered `response`. - /// - /// Returns `Some` if this produces a justification that must be imported - /// into the import queue. - pub fn on_block_justification( + fn on_block_justification( &mut self, who: PeerId, response: BlockResponse<B>, @@ -1501,18 +1221,12 @@ where Ok(OnBlockJustification::Nothing) } - /// A batch of blocks have been processed, with or without errors. - /// - /// Call this when a batch of blocks have been processed by the import - /// queue, with or without errors. - /// - /// `peer_info` is passed in case of a restart. - pub fn on_blocks_processed( + fn on_blocks_processed( &mut self, imported: usize, count: usize, results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>, - ) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> { + ) -> Box<dyn Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>>> { trace!(target: "sync", "Imported {} of {}", imported, count); let mut output = Vec::new(); @@ -1654,54 +1368,435 @@ where } self.allowed_requests.set_all(); - output.into_iter() + Box::new(output.into_iter()) + } + + fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) { + let finalization_result = if success { Ok((hash, number)) } else { Err(()) }; + self.extra_justifications + .try_finalize_root((hash, number), finalization_result, true); + self.allowed_requests.set_all(); + } + + fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) { + let client = &self.client; + let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| { + is_descendent_of(&**client, base, block) + }); + + if let SyncMode::LightState { skip_proofs, .. } = &self.mode { + if self.state_sync.is_none() && !self.peers.is_empty() && self.queue_blocks.is_empty() { + // Finalized a recent block. + let mut heads: Vec<_> = + self.peers.iter().map(|(_, peer)| peer.best_number).collect(); + heads.sort(); + let median = heads[heads.len() / 2]; + if number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median { + if let Ok(Some(header)) = self.client.header(BlockId::hash(*hash)) { + log::debug!( + target: "sync", + "Starting state sync for #{} ({})", + number, + hash, + ); + self.state_sync = + Some(StateSync::new(self.client.clone(), header, *skip_proofs)); + self.allowed_requests.set_all(); + } + } + } + } + + if let Err(err) = r { + warn!( + target: "sync", + "💔 Error cleaning up pending extra justification data requests: {}", + err, + ); + } + } + + fn push_block_announce_validation( + &mut self, + who: PeerId, + hash: B::Hash, + announce: BlockAnnounce<B::Header>, + is_best: bool, + ) { + let header = &announce.header; + let number = *header.number(); + debug!( + target: "sync", + "Pre-validating received block announcement {:?} with number {:?} from {}", + hash, + number, + who, + ); + + if number.is_zero() { + self.block_announce_validation.push( + async move { + warn!( + target: "sync", + "💔 Ignored genesis block (#0) announcement from {}: {}", + who, + hash, + ); + PreValidateBlockAnnounce::Skip + } + .boxed(), + ); + return + } + + // Check if there is a slot for this block announce validation. + match self.has_slot_for_block_announce_validation(&who) { + HasSlotForBlockAnnounceValidation::Yes => {}, + HasSlotForBlockAnnounceValidation::TotalMaximumSlotsReached => { + self.block_announce_validation.push( + async move { + warn!( + target: "sync", + "💔 Ignored block (#{} -- {}) announcement from {} because all validation slots are occupied.", + number, + hash, + who, + ); + PreValidateBlockAnnounce::Skip + } + .boxed(), + ); + return + }, + HasSlotForBlockAnnounceValidation::MaximumPeerSlotsReached => { + self.block_announce_validation.push(async move { + warn!( + target: "sync", + "💔 Ignored block (#{} -- {}) announcement from {} because all validation slots for this peer are occupied.", + number, + hash, + who, + ); + PreValidateBlockAnnounce::Skip + }.boxed()); + return + }, + } + + // Let external validator check the block announcement. + let assoc_data = announce.data.as_ref().map_or(&[][..], |v| v.as_slice()); + let future = self.block_announce_validator.validate(header, assoc_data); + + self.block_announce_validation.push( + async move { + match future.await { + Ok(Validation::Success { is_new_best }) => PreValidateBlockAnnounce::Process { + is_new_best: is_new_best || is_best, + announce, + who, + }, + Ok(Validation::Failure { disconnect }) => { + debug!( + target: "sync", + "Block announcement validation of block {:?} from {} failed", + hash, + who, + ); + PreValidateBlockAnnounce::Failure { who, disconnect } + }, + Err(e) => { + debug!( + target: "sync", + "💔 Block announcement validation of block {:?} errored: {}", + hash, + e, + ); + PreValidateBlockAnnounce::Error { who } + }, + } + } + .boxed(), + ); + } + + fn poll_block_announce_validation( + &mut self, + cx: &mut std::task::Context, + ) -> Poll<PollBlockAnnounceValidation<B::Header>> { + match self.block_announce_validation.poll_next_unpin(cx) { + Poll::Ready(Some(res)) => { + self.peer_block_announce_validation_finished(&res); + Poll::Ready(self.finish_block_announce_validation(res)) + }, + _ => Poll::Pending, + } + } + + fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<B>> { + self.blocks.clear_peer_download(who); + if let Some(gap_sync) = &mut self.gap_sync { + gap_sync.blocks.clear_peer_download(who) + } + self.peers.remove(who); + self.extra_justifications.peer_disconnected(who); + self.allowed_requests.set_all(); + self.fork_targets.retain(|_, target| { + target.peers.remove(who); + !target.peers.is_empty() + }); + let blocks = self.ready_blocks(); + (!blocks.is_empty()).then(|| self.validate_and_queue_blocks(blocks, false)) + } + + fn metrics(&self) -> Metrics { + Metrics { + queued_blocks: self.queue_blocks.len().try_into().unwrap_or(std::u32::MAX), + fork_targets: self.fork_targets.len().try_into().unwrap_or(std::u32::MAX), + justifications: self.extra_justifications.metrics(), + } + } + + /// Create implementation-specific block request. + fn create_opaque_block_request(&self, request: &BlockRequest<B>) -> OpaqueBlockRequest { + OpaqueBlockRequest(Box::new(schema::v1::BlockRequest { + fields: request.fields.to_be_u32(), + from_block: match request.from { + FromBlock::Hash(h) => Some(schema::v1::block_request::FromBlock::Hash(h.encode())), + FromBlock::Number(n) => + Some(schema::v1::block_request::FromBlock::Number(n.encode())), + }, + to_block: request.to.map(|h| h.encode()).unwrap_or_default(), + direction: request.direction as i32, + max_blocks: request.max.unwrap_or(0), + support_multiple_justifications: true, + })) + } + + fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result<Vec<u8>, String> { + let request: &schema::v1::BlockRequest = request.0.downcast_ref().ok_or_else(|| { + "Failed to downcast opaque block response during encoding, this is an \ + implementation bug." + .to_string() + })?; + + Ok(request.encode_to_vec()) + } + + fn decode_block_response(&self, response: &[u8]) -> Result<OpaqueBlockResponse, String> { + let response = schema::v1::BlockResponse::decode(response) + .map_err(|error| format!("Failed to decode block response: {error}"))?; + + Ok(OpaqueBlockResponse(Box::new(response))) + } + + fn block_response_into_blocks( + &self, + request: &BlockRequest<B>, + response: OpaqueBlockResponse, + ) -> Result<Vec<BlockData<B>>, String> { + let response: Box<schema::v1::BlockResponse> = response.0.downcast().map_err(|_error| { + "Failed to downcast opaque block response during encoding, this is an \ + implementation bug." + .to_string() + })?; + + response + .blocks + .into_iter() + .map(|block_data| { + Ok(BlockData::<B> { + hash: Decode::decode(&mut block_data.hash.as_ref())?, + header: if !block_data.header.is_empty() { + Some(Decode::decode(&mut block_data.header.as_ref())?) + } else { + None + }, + body: if request.fields.contains(BlockAttributes::BODY) { + Some( + block_data + .body + .iter() + .map(|body| Decode::decode(&mut body.as_ref())) + .collect::<Result<Vec<_>, _>>()?, + ) + } else { + None + }, + indexed_body: if request.fields.contains(BlockAttributes::INDEXED_BODY) { + Some(block_data.indexed_body) + } else { + None + }, + receipt: if !block_data.receipt.is_empty() { + Some(block_data.receipt) + } else { + None + }, + message_queue: if !block_data.message_queue.is_empty() { + Some(block_data.message_queue) + } else { + None + }, + justification: if !block_data.justification.is_empty() { + Some(block_data.justification) + } else if block_data.is_empty_justification { + Some(Vec::new()) + } else { + None + }, + justifications: if !block_data.justifications.is_empty() { + Some(DecodeAll::decode_all(&mut block_data.justifications.as_ref())?) + } else { + None + }, + }) + }) + .collect::<Result<_, _>>() + .map_err(|error: codec::Error| error.to_string()) + } + + fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result<Vec<u8>, String> { + let request: &StateRequest = request.0.downcast_ref().ok_or_else(|| { + "Failed to downcast opaque state response during encoding, this is an \ + implementation bug." + .to_string() + })?; + + Ok(request.encode_to_vec()) + } + + fn decode_state_response(&self, response: &[u8]) -> Result<OpaqueStateResponse, String> { + let response = StateResponse::decode(response) + .map_err(|error| format!("Failed to decode state response: {error}"))?; + + Ok(OpaqueStateResponse(Box::new(response))) + } +} + +impl<B, Client> ChainSync<B, Client> +where + Self: ChainSyncT<B>, + B: BlockT, + Client: HeaderBackend<B> + + BlockBackend<B> + + HeaderMetadata<B, Error = sp_blockchain::Error> + + ProofProvider<B> + + Send + + Sync + + 'static, +{ + /// Create a new instance. + pub fn new( + mode: SyncMode, + client: Arc<Client>, + block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>, + max_parallel_downloads: u32, + warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>, + ) -> Result<Self, ClientError> { + let mut sync = Self { + client, + peers: HashMap::new(), + blocks: BlockCollection::new(), + best_queued_hash: Default::default(), + best_queued_number: Zero::zero(), + extra_justifications: ExtraRequests::new("justification"), + mode, + queue_blocks: Default::default(), + fork_targets: Default::default(), + allowed_requests: Default::default(), + block_announce_validator, + max_parallel_downloads, + downloaded_blocks: 0, + block_announce_validation: Default::default(), + block_announce_validation_per_peer_stats: Default::default(), + state_sync: None, + warp_sync: None, + warp_sync_provider, + import_existing: false, + gap_sync: None, + }; + sync.reset_sync_start_point()?; + Ok(sync) } - /// Call this when a justification has been processed by the import queue, - /// with or without errors. - pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) { - let finalization_result = if success { Ok((hash, number)) } else { Err(()) }; - self.extra_justifications - .try_finalize_root((hash, number), finalization_result, true); - self.allowed_requests.set_all(); + /// Returns the best seen block. + fn best_seen(&self) -> Option<NumberFor<B>> { + let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>(); + + if best_seens.is_empty() { + None + } else { + let middle = best_seens.len() / 2; + + // Not the "perfect median" when we have an even number of peers. + Some(*best_seens.select_nth_unstable(middle).1) + } } - /// Notify about finalization of the given block. - pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) { - let client = &self.client; - let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| { - is_descendent_of(&**client, base, block) - }); + fn required_block_attributes(&self) -> BlockAttributes { + match self.mode { + SyncMode::Full => + BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY, + SyncMode::Light => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION, + SyncMode::LightState { storage_chain_mode: false, .. } | SyncMode::Warp => + BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY, + SyncMode::LightState { storage_chain_mode: true, .. } => + BlockAttributes::HEADER | + BlockAttributes::JUSTIFICATION | + BlockAttributes::INDEXED_BODY, + } + } - if let SyncMode::LightState { skip_proofs, .. } = &self.mode { - if self.state_sync.is_none() && !self.peers.is_empty() && self.queue_blocks.is_empty() { - // Finalized a recent block. - let mut heads: Vec<_> = - self.peers.iter().map(|(_, peer)| peer.best_number).collect(); - heads.sort(); - let median = heads[heads.len() / 2]; - if number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median { - if let Ok(Some(header)) = self.client.header(BlockId::hash(*hash)) { - log::debug!( - target: "sync", - "Starting state sync for #{} ({})", - number, - hash, - ); - self.state_sync = - Some(StateSync::new(self.client.clone(), header, *skip_proofs)); - self.allowed_requests.set_all(); - } - } - } + fn skip_execution(&self) -> bool { + match self.mode { + SyncMode::Full => false, + SyncMode::Light => true, + SyncMode::LightState { .. } => true, + SyncMode::Warp => true, } + } - if let Err(err) = r { - warn!( + fn validate_and_queue_blocks( + &mut self, + mut new_blocks: Vec<IncomingBlock<B>>, + gap: bool, + ) -> OnBlockData<B> { + let orig_len = new_blocks.len(); + new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash)); + if new_blocks.len() != orig_len { + debug!( target: "sync", - "💔 Error cleaning up pending extra justification data requests: {}", - err, + "Ignoring {} blocks that are already queued", + orig_len - new_blocks.len(), + ); + } + + let origin = if !gap && self.status().state != SyncState::Downloading { + BlockOrigin::NetworkBroadcast + } else { + BlockOrigin::NetworkInitialSync + }; + + if let Some((h, n)) = new_blocks + .last() + .and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number()))) + { + trace!( + target:"sync", + "Accepted {} blocks ({:?}) with origin {:?}", + new_blocks.len(), + h, + origin, ); + self.on_block_queued(h, n) + } + self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash)); + OnBlockData::Import(origin, new_blocks) + } + + fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) { + if let Some(peer) = self.peers.get_mut(peer_id) { + peer.update_common_number(new_common); } } @@ -1779,135 +1874,6 @@ where } } - /// Push a block announce validation. - /// - /// It is required that [`ChainSync::poll_block_announce_validation`] is called - /// to check for finished block announce validations. - pub fn push_block_announce_validation( - &mut self, - who: PeerId, - hash: B::Hash, - announce: BlockAnnounce<B::Header>, - is_best: bool, - ) { - let header = &announce.header; - let number = *header.number(); - debug!( - target: "sync", - "Pre-validating received block announcement {:?} with number {:?} from {}", - hash, - number, - who, - ); - - if number.is_zero() { - self.block_announce_validation.push( - async move { - warn!( - target: "sync", - "💔 Ignored genesis block (#0) announcement from {}: {}", - who, - hash, - ); - PreValidateBlockAnnounce::Skip - } - .boxed(), - ); - return - } - - // Check if there is a slot for this block announce validation. - match self.has_slot_for_block_announce_validation(&who) { - HasSlotForBlockAnnounceValidation::Yes => {}, - HasSlotForBlockAnnounceValidation::TotalMaximumSlotsReached => { - self.block_announce_validation.push( - async move { - warn!( - target: "sync", - "💔 Ignored block (#{} -- {}) announcement from {} because all validation slots are occupied.", - number, - hash, - who, - ); - PreValidateBlockAnnounce::Skip - } - .boxed(), - ); - return - }, - HasSlotForBlockAnnounceValidation::MaximumPeerSlotsReached => { - self.block_announce_validation.push(async move { - warn!( - target: "sync", - "💔 Ignored block (#{} -- {}) announcement from {} because all validation slots for this peer are occupied.", - number, - hash, - who, - ); - PreValidateBlockAnnounce::Skip - }.boxed()); - return - }, - } - - // Let external validator check the block announcement. - let assoc_data = announce.data.as_ref().map_or(&[][..], |v| v.as_slice()); - let future = self.block_announce_validator.validate(header, assoc_data); - - self.block_announce_validation.push( - async move { - match future.await { - Ok(Validation::Success { is_new_best }) => PreValidateBlockAnnounce::Process { - is_new_best: is_new_best || is_best, - announce, - who, - }, - Ok(Validation::Failure { disconnect }) => { - debug!( - target: "sync", - "Block announcement validation of block {:?} from {} failed", - hash, - who, - ); - PreValidateBlockAnnounce::Failure { who, disconnect } - }, - Err(e) => { - debug!( - target: "sync", - "💔 Block announcement validation of block {:?} errored: {}", - hash, - e, - ); - PreValidateBlockAnnounce::Error { who } - }, - } - } - .boxed(), - ); - } - - /// Poll block announce validation. - /// - /// Block announce validations can be pushed by using - /// [`ChainSync::push_block_announce_validation`]. - /// - /// This should be polled until it returns [`Poll::Pending`]. - /// - /// If [`PollBlockAnnounceValidation::ImportHeader`] is returned, then the caller MUST try to - /// import passed header (call `on_block_data`). The network request isn't sent in this case. - pub fn poll_block_announce_validation( - &mut self, - cx: &mut std::task::Context, - ) -> Poll<PollBlockAnnounceValidation<B::Header>> { - match self.block_announce_validation.poll_next_unpin(cx) { - Poll::Ready(Some(res)) => { - self.peer_block_announce_validation_finished(&res); - Poll::Ready(self.finish_block_announce_validation(res)) - }, - _ => Poll::Pending, - } - } - /// Should be called when a block announce validation is finished, to update the slots /// of the peer that send the block announce. fn peer_block_announce_validation_finished( @@ -2065,29 +2031,6 @@ where PollBlockAnnounceValidation::Nothing { is_best, who, announce } } - /// Call when a peer has disconnected. - /// Canceled obsolete block request may result in some blocks being ready for - /// import, so this functions checks for such blocks and returns them. - pub fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<B>> { - self.blocks.clear_peer_download(who); - if let Some(gap_sync) = &mut self.gap_sync { - gap_sync.blocks.clear_peer_download(who) - } - self.peers.remove(who); - self.extra_justifications.peer_disconnected(who); - self.allowed_requests.set_all(); - self.fork_targets.retain(|_, target| { - target.peers.remove(who); - !target.peers.is_empty() - }); - let blocks = self.ready_blocks(); - if !blocks.is_empty() { - Some(self.validate_and_queue_blocks(blocks, false)) - } else { - None - } - } - /// Restart the sync process. This will reset all pending block requests and return an iterator /// of new block requests to make to peers. Peers that were downloading finality data (i.e. /// their state was `DownloadingJustification`) are unaffected and will stay in the same state. @@ -2190,16 +2133,6 @@ where .any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) } - /// Return some key metrics. - pub fn metrics(&self) -> Metrics { - Metrics { - queued_blocks: self.queue_blocks.len().try_into().unwrap_or(std::u32::MAX), - fork_targets: self.fork_targets.len().try_into().unwrap_or(std::u32::MAX), - justifications: self.extra_justifications.metrics(), - _priv: (), - } - } - /// Get the set of downloaded blocks that are ready to be queued for import. fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> { self.blocks @@ -2237,23 +2170,15 @@ fn legacy_justification_mapping( justification.map(|just| (*b"FRNK", just).into()) } -#[derive(Debug)] -pub struct Metrics { - pub queued_blocks: u32, - pub fork_targets: u32, - pub justifications: extra_requests::Metrics, - _priv: (), -} - /// Request the ancestry for a block. Sends a request for header and justification for the given /// block number. Used during ancestry search. fn ancestry_request<B: BlockT>(block: NumberFor<B>) -> BlockRequest<B> { - message::generic::BlockRequest { + BlockRequest::<B> { id: 0, fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION, - from: message::FromBlock::Number(block), + from: FromBlock::Number(block), to: None, - direction: message::Direction::Ascending, + direction: Direction::Ascending, max: Some(1), } } @@ -2331,7 +2256,7 @@ fn peer_block_request<B: BlockT>( id: &PeerId, peer: &PeerSync<B>, blocks: &mut BlockCollection<B>, - attrs: message::BlockAttributes, + attrs: BlockAttributes, max_parallel_downloads: u32, finalized: NumberFor<B>, best_num: NumberFor<B>, @@ -2359,17 +2284,17 @@ fn peer_block_request<B: BlockT>( let last = range.end.saturating_sub(One::one()); let from = if peer.best_number == last { - message::FromBlock::Hash(peer.best_hash) + FromBlock::Hash(peer.best_hash) } else { - message::FromBlock::Number(last) + FromBlock::Number(last) }; - let request = message::generic::BlockRequest { + let request = BlockRequest::<B> { id: 0, fields: attrs, from, to: None, - direction: message::Direction::Descending, + direction: Direction::Descending, max: Some((range.end - range.start).saturated_into::<u32>()), }; @@ -2381,7 +2306,7 @@ fn peer_gap_block_request<B: BlockT>( id: &PeerId, peer: &PeerSync<B>, blocks: &mut BlockCollection<B>, - attrs: message::BlockAttributes, + attrs: BlockAttributes, target: NumberFor<B>, common_number: NumberFor<B>, ) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> { @@ -2396,14 +2321,14 @@ fn peer_gap_block_request<B: BlockT>( // The end is not part of the range. let last = range.end.saturating_sub(One::one()); - let from = message::FromBlock::Number(last); + let from = FromBlock::Number(last); - let request = message::generic::BlockRequest { + let request = BlockRequest::<B> { id: 0, fields: attrs, from, to: None, - direction: message::Direction::Descending, + direction: Direction::Descending, max: Some((range.end - range.start).saturated_into::<u32>()), }; Some((range, request)) @@ -2415,7 +2340,7 @@ fn fork_sync_request<B: BlockT>( targets: &mut HashMap<B::Hash, ForkTarget<B>>, best_num: NumberFor<B>, finalized: NumberFor<B>, - attributes: message::BlockAttributes, + attributes: BlockAttributes, check_block: impl Fn(&B::Hash) -> BlockStatus, ) -> Option<(B::Hash, BlockRequest<B>)> { targets.retain(|hash, r| { @@ -2448,12 +2373,12 @@ fn fork_sync_request<B: BlockT>( trace!(target: "sync", "Downloading requested fork {:?} from {}, {} blocks", hash, id, count); return Some(( *hash, - message::generic::BlockRequest { + BlockRequest::<B> { id: 0, fields: attributes, - from: message::FromBlock::Hash(*hash), + from: FromBlock::Hash(*hash), to: None, - direction: message::Direction::Descending, + direction: Direction::Descending, max: Some(count), }, )) @@ -2488,7 +2413,7 @@ where /// /// It is expected that `blocks` are in ascending order. fn validate_blocks<Block: BlockT>( - blocks: &Vec<message::BlockData<Block>>, + blocks: &Vec<BlockData<Block>>, who: &PeerId, request: Option<BlockRequest<Block>>, ) -> Result<Option<NumberFor<Block>>, BadPeer> { @@ -2505,16 +2430,13 @@ fn validate_blocks<Block: BlockT>( return Err(BadPeer(*who, rep::NOT_REQUESTED)) } - let block_header = if request.direction == message::Direction::Descending { - blocks.last() - } else { - blocks.first() - } - .and_then(|b| b.header.as_ref()); + let block_header = + if request.direction == Direction::Descending { blocks.last() } else { blocks.first() } + .and_then(|b| b.header.as_ref()); let expected_block = block_header.as_ref().map_or(false, |h| match request.from { - message::FromBlock::Hash(hash) => h.hash() == hash, - message::FromBlock::Number(n) => h.number() == &n, + FromBlock::Hash(hash) => h.hash() == hash, + FromBlock::Number(n) => h.number() == &n, }); if !expected_block { @@ -2528,7 +2450,7 @@ fn validate_blocks<Block: BlockT>( return Err(BadPeer(*who, rep::NOT_REQUESTED)) } - if request.fields.contains(message::BlockAttributes::HEADER) && + if request.fields.contains(BlockAttributes::HEADER) && blocks.iter().any(|b| b.header.is_none()) { trace!( @@ -2540,8 +2462,7 @@ fn validate_blocks<Block: BlockT>( return Err(BadPeer(*who, rep::BAD_RESPONSE)) } - if request.fields.contains(message::BlockAttributes::BODY) && - blocks.iter().any(|b| b.body.is_none()) + if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none()) { trace!( target: "sync", @@ -2592,13 +2513,10 @@ fn validate_blocks<Block: BlockT>( #[cfg(test)] mod test { - use super::{ - message::{BlockState, FromBlock}, - *, - }; - use crate::message::BlockData; + use super::*; use futures::{executor::block_on, future::poll_fn}; use sc_block_builder::BlockBuilderProvider; + use sc_network_common::sync::message::{BlockData, BlockState, FromBlock}; use sp_blockchain::HeaderBackend; use sp_consensus::block_validation::DefaultBlockAnnounceValidator; use substrate_test_runtime_client::{ @@ -2709,7 +2627,7 @@ mod test { assert!(sync.justification_requests().any(|(p, r)| { p == peer_id3 && r.fields == BlockAttributes::JUSTIFICATION && - r.from == message::FromBlock::Hash(b1_hash) && + r.from == FromBlock::Hash(b1_hash) && r.to == None })); @@ -3125,10 +3043,11 @@ mod test { let response = create_block_response(vec![block.clone()]); let on_block_data = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); - request = match on_block_data.into_request() { - Some(req) => req.1, + request = if let OnBlockData::Request(_peer, request) = on_block_data { + request + } else { // We found the ancenstor - None => break, + break }; log::trace!(target: "sync", "Request: {:?}", request); @@ -3255,10 +3174,11 @@ mod test { let response = create_block_response(vec![block.clone()]); let on_block_data = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); - request = match on_block_data.into_request() { - Some(req) => req.1, + request = if let OnBlockData::Request(_peer, request) = on_block_data { + request + } else { // We found the ancenstor - None => break, + break }; log::trace!(target: "sync", "Request: {:?}", request); diff --git a/substrate/client/network/sync/src/schema.rs b/substrate/client/network/sync/src/schema.rs index aa3eb84621d..b31005360d0 100644 --- a/substrate/client/network/sync/src/schema.rs +++ b/substrate/client/network/sync/src/schema.rs @@ -18,6 +18,6 @@ //! Include sources generated from protobuf definitions. -pub mod v1 { +pub(crate) mod v1 { include!(concat!(env!("OUT_DIR"), "/api.v1.rs")); } diff --git a/substrate/client/network/sync/src/state.rs b/substrate/client/network/sync/src/state.rs index 4041c28af0e..e70d3b6b33a 100644 --- a/substrate/client/network/sync/src/state.rs +++ b/substrate/client/network/sync/src/state.rs @@ -23,6 +23,7 @@ use codec::{Decode, Encode}; use log::debug; use sc_client_api::{CompactProof, ProofProvider}; use sc_consensus::ImportedState; +use sc_network_common::sync::StateDownloadProgress; use smallvec::SmallVec; use sp_core::storage::well_known_keys; use sp_runtime::traits::{Block as BlockT, Header, NumberFor}; @@ -42,15 +43,6 @@ pub struct StateSync<B: BlockT, Client> { skip_proof: bool, } -/// Reported state download progress. -#[derive(Clone, Eq, PartialEq, Debug)] -pub struct StateDownloadProgress { - /// Estimated download percentage. - pub percentage: u32, - /// Total state size in bytes downloaded so far. - pub size: u64, -} - /// Import state chunk result. pub enum ImportResult<B: BlockT> { /// State is complete and ready for import. diff --git a/substrate/client/network/sync/src/warp.rs b/substrate/client/network/sync/src/warp.rs index d3d9d7d2441..f3fad6c1b7f 100644 --- a/substrate/client/network/sync/src/warp.rs +++ b/substrate/client/network/sync/src/warp.rs @@ -18,60 +18,25 @@ //! Warp sync support. -pub use crate::warp_request_handler::{ - EncodedProof, Request as WarpProofRequest, VerificationResult, WarpSyncProvider, -}; use crate::{ schema::v1::{StateRequest, StateResponse}, state::{ImportResult, StateSync}, }; use sc_client_api::ProofProvider; +use sc_network_common::sync::warp::{ + EncodedProof, VerificationResult, WarpProofRequest, WarpSyncPhase, WarpSyncProgress, + WarpSyncProvider, +}; use sp_blockchain::HeaderBackend; use sp_finality_grandpa::{AuthorityList, SetId}; use sp_runtime::traits::{Block as BlockT, NumberFor, Zero}; -use std::{fmt, sync::Arc}; +use std::sync::Arc; enum Phase<B: BlockT, Client> { WarpProof { set_id: SetId, authorities: AuthorityList, last_hash: B::Hash }, State(StateSync<B, Client>), } -/// Reported warp sync phase. -#[derive(Clone, Eq, PartialEq, Debug)] -pub enum WarpSyncPhase<B: BlockT> { - /// Waiting for peers to connect. - AwaitingPeers, - /// Downloading and verifying grandpa warp proofs. - DownloadingWarpProofs, - /// Downloading state data. - DownloadingState, - /// Importing state. - ImportingState, - /// Downloading block history. - DownloadingBlocks(NumberFor<B>), -} - -impl<B: BlockT> fmt::Display for WarpSyncPhase<B> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::AwaitingPeers => write!(f, "Waiting for peers"), - Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"), - Self::DownloadingState => write!(f, "Downloading state"), - Self::ImportingState => write!(f, "Importing state"), - Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n), - } - } -} - -/// Reported warp sync progress. -#[derive(Clone, Eq, PartialEq, Debug)] -pub struct WarpSyncProgress<B: BlockT> { - /// Estimated download percentage. - pub phase: WarpSyncPhase<B>, - /// Total bytes downloaded so far. - pub total_bytes: u64, -} - /// Import warp proof result. pub enum WarpProofImportResult { /// Import was successful. diff --git a/substrate/client/network/sync/src/warp_request_handler.rs b/substrate/client/network/sync/src/warp_request_handler.rs index 4f66e0a6daf..53ec216a1e6 100644 --- a/substrate/client/network/sync/src/warp_request_handler.rs +++ b/substrate/client/network/sync/src/warp_request_handler.rs @@ -1,4 +1,4 @@ -// Copyright 2021 Parity Technologies (UK) Ltd. +// Copyright 2022 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify @@ -16,7 +16,7 @@ //! Helper for handling (i.e. answering) grandpa warp sync requests from a remote peer. -use codec::{Decode, Encode}; +use codec::Decode; use futures::{ channel::{mpsc, oneshot}, stream::StreamExt, @@ -27,52 +27,13 @@ use sc_network_common::{ request_responses::{ IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig, }, + sync::warp::{EncodedProof, WarpProofRequest, WarpSyncProvider}, }; use sp_runtime::traits::Block as BlockT; use std::{sync::Arc, time::Duration}; -pub use sp_finality_grandpa::{AuthorityList, SetId}; - -/// Scale-encoded warp sync proof response. -pub struct EncodedProof(pub Vec<u8>); - -/// Warp sync request -#[derive(Encode, Decode, Debug)] -pub struct Request<B: BlockT> { - /// Start collecting proofs from this block. - pub begin: B::Hash, -} - const MAX_RESPONSE_SIZE: u64 = 16 * 1024 * 1024; -/// Proof verification result. -pub enum VerificationResult<Block: BlockT> { - /// Proof is valid, but the target was not reached. - Partial(SetId, AuthorityList, Block::Hash), - /// Target finality is proved. - Complete(SetId, AuthorityList, Block::Header), -} - -/// Warp sync backend. Handles retrieveing and verifying warp sync proofs. -pub trait WarpSyncProvider<B: BlockT>: Send + Sync { - /// Generate proof starting at given block hash. The proof is accumulated until maximum proof - /// size is reached. - fn generate( - &self, - start: B::Hash, - ) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>; - /// Verify warp proof against current set of authorities. - fn verify( - &self, - proof: &EncodedProof, - set_id: SetId, - authorities: AuthorityList, - ) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>; - /// Get current list of authorities. This is supposed to be genesis authorities when starting - /// sync. - fn current_authorities(&self) -> AuthorityList; -} - /// Generates a [`RequestResponseConfig`] for the grandpa warp sync request protocol, refusing /// incoming requests. pub fn generate_request_response_config(protocol_id: ProtocolId) -> RequestResponseConfig { @@ -115,7 +76,7 @@ impl<TBlock: BlockT> RequestHandler<TBlock> { payload: Vec<u8>, pending_response: oneshot::Sender<OutgoingResponse>, ) -> Result<(), HandleRequestError> { - let request = Request::<TBlock>::decode(&mut &payload[..])?; + let request = WarpProofRequest::<TBlock>::decode(&mut &payload[..])?; let EncodedProof(proof) = self .backend diff --git a/substrate/client/network/test/Cargo.toml b/substrate/client/network/test/Cargo.toml index fdb9befc41c..1aa6ebd8bf3 100644 --- a/substrate/client/network/test/Cargo.toml +++ b/substrate/client/network/test/Cargo.toml @@ -26,6 +26,8 @@ sc-client-api = { version = "4.0.0-dev", path = "../../api" } sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } sc-network = { version = "0.10.0-dev", path = "../" } sc-network-common = { version = "0.10.0-dev", path = "../common" } +sc-network-light = { version = "0.10.0-dev", path = "../light" } +sc-network-sync = { version = "0.10.0-dev", path = "../sync" } sc-service = { version = "0.10.0-dev", default-features = false, features = ["test-helpers"], path = "../../service" } sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" } sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 9e752e81a3b..d7c83810d52 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -48,16 +48,21 @@ use sc_consensus::{ }; pub use sc_network::config::EmptyTransactionPool; use sc_network::{ - block_request_handler::BlockRequestHandler, config::{ - MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode, - ProtocolConfig, Role, SyncMode, TransportConfig, + MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode, Role, + SyncMode, TransportConfig, }, - light_client_requests::handler::LightClientRequestHandler, - state_request_handler::StateRequestHandler, - warp_request_handler, Multiaddr, NetworkService, NetworkWorker, + Multiaddr, NetworkService, NetworkWorker, }; pub use sc_network_common::config::ProtocolId; +use sc_network_common::sync::warp::{ + AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncProvider, +}; +use sc_network_light::light_client_requests::handler::LightClientRequestHandler; +use sc_network_sync::{ + block_request_handler::BlockRequestHandler, state_request_handler::StateRequestHandler, + warp_request_handler, ChainSync, +}; use sc_service::client::Client; use sp_blockchain::{ well_known_cache_keys::{self, Id as CacheKeyId}, @@ -638,27 +643,26 @@ impl<B: BlockT> VerifierAdapter<B> { struct TestWarpSyncProvider<B: BlockT>(Arc<dyn HeaderBackend<B>>); -impl<B: BlockT> warp_request_handler::WarpSyncProvider<B> for TestWarpSyncProvider<B> { +impl<B: BlockT> WarpSyncProvider<B> for TestWarpSyncProvider<B> { fn generate( &self, _start: B::Hash, - ) -> Result<warp_request_handler::EncodedProof, Box<dyn std::error::Error + Send + Sync>> { + ) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>> { let info = self.0.info(); let best_header = self.0.header(BlockId::hash(info.best_hash)).unwrap().unwrap(); - Ok(warp_request_handler::EncodedProof(best_header.encode())) + Ok(EncodedProof(best_header.encode())) } fn verify( &self, - proof: &warp_request_handler::EncodedProof, - _set_id: warp_request_handler::SetId, - _authorities: warp_request_handler::AuthorityList, - ) -> Result<warp_request_handler::VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>> - { - let warp_request_handler::EncodedProof(encoded) = proof; + proof: &EncodedProof, + _set_id: SetId, + _authorities: AuthorityList, + ) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>> { + let EncodedProof(encoded) = proof; let header = B::Header::decode(&mut encoded.as_slice()).unwrap(); - Ok(warp_request_handler::VerificationResult::Complete(0, Default::default(), header)) + Ok(VerificationResult::Complete(0, Default::default(), header)) } - fn current_authorities(&self) -> warp_request_handler::AuthorityList { + fn current_authorities(&self) -> AuthorityList { Default::default() } } @@ -688,7 +692,7 @@ pub struct FullPeerConfig { pub storage_chain: bool, } -pub trait TestNetFactory: Sized +pub trait TestNetFactory: Default + Sized where <Self::BlockImport as BlockImport<Block>>::Transaction: Send, { @@ -696,14 +700,8 @@ where type BlockImport: BlockImport<Block, Error = ConsensusError> + Clone + Send + Sync + 'static; type PeerData: Default; - /// These two need to be implemented! - fn from_config(config: &ProtocolConfig) -> Self; - fn make_verifier( - &self, - client: PeersClient, - config: &ProtocolConfig, - peer_data: &Self::PeerData, - ) -> Self::Verifier; + /// This one needs to be implemented! + fn make_verifier(&self, client: PeersClient, peer_data: &Self::PeerData) -> Self::Verifier; /// Get reference to peer. fn peer(&mut self, i: usize) -> &mut Peer<Self::PeerData, Self::BlockImport>; @@ -723,15 +721,10 @@ where Self::PeerData, ); - fn default_config() -> ProtocolConfig { - ProtocolConfig::default() - } - /// Create new test network with this many peers. fn new(n: usize) -> Self { trace!(target: "test_network", "Creating test network"); - let config = Self::default_config(); - let mut net = Self::from_config(&config); + let mut net = Self::default(); for i in 0..n { trace!(target: "test_network", "Adding peer {}", i); @@ -767,11 +760,8 @@ where let (block_import, justification_import, data) = self .make_block_import(PeersClient { client: client.clone(), backend: backend.clone() }); - let verifier = self.make_verifier( - PeersClient { client: client.clone(), backend: backend.clone() }, - &Default::default(), - &data, - ); + let verifier = self + .make_verifier(PeersClient { client: client.clone(), backend: backend.clone() }, &data); let verifier = VerifierAdapter::new(verifier); let import_queue = Box::new(BasicQueue::new( @@ -845,6 +835,10 @@ where protocol_config }; + let max_parallel_downloads = network_config.max_parallel_downloads; + let block_announce_validator = config + .block_announce_validator + .unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)); let network = NetworkWorker::new(sc_network::config::Params { role: if config.is_authority { Role::Authority } else { Role::Full }, executor: None, @@ -856,9 +850,18 @@ where transaction_pool: Arc::new(EmptyTransactionPool), protocol_id, import_queue, - block_announce_validator: config - .block_announce_validator - .unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)), + create_chain_sync: Box::new(move |sync_mode, chain, warp_sync_provider| { + match ChainSync::new( + sync_mode, + chain, + block_announce_validator, + max_parallel_downloads, + warp_sync_provider, + ) { + Ok(chain_sync) => Ok(Box::new(chain_sync)), + Err(error) => Err(Box::new(error).into()), + } + }), metrics_registry: None, block_request_protocol_config, state_request_protocol_config, @@ -1012,6 +1015,7 @@ where } } +#[derive(Default)] pub struct TestNet { peers: Vec<Peer<(), PeersClient>>, } @@ -1021,17 +1025,7 @@ impl TestNetFactory for TestNet { type PeerData = (); type BlockImport = PeersClient; - /// Create new test network with peers and given config. - fn from_config(_config: &ProtocolConfig) -> Self { - TestNet { peers: Vec::new() } - } - - fn make_verifier( - &self, - _client: PeersClient, - _config: &ProtocolConfig, - _peer_data: &(), - ) -> Self::Verifier { + fn make_verifier(&self, _client: PeersClient, _peer_data: &()) -> Self::Verifier { PassThroughVerifier::new(false) } @@ -1081,6 +1075,7 @@ impl JustificationImport<Block> for ForceFinalized { } } +#[derive(Default)] pub struct JustificationTestNet(TestNet); impl TestNetFactory for JustificationTestNet { @@ -1088,17 +1083,8 @@ impl TestNetFactory for JustificationTestNet { type PeerData = (); type BlockImport = PeersClient; - fn from_config(config: &ProtocolConfig) -> Self { - JustificationTestNet(TestNet::from_config(config)) - } - - fn make_verifier( - &self, - client: PeersClient, - config: &ProtocolConfig, - peer_data: &(), - ) -> Self::Verifier { - self.0.make_verifier(client, config, peer_data) + fn make_verifier(&self, client: PeersClient, peer_data: &()) -> Self::Verifier { + self.0.make_verifier(client, peer_data) } fn peer(&mut self, i: usize) -> &mut Peer<Self::PeerData, Self::BlockImport> { diff --git a/substrate/client/service/Cargo.toml b/substrate/client/service/Cargo.toml index ff11dd1344d..31b0c860cf1 100644 --- a/substrate/client/service/Cargo.toml +++ b/substrate/client/service/Cargo.toml @@ -50,8 +50,10 @@ sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/comm sc-consensus = { version = "0.10.0-dev", path = "../../client/consensus/common" } sp-inherents = { version = "4.0.0-dev", path = "../../primitives/inherents" } sp-storage = { version = "6.0.0", path = "../../primitives/storage" } -sc-network-common = { version = "0.10.0-dev", path = "../network/common" } sc-network = { version = "0.10.0-dev", path = "../network" } +sc-network-common = { version = "0.10.0-dev", path = "../network/common" } +sc-network-light = { version = "0.10.0-dev", path = "../network/light" } +sc-network-sync = { version = "0.10.0-dev", path = "../network/sync" } sc-chain-spec = { version = "4.0.0-dev", path = "../chain-spec" } sc-client-api = { version = "4.0.0-dev", path = "../api" } sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 5319bf24d5e..fe0ae5db53e 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -38,13 +38,17 @@ use sc_consensus::import_queue::ImportQueue; use sc_executor::RuntimeVersionOf; use sc_keystore::LocalKeystore; use sc_network::{ - block_request_handler::{self, BlockRequestHandler}, config::{Role, SyncMode}, - light_client_requests::{self, handler::LightClientRequestHandler}, - state_request_handler::{self, StateRequestHandler}, - warp_request_handler::{self, RequestHandler as WarpSyncRequestHandler, WarpSyncProvider}, NetworkService, }; +use sc_network_common::sync::warp::WarpSyncProvider; +use sc_network_light::light_client_requests::{self, handler::LightClientRequestHandler}; +use sc_network_sync::{ + block_request_handler::{self, BlockRequestHandler}, + state_request_handler::{self, StateRequestHandler}, + warp_request_handler::{self, RequestHandler as WarpSyncRequestHandler}, + ChainSync, +}; use sc_rpc::{ author::AuthorApiServer, chain::ChainApiServer, @@ -801,6 +805,7 @@ where } }; + let max_parallel_downloads = config.network.max_parallel_downloads; let network_params = sc_network::config::Params { role: config.role.clone(), executor: { @@ -818,9 +823,20 @@ where network_config: config.network.clone(), chain: client.clone(), transaction_pool: transaction_pool_adapter as _, - import_queue: Box::new(import_queue), protocol_id, - block_announce_validator, + import_queue: Box::new(import_queue), + create_chain_sync: Box::new( + move |sync_mode, chain, warp_sync_provider| match ChainSync::new( + sync_mode, + chain, + block_announce_validator, + max_parallel_downloads, + warp_sync_provider, + ) { + Ok(chain_sync) => Ok(Box::new(chain_sync)), + Err(error) => Err(Box::new(error).into()), + }, + ), metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_request_protocol_config, state_request_protocol_config, -- GitLab