From babfc44736159e27e67d6fcdb5f8306df008b52a Mon Sep 17 00:00:00 2001 From: Pierre Krieger <pierre.krieger1708@gmail.com> Date: Tue, 16 Jul 2019 01:49:44 +0200 Subject: [PATCH] Make mDNS part of DiscoveryBehaviour (#3116) * Make mDNS part of DiscoveryBehaviour * Fix tests * Address concern --- substrate/core/network/src/behaviour.rs | 45 ++---------------- substrate/core/network/src/discovery.rs | 63 +++++++++++++++++++++++-- 2 files changed, 62 insertions(+), 46 deletions(-) diff --git a/substrate/core/network/src/behaviour.rs b/substrate/core/network/src/behaviour.rs index 4390c663972..2550e906600 100644 --- a/substrate/core/network/src/behaviour.rs +++ b/substrate/core/network/src/behaviour.rs @@ -26,10 +26,6 @@ use libp2p::core::{Multiaddr, PeerId, PublicKey}; use libp2p::core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}; use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox}; use libp2p::multihash::Multihash; -#[cfg(not(target_os = "unknown"))] -use libp2p::core::swarm::toggle::Toggle; -#[cfg(not(target_os = "unknown"))] -use libp2p::mdns::{Mdns, MdnsEvent}; use log::warn; use runtime_primitives::traits::Block as BlockT; use std::iter; @@ -44,11 +40,8 @@ pub struct Behaviour<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> { /// Periodically pings and identifies the nodes we are connected to, and store information in a /// cache. debug_info: debug_info::DebugInfoBehaviour<Substream<StreamMuxerBox>>, - /// Discovers nodes of the network. Defined below. + /// Discovers nodes of the network. discovery: DiscoveryBehaviour<Substream<StreamMuxerBox>>, - /// Discovers nodes on the local network. - #[cfg(not(target_os = "unknown"))] - mdns: Toggle<Mdns<Substream<StreamMuxerBox>>>, /// Queue of events to produce for the outside. #[behaviour(ignore)] @@ -70,29 +63,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Behaviour<B, S, H> { known_addresses: Vec<(PeerId, Multiaddr)>, enable_mdns: bool, ) -> Self { - let debug_info = debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()); - - if enable_mdns { - #[cfg(target_os = "unknown")] - warn!(target: "sub-libp2p", "mDNS is not available on this platform"); - } - Behaviour { substrate, - debug_info, - discovery: DiscoveryBehaviour::new(local_public_key, known_addresses), - #[cfg(not(target_os = "unknown"))] - mdns: if enable_mdns { - match Mdns::new() { - Ok(mdns) => Some(mdns).into(), - Err(err) => { - warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err); - None.into() - } - } - } else { - None.into() - }, + debug_info: debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()), + discovery: DiscoveryBehaviour::new(local_public_key, known_addresses, enable_mdns), events: Vec::new(), } } @@ -195,19 +169,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventPr } } -#[cfg(not(target_os = "unknown"))] -impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventProcess<MdnsEvent> for - Behaviour<B, S, H> { - fn inject_event(&mut self, event: MdnsEvent) { - match event { - MdnsEvent::Discovered(list) => { - self.substrate.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id)); - }, - MdnsEvent::Expired(_) => {} - } - } -} - impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Behaviour<B, S, H> { fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut<B>>> { if !self.events.is_empty() { diff --git a/substrate/core/network/src/discovery.rs b/substrate/core/network/src/discovery.rs index 27c3982734a..1a377ba8721 100644 --- a/substrate/core/network/src/discovery.rs +++ b/substrate/core/network/src/discovery.rs @@ -24,8 +24,7 @@ //! - Bootstrap nodes. These are hard-coded node identities and addresses passed in the constructor //! of the `DiscoveryBehaviour`. You can also call `add_known_address` later to add an entry. //! -//! - mDNS. As of the writing of this documentation, mDNS is handled somewhere else. It is planned -//! to be moved here. +//! - mDNS. Discovers nodes on the local network by broadcasting UDP packets. //! //! - Kademlia random walk. Once connected, we perform random Kademlia `FIND_NODE` requests in //! order for nodes to propagate to us their view of the network. This is performed automatically @@ -50,7 +49,11 @@ use futures::prelude::*; use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey}; use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; use libp2p::core::swarm::PollParameters; +#[cfg(not(target_os = "unknown"))] +use libp2p::core::{swarm::toggle::Toggle, nodes::Substream, muxing::StreamMuxerBox}; use libp2p::kad::{GetValueResult, Kademlia, KademliaOut, PutValueResult}; +#[cfg(not(target_os = "unknown"))] +use libp2p::mdns::{Mdns, MdnsEvent}; use libp2p::multihash::Multihash; use libp2p::multiaddr::Protocol; use log::{debug, info, trace, warn}; @@ -65,6 +68,9 @@ pub struct DiscoveryBehaviour<TSubstream> { user_defined: Vec<(PeerId, Multiaddr)>, /// Kademlia requests and answers. kademlia: Kademlia<TSubstream>, + /// Discovers nodes on the local network. + #[cfg(not(target_os = "unknown"))] + mdns: Toggle<Mdns<Substream<StreamMuxerBox>>>, /// Stream that fires when we need to perform the next random Kademlia query. next_kad_random_query: Delay, /// After `next_kad_random_query` triggers, the next one triggers after this duration. @@ -83,7 +89,16 @@ impl<TSubstream> DiscoveryBehaviour<TSubstream> { /// Builds a new `DiscoveryBehaviour`. /// /// `user_defined` is a list of known address for nodes that never expire. - pub fn new(local_public_key: PublicKey, user_defined: Vec<(PeerId, Multiaddr)>) -> Self { + pub fn new( + local_public_key: PublicKey, + user_defined: Vec<(PeerId, Multiaddr)>, + enable_mdns: bool + ) -> Self { + if enable_mdns { + #[cfg(target_os = "unknown")] + warn!(target: "sub-libp2p", "mDNS is not available on this platform"); + } + let mut kademlia = Kademlia::new(local_public_key.clone().into_peer_id()); for (peer_id, addr) in &user_defined { kademlia.add_address(peer_id, addr.clone()); @@ -99,6 +114,18 @@ impl<TSubstream> DiscoveryBehaviour<TSubstream> { clock, local_peer_id: local_public_key.into_peer_id(), num_connections: 0, + #[cfg(not(target_os = "unknown"))] + mdns: if enable_mdns { + match Mdns::new() { + Ok(mdns) => Some(mdns).into(), + Err(err) => { + warn!(target: "sub-libp2p", "Failed to initialize mDNS: {:?}", err); + None.into() + } + } + } else { + None.into() + }, } } @@ -321,6 +348,34 @@ where } } + // Poll mDNS. + #[cfg(not(target_os = "unknown"))] + loop { + match self.mdns.poll(params) { + Async::NotReady => break, + Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { + match event { + MdnsEvent::Discovered(list) => { + self.discoveries.extend(list.into_iter().map(|(peer_id, _)| peer_id)); + if let Some(peer_id) = self.discoveries.pop_front() { + let ev = DiscoveryOut::Discovered(peer_id); + return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)); + } + }, + MdnsEvent::Expired(_) => {} + } + }, + Async::Ready(NetworkBehaviourAction::DialAddress { address }) => + return Async::Ready(NetworkBehaviourAction::DialAddress { address }), + Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => + return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }), + Async::Ready(NetworkBehaviourAction::SendEvent { event, .. }) => + match event {}, // `event` is an enum with no variant + Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => + return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), + } + } + Async::NotReady } } @@ -355,7 +410,7 @@ mod tests { upgrade::apply(out.stream, upgrade, endpoint) }); - let behaviour = DiscoveryBehaviour::new(keypair.public(), user_defined.clone()); + let behaviour = DiscoveryBehaviour::new(keypair.public(), user_defined.clone(), false); let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id()); let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap(); -- GitLab