Unverified Commit d8f6170c authored by Pierre Krieger's avatar Pierre Krieger Committed by GitHub
Browse files

Grab stream of networking events earlier (#3025)

parent 13af232b
Pipeline #138188 passed with stages
in 29 minutes and 55 seconds
......@@ -23,6 +23,7 @@
use parity_scale_codec::{Encode, Decode};
use parking_lot::Mutex;
use futures::prelude::*;
use futures::stream::BoxStream;
use sc_network::Event as NetworkEvent;
use sp_consensus::SyncOracle;
......@@ -277,10 +278,14 @@ impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD>
AD: validator_discovery::AuthorityDiscovery,
Context: SubsystemContext<Message=NetworkBridgeMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
fn start(mut self, ctx: Context) -> SpawnedSubsystem {
// The stream of networking events has to be created at initialization, otherwise the
// networking might open connections before the stream of events has been grabbed.
let network_stream = self.network_service.event_stream();
// Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`.
let future = run_network(self, ctx)
let future = run_network(self, ctx, network_stream)
.map_err(|e| {
SubsystemError::with_origin("network-bridge", e)
})
......@@ -535,13 +540,12 @@ where
async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
mut sender: impl SubsystemSender,
mut network_service: impl Network,
mut network_stream: BoxStream<'static, NetworkEvent>,
mut authority_discovery_service: AD,
mut request_multiplexer: RequestMultiplexer,
metrics: Metrics,
shared: Shared,
) -> Result<(), UnexpectedAbort> {
let mut network_stream = network_service.event_stream();
loop {
futures::select! {
network_event = network_stream.next().fuse() => match network_event {
......@@ -798,10 +802,11 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
/// #fn is_send<T: Send>();
/// #is_send::<parking_lot::MutexGuard<'static, ()>();
/// ```
#[tracing::instrument(skip(bridge, ctx), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(skip(bridge, ctx, network_stream), fields(subsystem = LOG_TARGET))]
async fn run_network<N, AD>(
bridge: NetworkBridge<N, AD>,
mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
network_stream: BoxStream<'static, NetworkEvent>,
) -> SubsystemResult<()>
where
N: Network,
......@@ -824,6 +829,7 @@ where
let (remote, network_event_handler) = handle_network_messages(
ctx.sender().clone(),
network_service.clone(),
network_stream,
authority_discovery_service.clone(),
request_multiplexer,
metrics.clone(),
......@@ -1351,8 +1357,9 @@ mod tests {
) {
let pool = sp_core::testing::TaskExecutor::new();
let (request_multiplexer, req_configs) = RequestMultiplexer::new();
let (network, network_handle, discovery) = new_test_network(req_configs);
let (mut network, network_handle, discovery) = new_test_network(req_configs);
let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let network_stream = network.event_stream();
let bridge = NetworkBridge {
network_service: network,
......@@ -1365,6 +1372,7 @@ mod tests {
let network_bridge = run_network(
bridge,
context,
network_stream,
)
.map_err(|_| panic!("subsystem execution failed"))
.map(|_| ());
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment