From 11b9496a6a256ebd906fc36f84a0e0f73478d1b1 Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Mon, 6 Aug 2018 11:57:08 +0200
Subject: [PATCH] NetworkService::new starts the network (#462)

---
 .../substrate/network-libp2p/src/service.rs   | 153 ++++++++----------
 .../substrate/network-libp2p/tests/tests.rs   |  25 +--
 substrate/substrate/network/src/service.rs    |  65 +++-----
 substrate/substrate/service/src/lib.rs        |  11 +-
 4 files changed, 103 insertions(+), 151 deletions(-)

diff --git a/substrate/substrate/network-libp2p/src/service.rs b/substrate/substrate/network-libp2p/src/service.rs
index 691887cda96..a5841cac299 100644
--- a/substrate/substrate/network-libp2p/src/service.rs
+++ b/substrate/substrate/network-libp2p/src/service.rs
@@ -17,7 +17,7 @@
 use bytes::Bytes;
 use {Error, ErrorKind, NetworkConfiguration, NetworkProtocolHandler};
 use {NonReservedPeerMode, NetworkContext, Severity, NodeIndex, ProtocolId};
-use parking_lot::{Mutex, RwLock};
+use parking_lot::RwLock;
 use libp2p;
 use libp2p::multiaddr::{AddrComponent, Multiaddr};
 use libp2p::kad::{KadSystem, KadConnecConfig, KadSystemConfig};
@@ -56,10 +56,10 @@ pub struct NetworkService {
 	shared: Arc<Shared>,
 
 	/// Holds the networking-running background thread alive. The `Option` is
-	/// `None` if the service is stopped.
+	/// only set to `None` in the destructor.
 	/// Sending a message on the channel will trigger the end of the
 	/// background thread. We can then wait on the join handle.
-	bg_thread: Mutex<Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>>,
+	bg_thread: Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>,
 }
 
 /// Common struct shared throughout all the components of the service.
@@ -79,17 +79,18 @@ struct Shared {
 	/// List of protocols available on the network. It is a logic error to
 	/// remove protocols from this list, and the code may assume that protocols
 	/// stay at the same index forever.
-	protocols: RwLock<RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>>>,
+	protocols: RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>>,
 
 	/// Use this channel to send a timeout request to the background thread's
 	/// events loop. After the timeout, elapsed, it will call `timeout` on the
 	/// `NetworkProtocolHandler`. This can be closed if the background thread
 	/// is not running. The sender will be overwritten every time we start
 	/// the service.
-	timeouts_register_tx: RwLock<mpsc::UnboundedSender<(Duration, (Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, TimerToken))>>,
+	timeouts_register_tx: mpsc::UnboundedSender<(Duration, (Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, TimerToken))>,
 
 	/// Original address from the configuration, after being adjusted by the `Transport`.
-	/// Contains `None` if the network hasn't started yet.
+	// TODO: because we create the `Shared` before starting to listen, this
+	// has to be set later ; sort this out
 	original_listened_addr: RwLock<Option<Multiaddr>>,
 
 	/// Contains the addresses we known about ourselves.
@@ -97,9 +98,13 @@ struct Shared {
 }
 
 impl NetworkService {
-	/// Starts IO event loop
+	/// Starts the networking service.
+	///
+	/// Note that we could use an iterator for `protocols`, but having a
+	/// generic here is too much and crashes the Rust compiler.
 	pub fn new(
 		config: NetworkConfiguration,
+		protocols: Vec<(Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, &[(u8, u8)])>,
 		filter: Option<Arc<ConnectionFilter>>
 	) -> Result<NetworkService, Error> {
 		// TODO: for now `filter` is always `None` ; remove it from the code or implement it
@@ -121,75 +126,41 @@ impl NetworkService {
 			known_initial_peers: network_state.known_peers(),
 		});
 
+		// Channel we use to signal success or failure of the bg thread
+		// initialization process.
+		let (init_tx, init_rx) = sync_mpsc::channel();
+		// Channel the main thread uses to signal the bg thread that it
+		// should stop
+		let (close_tx, close_rx) = oneshot::channel();
+		let (timeouts_register_tx, timeouts_register_rx) = mpsc::unbounded();
+
 		let shared = Arc::new(Shared {
 			network_state,
-			protocols: RwLock::new(Default::default()),
+			protocols: RegisteredProtocols(protocols.into_iter()
+				.map(|(handler, protocol, versions)|
+					RegisteredProtocol::new(handler.clone(), protocol, versions))
+				.collect()
+			),
 			kad_system,
 			kad_upgrade: KadConnecConfig::new(),
 			config,
-			timeouts_register_tx: RwLock::new(mpsc::unbounded().0),
+			timeouts_register_tx,
 			original_listened_addr: RwLock::new(None),
 			listened_addrs: RwLock::new(Vec::new()),
 		});
 
-		Ok(NetworkService {
-			shared,
-			bg_thread: Mutex::new(None),
-		})
-	}
-
-	/// Returns network configuration.
-	pub fn config(&self) -> &NetworkConfiguration {
-		&self.shared.config
-	}
-
-	pub fn external_url(&self) -> Option<String> {
-		// TODO: in the context of libp2p, it is hard to define what an external
-		// URL is, as different nodes can have multiple different ways to
-		// reach us
-		self.shared.original_listened_addr.read().as_ref()
-			.map(|addr|
-				format!("{}/p2p/{}", addr, self.shared.kad_system.local_peer_id().to_base58())
-			)
-	}
-
-	/// Start network IO.
-	/// Note that we could use an iterator for `protocols`, but having a
-	/// generic here is too much and crashes the Rust compiler.
-	// TODO (design): the notion of having a `NetworkService` alive should mean
-	// that it is running ; the `start` and `stop` functions are bad design
-	pub fn start(
-		&self,
-		protocols: Vec<(Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, &[(u8, u8)])>
-	) -> Result<(), (Error, Option<SocketAddr>)> {
-		// TODO: check that service is started already?
-
-		*self.shared.protocols.write() = RegisteredProtocols(
-			protocols.into_iter()
-				.map(|(handler, protocol, versions)|
-					RegisteredProtocol::new(handler.clone(), protocol, versions))
-				.collect()
-		);
-
-		// Channel we use to signal success or failure of the bg thread
-		// initialization process.
-		let (init_tx, init_rx) = sync_mpsc::channel();
-		// Channel the main thread uses to signal the bg thread that it
-		// should stop
-		let (close_tx, close_rx) = oneshot::channel();
-		let (timeouts_register_tx, timeouts_register_rx) = mpsc::unbounded();
-		*self.shared.timeouts_register_tx.write() = timeouts_register_tx;
-			
 		// Initialize all the protocols now.
-		for protocol in self.shared.protocols.read().0.iter() {
+		// TODO: what about failure to initialize? we can't uninitialize a protocol
+		// TODO: remove this `initialize` method eventually, as it's only used for timers
+		for protocol in shared.protocols.0.iter() {
 			protocol.custom_data().initialize(&NetworkContextImpl {
-				inner: self.shared.clone(),
+				inner: shared.clone(),
 				protocol: protocol.id().clone(),
 				current_peer: None,
 			});
 		}
 
-		let shared = self.shared.clone();
+		let shared_clone = shared.clone();
 		let join_handle = thread::spawn(move || {
 			// Tokio runtime that is going to run everything in this thread.
 			let mut runtime = match current_thread::Runtime::new() {
@@ -200,8 +171,7 @@ impl NetworkService {
 				}
 			};
 
-			let fut = match init_thread(shared,
-				timeouts_register_rx, close_rx) {
+			let fut = match init_thread(shared_clone, timeouts_register_rx, close_rx) {
 				Ok(future) => {
 					debug!(target: "sub-libp2p", "Successfully started networking service");
 					let _ = init_tx.send(Ok(()));
@@ -219,23 +189,27 @@ impl NetworkService {
 			}
 		});
 
-		init_rx.recv().expect("libp2p background thread panicked")
-			.map_err(|err| (err, self.shared.config.listen_address.clone()))?;
+		init_rx.recv().expect("libp2p background thread panicked")?;
 
-		*self.bg_thread.lock() = Some((close_tx, join_handle));
-		Ok(())
+		Ok(NetworkService {
+			shared,
+			bg_thread: Some((close_tx, join_handle)),
+		})
 	}
 
-	/// Stop network IO.
-	pub fn stop(&self) {
-		if let Some((close_tx, join)) = self.bg_thread.lock().take() {
-			let _ = close_tx.send(());
-			if let Err(e) = join.join() {
-				warn!(target: "sub-libp2p", "error while waiting on libp2p background thread: {:?}", e);
-			}
-		}
+	/// Returns network configuration.
+	pub fn config(&self) -> &NetworkConfiguration {
+		&self.shared.config
+	}
 
-		debug_assert!(!self.shared.network_state.has_connected_peer());
+	pub fn external_url(&self) -> Option<String> {
+		// TODO: in the context of libp2p, it is hard to define what an external
+		// URL is, as different nodes can have multiple different ways to
+		// reach us
+		self.shared.original_listened_addr.read().as_ref()
+			.map(|addr|
+				format!("{}/p2p/{}", addr, self.shared.kad_system.local_peer_id().to_base58())
+			)
 	}
 
 	/// Get a list of all connected peers by id.
@@ -269,7 +243,7 @@ impl NetworkService {
 	pub fn with_context_eval<F, T>(&self, protocol: ProtocolId, action: F)
 		-> Option<T>
 		where F: FnOnce(&NetworkContext) -> T {
-		if !self.shared.protocols.read().has_protocol(protocol) {
+		if !self.shared.protocols.has_protocol(protocol) {
 			return None
 		}
 
@@ -283,7 +257,14 @@ impl NetworkService {
 
 impl Drop for NetworkService {
 	fn drop(&mut self) {
-		self.stop()
+		if let Some((close_tx, join)) = self.bg_thread.take() {
+			let _ = close_tx.send(());
+			if let Err(e) = join.join() {
+				warn!(target: "sub-libp2p", "error while waiting on libp2p background thread: {:?}", e);
+			}
+		}
+
+		debug_assert!(!self.shared.network_state.has_connected_peer());
 	}
 }
 
@@ -306,7 +287,7 @@ impl NetworkContext for NetworkContextImpl {
 		packet_id: PacketId,
 		data: Vec<u8>
 	) {
-		debug_assert!(self.inner.protocols.read().has_protocol(protocol),
+		debug_assert!(self.inner.protocols.has_protocol(protocol),
 			"invalid protocol id requested in the API of the libp2p networking");
 		// TODO: could be "optimized" by building `message` only after checking the validity of
 		// 		the peer, but that's probably not worth the effort
@@ -360,12 +341,11 @@ impl NetworkContext for NetworkContextImpl {
 	fn register_timer(&self, token: usize, duration: Duration)
 		-> Result<(), Error> {
 		let handler = self.inner.protocols
-			.read()
 			.find_protocol(self.protocol)
 			.ok_or(ErrorKind::BadProtocol)?
 			.custom_data()
 			.clone();
-		self.inner.timeouts_register_tx.read()
+		self.inner.timeouts_register_tx
 			.unbounded_send((duration, (handler, self.protocol, token)))
 			.map_err(|err| ErrorKind::Io(IoError::new(IoErrorKind::Other, err)))?;
 		Ok(())
@@ -485,7 +465,7 @@ fn init_thread(
 		match shared.network_state.add_peer(bootnode) {
 			Ok(who) => {
 				trace!(target: "sub-libp2p", "Dialing bootnode {:?}", who);
-				for proto in shared.protocols.read().0.clone().into_iter() {
+				for proto in shared.protocols.0.clone().into_iter() {
 					open_peer_custom_proto(
 						shared.clone(),
 						transport.clone(),
@@ -510,7 +490,7 @@ fn init_thread(
 
 				if let Ok(addr) = multi {
 					trace!(target: "sub-libp2p", "Missing NodeIndex for Bootnode {:}. Querying", bootnode);
-					for proto in shared.protocols.read().0.clone().into_iter() {
+					for proto in shared.protocols.0.clone().into_iter() {
 						connect_with_query_peer_id(
 							shared.clone(),
 							transport.clone(),
@@ -1025,7 +1005,7 @@ fn connect_to_nodes<T, To, St, C>(
 		// Try to dial that node for each registered protocol. Since dialing
 		// upgrades the connection to use multiplexing, dialing multiple times
 		// should automatically open multiple substreams.
-		for proto in shared.protocols.read().0.clone().into_iter() {
+		for proto in shared.protocols.0.clone().into_iter() {
 			open_peer_custom_proto(
 				shared.clone(),
 				base_transport.clone(),
@@ -1398,7 +1378,7 @@ where C: AsyncRead + AsyncWrite + 'static,		// TODO: 'static :-/
 	type UpgradeIdentifier = <RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>> as ConnectionUpgrade<C, Maf>>::UpgradeIdentifier;
 
 	fn protocol_names(&self) -> Self::NamesIter {
-		ConnectionUpgrade::<C, Maf>::protocol_names(&*self.0.protocols.read())
+		ConnectionUpgrade::<C, Maf>::protocol_names(&self.0.protocols)
 	}
 
 	type Output = <RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>> as ConnectionUpgrade<C, Maf>>::Output;
@@ -1409,7 +1389,7 @@ where C: AsyncRead + AsyncWrite + 'static,		// TODO: 'static :-/
 	fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, endpoint: Endpoint,
 		remote_addr: Maf) -> Self::Future
 	{
-		self.0.protocols.read()
+		self.0.protocols
 			.clone()
 			.upgrade(socket, id, endpoint, remote_addr)
 	}
@@ -1422,7 +1402,6 @@ mod tests {
 	#[test]
 	fn builds_and_finishes_in_finite_time() {
 		// Checks that merely starting the network doesn't end up in an infinite loop.
-		let service = NetworkService::new(Default::default(), None).unwrap();
-		service.start(vec![]).map_err(|(err, _)| err).unwrap();
+		let _service = NetworkService::new(Default::default(), vec![], None).unwrap();
 	}
 }
diff --git a/substrate/substrate/network-libp2p/tests/tests.rs b/substrate/substrate/network-libp2p/tests/tests.rs
index fc07035f9d0..60d962eda14 100644
--- a/substrate/substrate/network-libp2p/tests/tests.rs
+++ b/substrate/substrate/network-libp2p/tests/tests.rs
@@ -93,17 +93,11 @@ impl NetworkProtocolHandler for TestProtocol {
 
 #[test]
 fn net_service() {
-	let service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
-	service.start(vec![(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)])]).unwrap();
-}
-
-#[test]
-fn net_start_stop() {
-	let config = NetworkConfiguration::new_local();
-	let service = NetworkService::new(config, None).unwrap();
-	service.start(vec![]).unwrap();
-	service.stop();
-	service.start(vec![]).unwrap();
+	let _service = NetworkService::new(
+		NetworkConfiguration::new_local(),
+		vec![(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)])],
+		None
+	).expect("Error creating network service");
 }
 
 #[test]
@@ -113,14 +107,12 @@ fn net_disconnect() {
 	let mut config1 = NetworkConfiguration::new_local();
 	config1.use_secret = Some(key1.secret().clone());
 	config1.boot_nodes = vec![ ];
-	let service1 = NetworkService::new(config1, None).unwrap();
 	let handler1 = Arc::new(TestProtocol::new(false));
-	service1.start(vec![(handler1.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
+	let service1 = NetworkService::new(config1, vec![(handler1.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])], None).unwrap();
 	let mut config2 = NetworkConfiguration::new_local();
 	config2.boot_nodes = vec![ service1.external_url().unwrap() ];
-	let service2 = NetworkService::new(config2, None).unwrap();
 	let handler2 = Arc::new(TestProtocol::new(true));
-	service2.start(vec![(handler2.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
+	let _service2 = NetworkService::new(config2, vec![(handler2.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])], None).unwrap();
 	while !(handler1.got_disconnect() && handler2.got_disconnect()) {
 		thread::sleep(Duration::from_millis(50));
 	}
@@ -131,9 +123,8 @@ fn net_disconnect() {
 #[test]
 fn net_timeout() {
 	let config = NetworkConfiguration::new_local();
-	let service = NetworkService::new(config, None).unwrap();
 	let handler = Arc::new(TestProtocol::new(false));
-	service.start(vec![(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
+	let _service = NetworkService::new(config, vec![(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])], None).unwrap();
 	while !handler.got_timeout() {
 		thread::sleep(Duration::from_millis(50));
 	}
diff --git a/substrate/substrate/network/src/service.rs b/substrate/substrate/network/src/service.rs
index 77114e9d079..ad1699e3fb3 100644
--- a/substrate/substrate/network/src/service.rs
+++ b/substrate/substrate/network/src/service.rs
@@ -149,21 +149,34 @@ impl<B: BlockT + 'static, S: Specialization<B>> Service<B, S> {
 	/// Creates and register protocol with the network service
 	pub fn new(params: Params<B, S>, protocol_id: ProtocolId) -> Result<Arc<Service<B, S>>, Error> {
 		let chain = params.chain.clone();
-		let service = NetworkService::new(params.network_config.clone(), None)?;
 		let import_queue = Arc::new(AsyncImportQueue::new());
+		let handler = Arc::new(ProtocolHandler {
+			protocol: Protocol::new(
+				params.config,
+				params.chain,
+				import_queue.clone(),
+				params.on_demand,
+				params.transaction_pool,
+				params.specialization,
+			)?,
+		});
+		let versions = [(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)];
+		let protocols = vec![(handler.clone() as Arc<_>, protocol_id, &versions[..])];
+		let service = match NetworkService::new(params.network_config.clone(), protocols, None) {
+			Ok(service) => service,
+			Err(err) => {
+				match err.kind() {
+					ErrorKind::Io(ref e) if e.kind() == io::ErrorKind::AddrInUse =>
+						warn!("Network port is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option."),
+					_ => warn!("Error starting network: {}", err),
+				};
+				return Err(err.into())
+			},
+		};
 		let sync = Arc::new(Service {
 			network: service,
 			protocol_id,
-			handler: Arc::new(ProtocolHandler {
-				protocol: Protocol::new(
-					params.config,
-					params.chain,
-					import_queue.clone(),
-					params.on_demand,
-					params.transaction_pool,
-					params.specialization,
-				)?,
-			}),
+			handler,
 		});
 
 		import_queue.start(
@@ -201,27 +214,11 @@ impl<B: BlockT + 'static, S: Specialization<B>> Service<B, S> {
 
 		res
 	}
-
-	fn start(&self) {
-		let versions = [(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)];
-		let protocols = vec![(self.handler.clone() as Arc<_>, self.protocol_id, &versions[..])];
-		match self.network.start(protocols).map_err(|e| e.0.into()) {
-			Err(ErrorKind::Io(ref e)) if  e.kind() == io::ErrorKind::AddrInUse =>
-				warn!("Network port is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option."),
-			Err(err) => warn!("Error starting network: {}", err),
-			_ => {},
-		};
-	}
-
-	fn stop(&self) {
-		self.handler.protocol.stop();
-		self.network.stop();
-	}
 }
 
 impl<B: BlockT + 'static, S: Specialization<B>> Drop for Service<B, S> {
 	fn drop(&mut self) {
-		self.stop();
+		self.handler.protocol.stop();
 	}
 }
 
@@ -307,10 +304,6 @@ pub trait ManageNetwork: Send + Sync {
 	fn remove_reserved_peer(&self, peer: String) -> Result<(), String>;
 	/// Add reserved peer
 	fn add_reserved_peer(&self, peer: String) -> Result<(), String>;
-	/// Start network
-	fn start_network(&self);
-	/// Stop network
-	fn stop_network(&self);
 }
 
 
@@ -330,12 +323,4 @@ impl<B: BlockT + 'static, S: Specialization<B>> ManageNetwork for Service<B, S>
 	fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
 		self.network.add_reserved_peer(&peer).map_err(|e| format!("{:?}", e))
 	}
-
-	fn start_network(&self) {
-		self.start();
-	}
-
-	fn stop_network(&self) {
-		self.stop();
-	}
 }
diff --git a/substrate/substrate/service/src/lib.rs b/substrate/substrate/service/src/lib.rs
index 7ea3a1caf5a..53fba5bcc09 100644
--- a/substrate/substrate/service/src/lib.rs
+++ b/substrate/substrate/service/src/lib.rs
@@ -60,7 +60,6 @@ use std::sync::Arc;
 use futures::prelude::*;
 use keystore::Store as Keystore;
 use client::BlockchainEvents;
-use network::ManageNetwork;
 use runtime_primitives::traits::{Header, As};
 use exit_future::Signal;
 use tokio::runtime::TaskExecutor;
@@ -83,7 +82,7 @@ pub use components::{ServiceFactory, FullBackend, FullExecutor, LightBackend,
 /// Substrate service.
 pub struct Service<Components: components::Components> {
 	client: Arc<ComponentClient<Components>>,
-	network: Arc<components::NetworkService<Components::Factory>>,
+	network: Option<Arc<components::NetworkService<Components::Factory>>>,
 	extrinsic_pool: Arc<Components::ExtrinsicPool>,
 	keystore: Keystore,
 	exit: ::exit_future::Exit,
@@ -159,8 +158,6 @@ impl<Components> Service<Components>
 		let network = network::Service::new(network_params, Components::Factory::NETWORK_PROTOCOL_ID)?;
 		on_demand.map(|on_demand| on_demand.set_service_link(Arc::downgrade(&network)));
 
-		network.start_network();
-
 		{
 			// block notifications
 			let network = network.clone();
@@ -243,7 +240,7 @@ impl<Components> Service<Components>
 
 		Ok(Service {
 			client: client,
-			network: network,
+			network: Some(network),
 			extrinsic_pool: extrinsic_pool,
 			signal: Some(signal),
 			keystore: keystore,
@@ -261,7 +258,7 @@ impl<Components> Service<Components>
 
 	/// Get shared network instance.
 	pub fn network(&self) -> Arc<components::NetworkService<Components::Factory>> {
-		self.network.clone()
+		self.network.as_ref().expect("self.network always Some").clone()
 	}
 
 	/// Get shared extrinsic pool instance.
@@ -284,7 +281,7 @@ impl<Components> Drop for Service<Components> where Components: components::Comp
 	fn drop(&mut self) {
 		debug!(target: "service", "Substrate service shutdown");
 
-		self.network.stop_network();
+		drop(self.network.take());
 
 		if let Some(signal) = self.signal.take() {
 			signal.fire();
-- 
GitLab