From b35b28ca4beeb7318006e7bdb7927c9309e7ad7a Mon Sep 17 00:00:00 2001 From: Rahul Subramaniyam <78006270+rahulksnv@users.noreply.github.com> Date: Fri, 15 Sep 2023 01:12:45 -0700 Subject: [PATCH] Modular block request handler (#1524) Submit the outstanding PRs from the old repos(these were already reviewed and approved before the repo rorg, but not yet submitted): Main PR: https://github.com/paritytech/substrate/pull/14014 Companion PRs: https://github.com/paritytech/polkadot/pull/7134, https://github.com/paritytech/cumulus/pull/2489 The changes in the PR: 1. ChainSync currently calls into the block request handler directly. Instead, move the block request handler behind a trait. This allows new protocols to be plugged into ChainSync. 2. BuildNetworkParams is changed so that custom relay protocol implementations can be (optionally) passed in during network creation time. If custom protocol is not specified, it defaults to the existing block handler 3. BlockServer and BlockDownloader traits are introduced for the protocol implementation. The existing block handler has been changed to implement these traits 4. Other changes: [X] Make TxHash serializable. This is needed for exchanging the serialized hash in the relay protocol messages [X] Clean up types no longer used(OpaqueBlockRequest, OpaqueBlockResponse) --------- Co-authored-by: Dmitry Markin <dmitry@markin.tech> Co-authored-by: command-bot <> --- cumulus/client/service/src/lib.rs | 1 + polkadot/node/service/src/lib.rs | 1 + .../bin/node-template/node/src/service.rs | 1 + substrate/bin/node/cli/src/service.rs | 1 + substrate/client/network/common/src/sync.rs | 33 +-- .../network/sync/src/block_relay_protocol.rs | 72 ++++++ .../network/sync/src/block_request_handler.rs | 168 ++++++++++++-- substrate/client/network/sync/src/engine.rs | 5 +- substrate/client/network/sync/src/lib.rs | 212 +++++------------- substrate/client/network/sync/src/mock.rs | 35 ++- substrate/client/network/test/src/lib.rs | 24 +- substrate/client/network/test/src/service.rs | 24 +- substrate/client/service/src/builder.rs | 46 ++-- .../client/transaction-pool/api/src/lib.rs | 3 +- 14 files changed, 370 insertions(+), 256 deletions(-) create mode 100644 substrate/client/network/sync/src/block_relay_protocol.rs diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs index 211a5cc3b79..82890666eba 100644 --- a/cumulus/client/service/src/lib.rs +++ b/cumulus/client/service/src/lib.rs @@ -484,6 +484,7 @@ where import_queue, block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)), warp_sync_params, + block_relay: None, }) } diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 7f4eadaba7f..75e853d0ef8 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -894,6 +894,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>( import_queue, block_announce_validator_builder: None, warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), + block_relay: None, })?; if config.offchain_worker.enabled { diff --git a/substrate/bin/node-template/node/src/service.rs b/substrate/bin/node-template/node/src/service.rs index 7303f5cd6dd..40320282924 100644 --- a/substrate/bin/node-template/node/src/service.rs +++ b/substrate/bin/node-template/node/src/service.rs @@ -183,6 +183,7 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> { import_queue, block_announce_validator_builder: None, warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), + block_relay: None, })?; if config.offchain_worker.enabled { diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index e49c60fe2fb..977c90e73e9 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -388,6 +388,7 @@ pub fn new_full_base( import_queue, block_announce_validator_builder: None, warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), + block_relay: None, })?; let role = config.role.clone(); diff --git a/substrate/client/network/common/src/sync.rs b/substrate/client/network/common/src/sync.rs index 461c4ae411d..5a6f90b290d 100644 --- a/substrate/client/network/common/src/sync.rs +++ b/substrate/client/network/common/src/sync.rs @@ -22,12 +22,12 @@ pub mod message; pub mod metrics; pub mod warp; -use crate::{role::Roles, sync::message::BlockAnnounce, types::ReputationChange}; +use crate::{role::Roles, types::ReputationChange}; use futures::Stream; use libp2p_identity::PeerId; -use message::{BlockData, BlockRequest, BlockResponse}; +use message::{BlockAnnounce, BlockRequest, BlockResponse}; use sc_consensus::{import_queue::RuntimeOrigin, IncomingBlock}; use sp_consensus::BlockOrigin; use sp_runtime::{ @@ -226,28 +226,6 @@ impl fmt::Debug for OpaqueStateResponse { } } -/// Wrapper for implementation-specific block request. -/// -/// NOTE: Implementation must be able to encode and decode it for network purposes. -pub struct OpaqueBlockRequest(pub Box<dyn Any + Send>); - -impl fmt::Debug for OpaqueBlockRequest { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("OpaqueBlockRequest").finish() - } -} - -/// Wrapper for implementation-specific block response. -/// -/// NOTE: Implementation must be able to encode and decode it for network purposes. -pub struct OpaqueBlockResponse(pub Box<dyn Any + Send>); - -impl fmt::Debug for OpaqueBlockResponse { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("OpaqueBlockResponse").finish() - } -} - /// Provides high-level status of syncing. #[async_trait::async_trait] pub trait SyncStatusProvider<Block: BlockT>: Send + Sync { @@ -392,13 +370,6 @@ pub trait ChainSync<Block: BlockT>: Send { /// Return some key metrics. fn metrics(&self) -> Metrics; - /// Access blocks from implementation-specific block response. - fn block_response_into_blocks( - &self, - request: &BlockRequest<Block>, - response: OpaqueBlockResponse, - ) -> Result<Vec<BlockData<Block>>, String>; - /// Advance the state of `ChainSync` fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()>; diff --git a/substrate/client/network/sync/src/block_relay_protocol.rs b/substrate/client/network/sync/src/block_relay_protocol.rs new file mode 100644 index 00000000000..7a313458bf0 --- /dev/null +++ b/substrate/client/network/sync/src/block_relay_protocol.rs @@ -0,0 +1,72 @@ +// Copyright Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see <http://www.gnu.org/licenses/>. + +//! Block relay protocol related definitions. + +use futures::channel::oneshot; +use libp2p::PeerId; +use sc_network::request_responses::{ProtocolConfig, RequestFailure}; +use sc_network_common::sync::message::{BlockData, BlockRequest}; +use sp_runtime::traits::Block as BlockT; +use std::sync::Arc; + +/// The serving side of the block relay protocol. It runs a single instance +/// of the server task that processes the incoming protocol messages. +#[async_trait::async_trait] +pub trait BlockServer<Block: BlockT>: Send { + /// Starts the protocol processing. + async fn run(&mut self); +} + +/// The client side stub to download blocks from peers. This is a handle +/// that can be used to initiate concurrent downloads. +#[async_trait::async_trait] +pub trait BlockDownloader<Block: BlockT>: Send + Sync { + /// Performs the protocol specific sequence to fetch the blocks from the peer. + /// Output: if the download succeeds, the response is a `Vec<u8>` which is + /// in a format specific to the protocol implementation. The block data + /// can be extracted from this response using [`BlockDownloader::block_response_into_blocks`]. + async fn download_blocks( + &self, + who: PeerId, + request: BlockRequest<Block>, + ) -> Result<Result<Vec<u8>, RequestFailure>, oneshot::Canceled>; + + /// Parses the protocol specific response to retrieve the block data. + fn block_response_into_blocks( + &self, + request: &BlockRequest<Block>, + response: Vec<u8>, + ) -> Result<Vec<BlockData<Block>>, BlockResponseError>; +} + +/// Errors returned by [`BlockDownloader::block_response_into_blocks`]. +#[derive(Debug)] +pub enum BlockResponseError { + /// Failed to decode the response bytes. + DecodeFailed(String), + + /// Failed to extract the blocks from the decoded bytes. + ExtractionFailed(String), +} + +/// Block relay specific params for network creation, specified in +/// ['sc_service::BuildNetworkParams']. +pub struct BlockRelayParams<Block: BlockT> { + pub server: Box<dyn BlockServer<Block>>, + pub downloader: Arc<dyn BlockDownloader<Block>>, + pub request_response_config: ProtocolConfig, +} diff --git a/substrate/client/network/sync/src/block_request_handler.rs b/substrate/client/network/sync/src/block_request_handler.rs index d90a00b3767..c24083f6328 100644 --- a/substrate/client/network/sync/src/block_request_handler.rs +++ b/substrate/client/network/sync/src/block_request_handler.rs @@ -18,29 +18,35 @@ //! `crate::request_responses::RequestResponsesBehaviour`. use crate::{ - schema::v1::{block_request::FromBlock, BlockResponse, Direction}, + block_relay_protocol::{BlockDownloader, BlockRelayParams, BlockResponseError, BlockServer}, + schema::v1::{ + block_request::FromBlock as FromBlockSchema, BlockRequest as BlockRequestSchema, + BlockResponse as BlockResponseSchema, BlockResponse, Direction, + }, + service::network::NetworkServiceHandle, MAX_BLOCKS_IN_RESPONSE, }; -use codec::{Decode, Encode}; +use codec::{Decode, DecodeAll, Encode}; use futures::{channel::oneshot, stream::StreamExt}; use libp2p::PeerId; use log::debug; use prost::Message; -use schnellru::{ByLength, LruMap}; - use sc_client_api::BlockBackend; use sc_network::{ config::ProtocolId, - request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}, + request_responses::{ + IfDisconnected, IncomingRequest, OutgoingResponse, ProtocolConfig, RequestFailure, + }, + types::ProtocolName, }; -use sc_network_common::sync::message::BlockAttributes; +use sc_network_common::sync::message::{BlockAttributes, BlockData, BlockRequest, FromBlock}; +use schnellru::{ByLength, LruMap}; use sp_blockchain::HeaderBackend; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, Header, One, Zero}, }; - use std::{ cmp::min, hash::{Hash, Hasher}, @@ -129,7 +135,8 @@ enum SeenRequestsValue { Fulfilled(usize), } -/// Handler for incoming block requests from a remote peer. +/// The full block server implementation of [`BlockServer`]. It handles +/// the incoming block requests from a remote peer. pub struct BlockRequestHandler<B: BlockT, Client> { client: Arc<Client>, request_receiver: async_channel::Receiver<IncomingRequest>, @@ -146,11 +153,12 @@ where { /// Create a new [`BlockRequestHandler`]. pub fn new( + network: NetworkServiceHandle, protocol_id: &ProtocolId, fork_id: Option<&str>, client: Arc<Client>, num_peer_hint: usize, - ) -> (Self, ProtocolConfig) { + ) -> BlockRelayParams<B> { // Reserve enough request slots for one request per peer when we are at the maximum // number of peers. let capacity = std::cmp::max(num_peer_hint, 1); @@ -170,11 +178,15 @@ where let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2); let seen_requests = LruMap::new(capacity); - (Self { client, request_receiver, seen_requests }, protocol_config) + BlockRelayParams { + server: Box::new(Self { client, request_receiver, seen_requests }), + downloader: Arc::new(FullBlockDownloader::new(protocol_config.name.clone(), network)), + request_response_config: protocol_config, + } } /// Run [`BlockRequestHandler`]. - pub async fn run(mut self) { + async fn process_requests(&mut self) { while let Some(request) = self.request_receiver.next().await { let IncomingRequest { peer, payload, pending_response } = request; @@ -197,11 +209,11 @@ where let request = crate::schema::v1::BlockRequest::decode(&payload[..])?; let from_block_id = match request.from_block.ok_or(HandleRequestError::MissingFromField)? { - FromBlock::Hash(ref h) => { + FromBlockSchema::Hash(ref h) => { let h = Decode::decode(&mut h.as_ref())?; BlockId::<B>::Hash(h) }, - FromBlock::Number(ref n) => { + FromBlockSchema::Number(ref n) => { let n = Decode::decode(&mut n.as_ref())?; BlockId::<B>::Number(n) }, @@ -448,6 +460,17 @@ where } } +#[async_trait::async_trait] +impl<B, Client> BlockServer<B> for BlockRequestHandler<B, Client> +where + B: BlockT, + Client: HeaderBackend<B> + BlockBackend<B> + Send + Sync + 'static, +{ + async fn run(&mut self) { + self.process_requests().await; + } +} + #[derive(Debug, thiserror::Error)] enum HandleRequestError { #[error("Failed to decode request: {0}.")] @@ -465,3 +488,122 @@ enum HandleRequestError { #[error("Failed to send response.")] SendResponse, } + +/// The full block downloader implementation of [`BlockDownloader]. +pub struct FullBlockDownloader { + protocol_name: ProtocolName, + network: NetworkServiceHandle, +} + +impl FullBlockDownloader { + fn new(protocol_name: ProtocolName, network: NetworkServiceHandle) -> Self { + Self { protocol_name, network } + } + + /// Extracts the blocks from the response schema. + fn blocks_from_schema<B: BlockT>( + &self, + request: &BlockRequest<B>, + response: BlockResponseSchema, + ) -> Result<Vec<BlockData<B>>, String> { + response + .blocks + .into_iter() + .map(|block_data| { + Ok(BlockData::<B> { + hash: Decode::decode(&mut block_data.hash.as_ref())?, + header: if !block_data.header.is_empty() { + Some(Decode::decode(&mut block_data.header.as_ref())?) + } else { + None + }, + body: if request.fields.contains(BlockAttributes::BODY) { + Some( + block_data + .body + .iter() + .map(|body| Decode::decode(&mut body.as_ref())) + .collect::<Result<Vec<_>, _>>()?, + ) + } else { + None + }, + indexed_body: if request.fields.contains(BlockAttributes::INDEXED_BODY) { + Some(block_data.indexed_body) + } else { + None + }, + receipt: if !block_data.receipt.is_empty() { + Some(block_data.receipt) + } else { + None + }, + message_queue: if !block_data.message_queue.is_empty() { + Some(block_data.message_queue) + } else { + None + }, + justification: if !block_data.justification.is_empty() { + Some(block_data.justification) + } else if block_data.is_empty_justification { + Some(Vec::new()) + } else { + None + }, + justifications: if !block_data.justifications.is_empty() { + Some(DecodeAll::decode_all(&mut block_data.justifications.as_ref())?) + } else { + None + }, + }) + }) + .collect::<Result<_, _>>() + .map_err(|error: codec::Error| error.to_string()) + } +} + +#[async_trait::async_trait] +impl<B: BlockT> BlockDownloader<B> for FullBlockDownloader { + async fn download_blocks( + &self, + who: PeerId, + request: BlockRequest<B>, + ) -> Result<Result<Vec<u8>, RequestFailure>, oneshot::Canceled> { + // Build the request protobuf. + let bytes = BlockRequestSchema { + fields: request.fields.to_be_u32(), + from_block: match request.from { + FromBlock::Hash(h) => Some(FromBlockSchema::Hash(h.encode())), + FromBlock::Number(n) => Some(FromBlockSchema::Number(n.encode())), + }, + direction: request.direction as i32, + max_blocks: request.max.unwrap_or(0), + support_multiple_justifications: true, + } + .encode_to_vec(); + + let (tx, rx) = oneshot::channel(); + self.network.start_request( + who, + self.protocol_name.clone(), + bytes, + tx, + IfDisconnected::ImmediateError, + ); + rx.await + } + + fn block_response_into_blocks( + &self, + request: &BlockRequest<B>, + response: Vec<u8>, + ) -> Result<Vec<BlockData<B>>, BlockResponseError> { + // Decode the response protobuf + let response_schema = BlockResponseSchema::decode(response.as_slice()) + .map_err(|error| BlockResponseError::DecodeFailed(error.to_string()))?; + + // Extract the block data from the protobuf + self.blocks_from_schema::<B>(request, response_schema) + .map_err(|error| BlockResponseError::ExtractionFailed(error.to_string())) + } +} diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 23847d16c97..c93ba89c622 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -23,6 +23,7 @@ use crate::{ block_announce_validator::{ BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream, }, + block_relay_protocol::BlockDownloader, service::{self, chain_sync::ToServiceCommand}, warp::WarpSyncParams, ChainSync, ClientError, SyncingService, @@ -300,7 +301,7 @@ where warp_sync_params: Option<WarpSyncParams<B>>, network_service: service::network::NetworkServiceHandle, import_queue: Box<dyn ImportQueueService<B>>, - block_request_protocol_name: ProtocolName, + block_downloader: Arc<dyn BlockDownloader<B>>, state_request_protocol_name: ProtocolName, warp_sync_protocol_name: Option<ProtocolName>, rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent<B>>, @@ -392,7 +393,7 @@ where metrics_registry, network_service.clone(), import_queue, - block_request_protocol_name, + block_downloader, state_request_protocol_name, warp_sync_protocol_name, )?; diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index df0ed2c4541..20c0034966d 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -29,13 +29,14 @@ //! order to update it. use crate::{ + block_relay_protocol::{BlockDownloader, BlockResponseError}, blocks::BlockCollection, schema::v1::{StateRequest, StateResponse}, state::StateSync, warp::{WarpProofImportResult, WarpSync, WarpSyncConfig}, }; -use codec::{Decode, DecodeAll, Encode}; +use codec::Encode; use extra_requests::ExtraRequests; use futures::{channel::oneshot, task::Poll, Future, FutureExt}; use libp2p::{request_response::OutboundFailure, PeerId}; @@ -63,8 +64,8 @@ use sc_network_common::{ }, warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress}, BadPeer, ChainSync as ChainSyncT, ImportResult, Metrics, OnBlockData, OnBlockJustification, - OnStateData, OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest, - OpaqueStateResponse, PeerInfo, PeerRequest, SyncMode, SyncState, SyncStatus, + OnStateData, OpaqueStateRequest, OpaqueStateResponse, PeerInfo, PeerRequest, SyncMode, + SyncState, SyncStatus, }, }; use sp_arithmetic::traits::Saturating; @@ -93,6 +94,7 @@ mod extra_requests; mod futures_stream; mod schema; +pub mod block_relay_protocol; pub mod block_request_handler; pub mod blocks; pub mod engine; @@ -320,8 +322,8 @@ pub struct ChainSync<B: BlockT, Client> { network_service: service::network::NetworkServiceHandle, /// Protocol name used for block announcements block_announce_protocol_name: ProtocolName, - /// Protocol name used to send out block requests - block_request_protocol_name: ProtocolName, + /// Block downloader stub + block_downloader: Arc<dyn BlockDownloader<B>>, /// Protocol name used to send out state requests state_request_protocol_name: ProtocolName, /// Protocol name used to send out warp sync requests @@ -1167,72 +1169,6 @@ where } } - fn block_response_into_blocks( - &self, - request: &BlockRequest<B>, - response: OpaqueBlockResponse, - ) -> Result<Vec<BlockData<B>>, String> { - let response: Box<schema::v1::BlockResponse> = response.0.downcast().map_err(|_error| { - "Failed to downcast opaque block response during encoding, this is an \ - implementation bug." - .to_string() - })?; - - response - .blocks - .into_iter() - .map(|block_data| { - Ok(BlockData::<B> { - hash: Decode::decode(&mut block_data.hash.as_ref())?, - header: if !block_data.header.is_empty() { - Some(Decode::decode(&mut block_data.header.as_ref())?) - } else { - None - }, - body: if request.fields.contains(BlockAttributes::BODY) { - Some( - block_data - .body - .iter() - .map(|body| Decode::decode(&mut body.as_ref())) - .collect::<Result<Vec<_>, _>>()?, - ) - } else { - None - }, - indexed_body: if request.fields.contains(BlockAttributes::INDEXED_BODY) { - Some(block_data.indexed_body) - } else { - None - }, - receipt: if !block_data.receipt.is_empty() { - Some(block_data.receipt) - } else { - None - }, - message_queue: if !block_data.message_queue.is_empty() { - Some(block_data.message_queue) - } else { - None - }, - justification: if !block_data.justification.is_empty() { - Some(block_data.justification) - } else if block_data.is_empty_justification { - Some(Vec::new()) - } else { - None - }, - justifications: if !block_data.justifications.is_empty() { - Some(DecodeAll::decode_all(&mut block_data.justifications.as_ref())?) - } else { - None - }, - }) - }) - .collect::<Result<_, _>>() - .map_err(|error: codec::Error| error.to_string()) - } - fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> { self.process_outbound_requests(); @@ -1248,30 +1184,18 @@ where } fn send_block_request(&mut self, who: PeerId, request: BlockRequest<B>) { - let (tx, rx) = oneshot::channel(); - let opaque_req = self.create_opaque_block_request(&request); - if self.peers.contains_key(&who) { - self.pending_responses - .insert(who, Box::pin(async move { (who, PeerRequest::Block(request), rx.await) })); - } - - match self.encode_block_request(&opaque_req) { - Ok(data) => { - self.network_service.start_request( - who, - self.block_request_protocol_name.clone(), - data, - tx, - IfDisconnected::ImmediateError, - ); - }, - Err(err) => { - log::warn!( - target: LOG_TARGET, - "Failed to encode block request {opaque_req:?}: {err:?}", - ); - }, + let downloader = self.block_downloader.clone(); + self.pending_responses.insert( + who, + Box::pin(async move { + ( + who, + PeerRequest::Block(request.clone()), + downloader.download_blocks(who, request).await, + ) + }), + ); } } } @@ -1301,7 +1225,7 @@ where metrics_registry: Option<&Registry>, network_service: service::network::NetworkServiceHandle, import_queue: Box<dyn ImportQueueService<B>>, - block_request_protocol_name: ProtocolName, + block_downloader: Arc<dyn BlockDownloader<B>>, state_request_protocol_name: ProtocolName, warp_sync_protocol_name: Option<ProtocolName>, ) -> Result<(Self, NonDefaultSetConfig), ClientError> { @@ -1337,7 +1261,7 @@ where import_existing: false, gap_sync: None, network_service, - block_request_protocol_name, + block_downloader, state_request_protocol_name, warp_sync_config, warp_sync_target_block_header: None, @@ -1710,13 +1634,6 @@ where } } - fn decode_block_response(response: &[u8]) -> Result<OpaqueBlockResponse, String> { - let response = schema::v1::BlockResponse::decode(response) - .map_err(|error| format!("Failed to decode block response: {error}"))?; - - Ok(OpaqueBlockResponse(Box::new(response))) - } - fn decode_state_response(response: &[u8]) -> Result<OpaqueStateResponse, String> { let response = StateResponse::decode(response) .map_err(|error| format!("Failed to decode state response: {error}"))?; @@ -1780,22 +1697,8 @@ where &mut self, peer_id: PeerId, request: BlockRequest<B>, - response: OpaqueBlockResponse, + blocks: Vec<BlockData<B>>, ) -> Option<ImportResult<B>> { - let blocks = match self.block_response_into_blocks(&request, response) { - Ok(blocks) => blocks, - Err(err) => { - debug!( - target: LOG_TARGET, - "Failed to decode block response from {}: {}", - peer_id, - err, - ); - self.network_service.report_peer(peer_id, rep::BAD_MESSAGE); - return None - }, - }; - let block_response = BlockResponse::<B> { id: request.id, blocks }; let blocks_range = || match ( @@ -1915,22 +1818,34 @@ where match response { Ok(Ok(resp)) => match request { PeerRequest::Block(req) => { - let response = match Self::decode_block_response(&resp[..]) { - Ok(proto) => proto, - Err(e) => { + match self.block_downloader.block_response_into_blocks(&req, resp) { + Ok(blocks) => { + if let Some(import) = self.on_block_response(id, req, blocks) { + return Poll::Ready(import) + } + }, + Err(BlockResponseError::DecodeFailed(e)) => { debug!( target: LOG_TARGET, - "Failed to decode block response from peer {id:?}: {e:?}.", + "Failed to decode block response from peer {:?}: {:?}.", + id, + e ); self.network_service.report_peer(id, rep::BAD_MESSAGE); self.network_service .disconnect_peer(id, self.block_announce_protocol_name.clone()); continue }, - }; - - if let Some(import) = self.on_block_response(id, req, response) { - return Poll::Ready(import) + Err(BlockResponseError::ExtractionFailed(e)) => { + debug!( + target: LOG_TARGET, + "Failed to extract blocks from peer response {:?}: {:?}.", + id, + e + ); + self.network_service.report_peer(id, rep::BAD_MESSAGE); + continue + }, } }, PeerRequest::State => { @@ -2010,31 +1925,6 @@ where Poll::Pending } - /// Create implementation-specific block request. - fn create_opaque_block_request(&self, request: &BlockRequest<B>) -> OpaqueBlockRequest { - OpaqueBlockRequest(Box::new(schema::v1::BlockRequest { - fields: request.fields.to_be_u32(), - from_block: match request.from { - FromBlock::Hash(h) => Some(schema::v1::block_request::FromBlock::Hash(h.encode())), - FromBlock::Number(n) => - Some(schema::v1::block_request::FromBlock::Number(n.encode())), - }, - direction: request.direction as i32, - max_blocks: request.max.unwrap_or(0), - support_multiple_justifications: true, - })) - } - - fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result<Vec<u8>, String> { - let request: &schema::v1::BlockRequest = request.0.downcast_ref().ok_or_else(|| { - "Failed to downcast opaque block response during encoding, this is an \ - implementation bug." - .to_string() - })?; - - Ok(request.encode_to_vec()) - } - fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result<Vec<u8>, String> { let request: &StateRequest = request.0.downcast_ref().ok_or_else(|| { "Failed to downcast opaque state response during encoding, this is an \ @@ -2909,7 +2799,7 @@ fn validate_blocks<Block: BlockT>( #[cfg(test)] mod test { use super::*; - use crate::service::network::NetworkServiceProvider; + use crate::{mock::MockBlockDownloader, service::network::NetworkServiceProvider}; use futures::executor::block_on; use sc_block_builder::BlockBuilderProvider; use sc_network_common::{ @@ -2947,7 +2837,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3013,7 +2903,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3187,7 +3077,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3313,7 +3203,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3470,7 +3360,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3612,7 +3502,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3756,7 +3646,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3801,7 +3691,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) @@ -3853,7 +3743,7 @@ mod test { None, chain_sync_network_handle, import_queue, - ProtocolName::from("block-request"), + Arc::new(MockBlockDownloader::new()), ProtocolName::from("state-request"), None, ) diff --git a/substrate/client/network/sync/src/mock.rs b/substrate/client/network/sync/src/mock.rs index d37095c17d2..859f9fb9c54 100644 --- a/substrate/client/network/sync/src/mock.rs +++ b/substrate/client/network/sync/src/mock.rs @@ -16,15 +16,17 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -//! Contains a mock implementation of `ChainSync` that can be used -//! for testing calls made to `ChainSync`. +//! Contains mock implementations of `ChainSync` and 'BlockDownloader'. -use futures::task::Poll; +use crate::block_relay_protocol::{BlockDownloader as BlockDownloaderT, BlockResponseError}; + +use futures::{channel::oneshot, task::Poll}; use libp2p::PeerId; +use sc_network::RequestFailure; use sc_network_common::sync::{ message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse}, - BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, - OpaqueBlockResponse, PeerInfo, SyncStatus, + BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, PeerInfo, + SyncStatus, }; use sp_runtime::traits::{Block as BlockT, NumberFor}; @@ -79,11 +81,6 @@ mockall::mock! { ); fn peer_disconnected(&mut self, who: &PeerId); fn metrics(&self) -> Metrics; - fn block_response_into_blocks( - &self, - request: &BlockRequest<Block>, - response: OpaqueBlockResponse, - ) -> Result<Vec<BlockData<Block>>, String>; fn poll<'a>( &mut self, cx: &mut std::task::Context<'a>, @@ -95,3 +92,21 @@ mockall::mock! { ); } } + +mockall::mock! { + pub BlockDownloader<Block: BlockT> {} + + #[async_trait::async_trait] + impl<Block: BlockT> BlockDownloaderT<Block> for BlockDownloader<Block> { + async fn download_blocks( + &self, + who: PeerId, + request: BlockRequest<Block>, + ) -> Result<Result<Vec<u8>, RequestFailure>, oneshot::Canceled>; + fn block_response_into_blocks( + &self, + request: &BlockRequest<Block>, + response: Vec<u8>, + ) -> Result<Vec<BlockData<Block>>, BlockResponseError>; + } +} diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index d350b0e54ae..11505903e35 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -798,12 +798,18 @@ pub trait TestNetFactory: Default + Sized + Send { let fork_id = Some(String::from("test-fork-id")); - let block_request_protocol_config = { - let (handler, protocol_config) = - BlockRequestHandler::new(&protocol_id, None, client.clone(), 50); - self.spawn_task(handler.run().boxed()); - protocol_config - }; + let (chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); + let mut block_relay_params = BlockRequestHandler::new( + chain_sync_network_handle.clone(), + &protocol_id, + None, + client.clone(), + 50, + ); + self.spawn_task(Box::pin(async move { + block_relay_params.server.run().await; + })); let state_request_protocol_config = { let (handler, protocol_config) = @@ -848,8 +854,6 @@ pub trait TestNetFactory: Default + Sized + Send { let block_announce_validator = config .block_announce_validator .unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)); - let (chain_sync_network_provider, chain_sync_network_handle) = - NetworkServiceProvider::new(); let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (engine, sync_service, block_announce_config) = @@ -864,7 +868,7 @@ pub trait TestNetFactory: Default + Sized + Send { Some(warp_sync_params), chain_sync_network_handle, import_queue.service(), - block_request_protocol_config.name.clone(), + block_relay_params.downloader, state_request_protocol_config.name.clone(), Some(warp_protocol_config.name.clone()), rx, @@ -877,7 +881,7 @@ pub trait TestNetFactory: Default + Sized + Send { full_net_config.add_request_response_protocol(config); } for config in [ - block_request_protocol_config, + block_relay_params.request_response_config, state_request_protocol_config, light_client_request_protocol_config, warp_protocol_config, diff --git a/substrate/client/network/test/src/service.rs b/substrate/client/network/test/src/service.rs index 68e780545bb..62d7f9f9d1b 100644 --- a/substrate/client/network/test/src/service.rs +++ b/substrate/client/network/test/src/service.rs @@ -156,12 +156,18 @@ impl TestNetworkBuilder { let fork_id = Some(String::from("test-fork-id")); let mut full_net_config = FullNetworkConfiguration::new(&network_config); - let block_request_protocol_config = { - let (handler, protocol_config) = - BlockRequestHandler::new(&protocol_id, None, client.clone(), 50); - tokio::spawn(handler.run().boxed()); - protocol_config - }; + let (chain_sync_network_provider, chain_sync_network_handle) = + self.chain_sync_network.unwrap_or(NetworkServiceProvider::new()); + let mut block_relay_params = BlockRequestHandler::new( + chain_sync_network_handle.clone(), + &protocol_id, + None, + client.clone(), + 50, + ); + tokio::spawn(Box::pin(async move { + block_relay_params.server.run().await; + })); let state_request_protocol_config = { let (handler, protocol_config) = @@ -177,8 +183,6 @@ impl TestNetworkBuilder { protocol_config }; - let (chain_sync_network_provider, chain_sync_network_handle) = - self.chain_sync_network.unwrap_or(NetworkServiceProvider::new()); let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (engine, chain_sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config::Role::Full), @@ -191,7 +195,7 @@ impl TestNetworkBuilder { None, chain_sync_network_handle, import_queue.service(), - block_request_protocol_config.name.clone(), + block_relay_params.downloader, state_request_protocol_config.name.clone(), None, rx, @@ -214,7 +218,7 @@ impl TestNetworkBuilder { } for config in [ - block_request_protocol_config, + block_relay_params.request_response_config, state_request_protocol_config, light_client_request_protocol_config, ] { diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 917b3be8dc7..3838accde02 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -50,10 +50,10 @@ use sc_network_bitswap::BitswapRequestHandler; use sc_network_common::role::Roles; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ - block_request_handler::BlockRequestHandler, engine::SyncingEngine, - service::network::NetworkServiceProvider, state_request_handler::StateRequestHandler, - warp::WarpSyncParams, warp_request_handler::RequestHandler as WarpSyncRequestHandler, - SyncingService, + block_relay_protocol::BlockRelayParams, block_request_handler::BlockRequestHandler, + engine::SyncingEngine, service::network::NetworkServiceProvider, + state_request_handler::StateRequestHandler, warp::WarpSyncParams, + warp_request_handler::RequestHandler as WarpSyncRequestHandler, SyncingService, }; use sc_rpc::{ author::AuthorApiServer, @@ -696,6 +696,9 @@ pub struct BuildNetworkParams<'a, TBl: BlockT, TExPool, TImpQu, TCl> { Option<Box<dyn FnOnce(Arc<TCl>) -> Box<dyn BlockAnnounceValidator<TBl> + Send> + Send>>, /// Optional warp sync params. pub warp_sync_params: Option<WarpSyncParams<TBl>>, + /// User specified block relay params. If not specified, the default + /// block request handler will be used. + pub block_relay: Option<BlockRelayParams<TBl>>, } /// Build the network service, the network status sinks and an RPC sender. @@ -734,6 +737,7 @@ where import_queue, block_announce_validator_builder, warp_sync_params, + block_relay, } = params; if warp_sync_params.is_none() && config.network.sync_mode.is_warp() { @@ -757,19 +761,26 @@ where Box::new(DefaultBlockAnnounceValidator) }; - let (block_request_protocol_config, block_request_protocol_name) = { - // Allow both outgoing and incoming requests. - let (handler, protocol_config) = BlockRequestHandler::new( - &protocol_id, - config.chain_spec.fork_id(), - client.clone(), - net_config.network_config.default_peers_set.in_peers as usize + - net_config.network_config.default_peers_set.out_peers as usize, - ); - let config_name = protocol_config.name.clone(); - spawn_handle.spawn("block-request-handler", Some("networking"), handler.run()); - (protocol_config, config_name) + let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); + let (mut block_server, block_downloader, block_request_protocol_config) = match block_relay { + Some(params) => (params.server, params.downloader, params.request_response_config), + None => { + // Custom protocol was not specified, use the default block handler. + // Allow both outgoing and incoming requests. + let params = BlockRequestHandler::new( + chain_sync_network_handle.clone(), + &protocol_id, + config.chain_spec.fork_id(), + client.clone(), + config.network.default_peers_set.in_peers as usize + + config.network.default_peers_set.out_peers as usize, + ); + (params.server, params.downloader, params.request_response_config) + }, }; + spawn_handle.spawn("block-request-handler", Some("networking"), async move { + block_server.run().await; + }); let (state_request_protocol_config, state_request_protocol_name) = { let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize + @@ -860,7 +871,6 @@ where spawn_handle.spawn("peer-store", Some("networking"), peer_store.run()); let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); - let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (engine, sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config.role), client.clone(), @@ -872,7 +882,7 @@ where warp_sync_params, chain_sync_network_handle, import_queue.service(), - block_request_protocol_name, + block_downloader, state_request_protocol_name, warp_request_protocol_name, rx, diff --git a/substrate/client/transaction-pool/api/src/lib.rs b/substrate/client/transaction-pool/api/src/lib.rs index a132cbc46e9..73cc513708d 100644 --- a/substrate/client/transaction-pool/api/src/lib.rs +++ b/substrate/client/transaction-pool/api/src/lib.rs @@ -22,6 +22,7 @@ pub mod error; use async_trait::async_trait; +use codec::Codec; use futures::{Future, Stream}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use sp_core::offchain::TransactionPoolExt; @@ -187,7 +188,7 @@ pub trait TransactionPool: Send + Sync { /// Block type. type Block: BlockT; /// Transaction hash type. - type Hash: Hash + Eq + Member + Serialize + DeserializeOwned; + type Hash: Hash + Eq + Member + Serialize + DeserializeOwned + Codec; /// In-pool transaction type. type InPoolTransaction: InPoolTransaction< Transaction = TransactionFor<Self>, -- GitLab