diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 0f214fe87aa377d782f839f9c627a967bda9f082..8ac3012df7f142f9793add743a4fa9799d47d495 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -6078,6 +6078,7 @@ dependencies = [ "unsigned-varint", "void", "wasm-timer", + "yamux", "zeroize 1.1.0", ] diff --git a/substrate/client/cli/src/lib.rs b/substrate/client/cli/src/lib.rs index a2e9fa96dad7406709da38e8657c819e53c062ff..7495ad8e75690b10d68da914c029c6fd711c25cf 100644 --- a/substrate/client/cli/src/lib.rs +++ b/substrate/client/cli/src/lib.rs @@ -424,6 +424,7 @@ fn fill_network_configuration( enable_mdns: !is_dev && !cli.no_mdns, allow_private_ipv4: !cli.no_private_ipv4, wasm_external_transport: None, + use_yamux_flow_control: cli.use_yamux_flow_control }; config.max_parallel_downloads = cli.max_parallel_downloads; diff --git a/substrate/client/cli/src/params.rs b/substrate/client/cli/src/params.rs index 0881247b07d066eb2d52dff117a2c0a08d4142c3..2ffa8bd61b1f718b3cfca15dea83bb582dc77cd8 100644 --- a/substrate/client/cli/src/params.rs +++ b/substrate/client/cli/src/params.rs @@ -238,6 +238,10 @@ pub struct NetworkConfigurationParams { #[allow(missing_docs)] #[structopt(flatten)] pub node_key_params: NodeKeyParams, + + /// Experimental feature flag. + #[structopt(long = "use-yamux-flow-control")] + pub use_yamux_flow_control: bool, } arg_enum! { diff --git a/substrate/client/network/Cargo.toml b/substrate/client/network/Cargo.toml index 10d5f1ce9b6a8f999a11ca6ee17c6d018437ece7..db3e32393d0376e8f73aa91aa107b3e6fd9cd9fe 100644 --- a/substrate/client/network/Cargo.toml +++ b/substrate/client/network/Cargo.toml @@ -48,6 +48,7 @@ substrate-test-runtime-client = { version = "2.0.0", optional = true, path = ".. unsigned-varint = { version = "0.3.0", features = ["codec"] } void = "1.0.2" zeroize = "1.0.0" +yamux = "0.4.2" [dev-dependencies] env_logger = "0.7.0" diff --git a/substrate/client/network/src/config.rs b/substrate/client/network/src/config.rs index 6cf2587fe47cb4ad5a344cea4bfc404b49b2e2d7..87c77fee9f0fcf5502de7b65e6fd0107b7964298 100644 --- a/substrate/client/network/src/config.rs +++ b/substrate/client/network/src/config.rs @@ -292,6 +292,7 @@ impl Default for NetworkConfiguration { enable_mdns: false, allow_private_ipv4: true, wasm_external_transport: None, + use_yamux_flow_control: false, }, max_parallel_downloads: 5, } @@ -348,6 +349,8 @@ pub enum TransportConfig { /// This parameter exists whatever the target platform is, but it is expected to be set to /// `Some` only when compiling for WASM. wasm_external_transport: Option<wasm_ext::ExtTransport>, + /// Use flow control for yamux streams if set to true. + use_yamux_flow_control: bool, }, /// Only allow connections within the same process. diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index b4281112f6115458d8d7a40add51200fdc4af74e..3dc8a49764938fab7cdbfbda58220972663ea8c5 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -235,12 +235,12 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker u64::from(params.network_config.out_peers) + 15, )); let (transport, bandwidth) = { - let (config_mem, config_wasm) = match params.network_config.transport { - TransportConfig::MemoryOnly => (true, None), - TransportConfig::Normal { wasm_external_transport, .. } => - (false, wasm_external_transport) + let (config_mem, config_wasm, flowctrl) = match params.network_config.transport { + TransportConfig::MemoryOnly => (true, None, false), + TransportConfig::Normal { wasm_external_transport, use_yamux_flow_control, .. } => + (false, wasm_external_transport, use_yamux_flow_control) }; - transport::build_transport(local_identity, config_mem, config_wasm) + transport::build_transport(local_identity, config_mem, config_wasm, flowctrl) }; let mut builder = SwarmBuilder::new(transport, behaviour, local_peer_id.clone()); if let Some(spawner) = params.executor { diff --git a/substrate/client/network/src/transport.rs b/substrate/client/network/src/transport.rs index 6b5c18cf33f186d22d2fe3f8497c8579e896dea8..e2c95824f83204d6c24c1f96a8f394c4b98b0d21 100644 --- a/substrate/client/network/src/transport.rs +++ b/substrate/client/network/src/transport.rs @@ -17,7 +17,7 @@ use futures::prelude::*; use libp2p::{ InboundUpgradeExt, OutboundUpgradeExt, PeerId, Transport, - mplex, identity, yamux, bandwidth, wasm_ext + mplex, identity, bandwidth, wasm_ext }; #[cfg(not(target_os = "unknown"))] use libp2p::{tcp, dns, websocket, noise}; @@ -36,7 +36,8 @@ pub use self::bandwidth::BandwidthSinks; pub fn build_transport( keypair: identity::Keypair, memory_only: bool, - wasm_external_transport: Option<wasm_ext::ExtTransport> + wasm_external_transport: Option<wasm_ext::ExtTransport>, + use_yamux_flow_control: bool ) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc<bandwidth::BandwidthSinks>) { // Build configuration objects for encryption mechanisms. #[cfg(not(target_os = "unknown"))] @@ -55,7 +56,18 @@ pub fn build_transport( let mut mplex_config = mplex::MplexConfig::new(); mplex_config.max_buffer_len_behaviour(mplex::MaxBufferBehaviour::Block); mplex_config.max_buffer_len(usize::MAX); - let yamux_config = yamux::Config::default(); + + let yamux_config = { + let mut c = yamux::Config::default(); + // Only set SYN flag on first data frame sent to the remote. + c.set_lazy_open(true); + if use_yamux_flow_control { + // Enable proper flow-control: window updates are only sent when + // buffered data has been consumed. + c.set_window_update_mode(yamux::WindowUpdateMode::OnRead); + } + libp2p::yamux::Config::new(c) + }; // Build the base layer of the transport. let transport = if let Some(t) = wasm_external_transport { diff --git a/substrate/client/service/test/src/lib.rs b/substrate/client/service/test/src/lib.rs index 2976e66a2982f6190699736a002da3b712bd72fd..723c13ec8225be63b5134f0685d844cf8cd4b90e 100644 --- a/substrate/client/service/test/src/lib.rs +++ b/substrate/client/service/test/src/lib.rs @@ -166,6 +166,7 @@ fn node_config<G, E: Clone> ( enable_mdns: false, allow_private_ipv4: true, wasm_external_transport: None, + use_yamux_flow_control: true, }, max_parallel_downloads: NetworkConfiguration::default().max_parallel_downloads, }; diff --git a/substrate/utils/browser/src/lib.rs b/substrate/utils/browser/src/lib.rs index b054d73ac9a9eb2bca66558ea588d417195e341b..d82f982e14b650bde020b0847c027846960bbfda 100644 --- a/substrate/utils/browser/src/lib.rs +++ b/substrate/utils/browser/src/lib.rs @@ -53,6 +53,7 @@ where wasm_external_transport: Some(transport.clone()), allow_private_ipv4: true, enable_mdns: false, + use_yamux_flow_control: true, }; config.task_executor = Some(Arc::new(move |fut| { wasm_bindgen_futures::spawn_local(fut)