From 5282615416dd7d6fdc297143b1254164ecb95c1b Mon Sep 17 00:00:00 2001
From: Gregory Terzian <2792687+gterzian@users.noreply.github.com>
Date: Wed, 27 Feb 2019 16:19:57 +0800
Subject: [PATCH] Use bounded channels for network -> sync -> import (#1874)

* use bounded channels for network -> sync -> import

* bound at 4

* indent

* use return value of handle_network_msg
---
 .../core/consensus/common/src/import_queue.rs |   4 +-
 substrate/core/network/src/protocol.rs        | 111 ++++++++++++------
 substrate/core/network/src/service.rs         |  24 ++--
 substrate/core/network/src/test/mod.rs        |  18 +--
 4 files changed, 98 insertions(+), 59 deletions(-)

diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs
index cb09f57caf3..89f3a563b5e 100644
--- a/substrate/core/consensus/common/src/import_queue.rs
+++ b/substrate/core/consensus/common/src/import_queue.rs
@@ -263,7 +263,7 @@ impl<B: BlockT> BlockImporter<B> {
 		worker_sender: Sender<BlockImportWorkerMsg<B>>,
 		justification_import: Option<SharedJustificationImport<B>>,
 	) -> Sender<BlockImportMsg<B>> {
-		let (sender, port) = channel::unbounded();
+		let (sender, port) = channel::bounded(4);
 		let _ = thread::Builder::new()
 			.name("ImportQueue".into())
 			.spawn(move || {
@@ -454,7 +454,7 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
 		verifier: Arc<V>,
 		block_import: SharedBlockImport<B>,
 	) -> Sender<BlockImportWorkerMsg<B>> {
-		let (sender, port) = channel::unbounded();
+		let (sender, port) = channel::bounded(4);
 		let _ = thread::Builder::new()
 			.name("ImportQueueWorker".into())
 			.spawn(move || {
diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs
index a85b2e0cec8..82e763bdd39 100644
--- a/substrate/core/network/src/protocol.rs
+++ b/substrate/core/network/src/protocol.rs
@@ -58,6 +58,7 @@ const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192;
 pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
 	network_chan: NetworkChan<B>,
 	port: Receiver<ProtocolMsg<B, S>>,
+	from_network_port: Receiver<FromNetworkMsg<B>>,
 	config: ProtocolConfig,
 	on_demand: Option<Arc<OnDemandService<B>>>,
 	genesis_hash: B::Hash,
@@ -69,6 +70,7 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
 	handshaking_peers: HashMap<NodeIndex, time::Instant>,
 	transaction_pool: Arc<TransactionPool<H, B>>,
 }
+
 /// Syncing status and statistics
 #[derive(Clone)]
 pub struct ProtocolStatus<B: BlockT> {
@@ -183,44 +185,36 @@ impl<B: BlockT, F: FnOnce(&mut ConsensusGossip<B>, &mut Context<B>)> GossipTask<
     }
 }
 
-/// Messages sent to Protocol.
-pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>,> {
-	/// A peer connected, with debug info.
-	PeerConnected(NodeIndex, String),
-	/// A peer disconnected, with debug info.
-	PeerDisconnected(NodeIndex, String),
-	/// A custom message from another peer.
-	CustomMessage(NodeIndex, Message<B>),
-	/// Ask the protocol for its status.
-	Status(Sender<ProtocolStatus<B>>),
-	/// Tell protocol to propagate extrinsics.
-	PropagateExtrinsics,
-	/// Execute a closure with the chain-specific network specialization.
-	ExecuteWithSpec(Box<SpecTask<B, S> + Send + 'static>),
-	/// Execute a closure with the consensus gossip.
-	ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>),
-	/// Incoming gossip consensus message.
-	GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>),
-	/// Return a list of peers currently known to protocol.
-	Peers(Sender<Vec<(NodeIndex, PeerInfo<B>)>>),
-	/// Let protocol know a peer is currenlty clogged.
-	PeerClogged(NodeIndex, Option<Message<B>>),
+/// Messages sent to Protocol from elsewhere inside the system.
+pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> {
 	/// Tell protocol to maintain sync.
 	MaintainSync,
 	/// Tell protocol to restart sync.
 	RestartSync,
-	/// Propagate a block to peers.
-	AnnounceBlock(B::Hash),
+	/// Ask the protocol for its status.
+	Status(Sender<ProtocolStatus<B>>),
+	/// Tell protocol to propagate extrinsics.
+	PropagateExtrinsics,
 	/// Tell protocol that a block was imported (sent by the import-queue).
 	BlockImportedSync(B::Hash, NumberFor<B>),
 	/// Tell protocol to request justification for a block.
 	RequestJustification(B::Hash, NumberFor<B>),
 	/// Inform protocol whether a justification was successfully imported.
 	JustificationImportResult(B::Hash, NumberFor<B>, bool),
+	/// Propagate a block to peers.
+	AnnounceBlock(B::Hash),
 	/// A block has been imported (sent by the client).
 	BlockImported(B::Hash, B::Header),
 	/// A block has been finalized (sent by the client).
 	BlockFinalized(B::Hash, B::Header),
+	/// Execute a closure with the chain-specific network specialization.
+	ExecuteWithSpec(Box<SpecTask<B, S> + Send + 'static>),
+	/// Execute a closure with the consensus gossip.
+	ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>),
+	/// Incoming gossip consensus message.
+	GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>),
+	/// Return a list of peers currently known to protocol.
+	Peers(Sender<Vec<(NodeIndex, PeerInfo<B>)>>),
 	/// Tell protocol to abort sync (does not stop protocol).
 	/// Only used in tests.
 	#[cfg(any(test, feature = "test-helpers"))]
@@ -231,6 +225,23 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>,> {
 	Tick,
 }
 
+/// Messages sent to Protocol from Network-libp2p.
+pub enum FromNetworkMsg<B: BlockT> {
+	/// A peer connected, with debug info.
+	PeerConnected(NodeIndex, String),
+	/// A peer disconnected, with debug info.
+	PeerDisconnected(NodeIndex, String),
+	/// A custom message from another peer.
+	CustomMessage(NodeIndex, Message<B>),
+	/// Let protocol know a peer is currenlty clogged.
+	PeerClogged(NodeIndex, Option<Message<B>>),
+}
+
+enum Incoming<B: BlockT, S: NetworkSpecialization<B>> {
+	FromNetwork(FromNetworkMsg<B>),
+	FromClient(ProtocolMsg<B, S>)
+}
+
 impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 	/// Create a new instance.
 	pub fn new(
@@ -243,8 +254,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 		on_demand: Option<Arc<OnDemandService<B>>>,
 		transaction_pool: Arc<TransactionPool<H, B>>,
 		specialization: S,
-	) -> error::Result<Sender<ProtocolMsg<B, S>>> {
-		let (sender, port) = channel::unbounded();
+	) -> error::Result<(Sender<ProtocolMsg<B, S>>, Sender<FromNetworkMsg<B>>)> {
+		let (protocol_sender, port) = channel::unbounded();
+		let (from_network_sender, from_network_port) = channel::bounded(4);
 		let info = chain.info()?;
 		let sync = ChainSync::new(is_offline, is_major_syncing, config.roles, &info, import_queue);
 		let _ = thread::Builder::new()
@@ -252,6 +264,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 			.spawn(move || {
 				let mut protocol = Protocol {
 					network_chan,
+					from_network_port,
 					port,
 					config: config,
 					context_data: ContextData {
@@ -273,7 +286,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 				}
 			})
 			.expect("Protocol thread spawning failed");
-		Ok(sender)
+		Ok((protocol_sender, from_network_sender))
 	}
 
 	fn run(
@@ -284,35 +297,45 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 		let msg = select! {
 			recv(self.port) -> event => {
 				match event {
-					Ok(msg) => msg,
+					Ok(msg) => Incoming::FromClient(msg),
+					// Our sender has been dropped, quit.
+					Err(_) => {
+						Incoming::FromClient(ProtocolMsg::Stop)
+					},
+				}
+			},
+			recv(self.from_network_port) -> event => {
+				match event {
+					Ok(msg) => Incoming::FromNetwork(msg),
 					// Our sender has been dropped, quit.
 					Err(_) => {
-						ProtocolMsg::Stop
+						Incoming::FromClient(ProtocolMsg::Stop)
 					},
 				}
 			},
 			recv(tick_timeout) -> _ => {
-				ProtocolMsg::Tick
+				Incoming::FromClient(ProtocolMsg::Tick)
 			},
 			recv(propagate_timeout) -> _ => {
-				ProtocolMsg::PropagateExtrinsics
+				Incoming::FromClient(ProtocolMsg::PropagateExtrinsics)
 			},
 		};
 		self.handle_msg(msg)
 	}
 
-	fn handle_msg(&mut self, msg: ProtocolMsg<B, S>) -> bool {
+	fn handle_msg(&mut self, msg: Incoming<B, S>) -> bool {
+		match msg {
+			Incoming::FromNetwork(msg) => self.handle_network_msg(msg),
+			Incoming::FromClient(msg) => self.handle_client_msg(msg),
+		}
+	}
+
+	fn handle_client_msg(&mut self, msg: ProtocolMsg<B, S>) -> bool {
 		match msg {
 			ProtocolMsg::Peers(sender) => {
 				let peers = self.context_data.peers.iter().map(|(idx, p)| (*idx, p.info.clone())).collect();
 				let _ = sender.send(peers);
 			},
-			ProtocolMsg::PeerDisconnected(who, debug_info) => self.on_peer_disconnected(who, debug_info),
-			ProtocolMsg::PeerConnected(who, debug_info) => self.on_peer_connected(who, debug_info),
-			ProtocolMsg::PeerClogged(who, message) => self.on_clogged_peer(who, message),
-			ProtocolMsg::CustomMessage(who, message) => {
-				self.on_custom_message(who, message)
-			},
 			ProtocolMsg::Status(sender) => self.status(sender),
 			ProtocolMsg::BlockImported(hash, header) => self.on_block_imported(hash, &header),
 			ProtocolMsg::BlockFinalized(hash, header) => self.on_block_finalized(hash, &header),
@@ -359,6 +382,18 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 		true
 	}
 
+	fn handle_network_msg(&mut self, msg: FromNetworkMsg<B>) -> bool {
+		match msg {
+			FromNetworkMsg::PeerDisconnected(who, debug_info) => self.on_peer_disconnected(who, debug_info),
+			FromNetworkMsg::PeerConnected(who, debug_info) => self.on_peer_connected(who, debug_info),
+			FromNetworkMsg::PeerClogged(who, message) => self.on_clogged_peer(who, message),
+			FromNetworkMsg::CustomMessage(who, message) => {
+				self.on_custom_message(who, message)
+			},
+		}
+		true
+	}
+
 	fn handle_response(&mut self, who: NodeIndex, response: &message::BlockResponse<B>) -> Option<message::BlockRequest<B>> {
 		if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
 			if let Some(_) = peer.obsolete_requests.remove(&response.id) {
diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs
index cc5f6ffeea8..bcdc636d4cf 100644
--- a/substrate/core/network/src/service.rs
+++ b/substrate/core/network/src/service.rs
@@ -27,7 +27,7 @@ use network_libp2p::{Protocol as Libp2pProtocol, RegisteredProtocol};
 use consensus::import_queue::{ImportQueue, Link};
 use crate::consensus_gossip::ConsensusGossip;
 use crate::message::{Message, ConsensusEngineId};
-use crate::protocol::{self, Context, Protocol, ProtocolMsg, ProtocolStatus, PeerInfo};
+use crate::protocol::{self, Context, FromNetworkMsg, Protocol, ProtocolMsg, ProtocolStatus, PeerInfo};
 use crate::config::Params;
 use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError};
 use crate::error::Error;
@@ -143,7 +143,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
 		// Start in off-line mode, since we're not connected to any nodes yet.
 		let is_offline = Arc::new(AtomicBool::new(true));
 		let is_major_syncing = Arc::new(AtomicBool::new(false));
-		let protocol_sender = Protocol::new(
+		let (protocol_sender, network_to_protocol_sender) = Protocol::new(
 			is_offline.clone(),
 			is_major_syncing.clone(),
 			network_chan.clone(),
@@ -157,7 +157,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
 		let versions = [(protocol::CURRENT_VERSION as u8)];
 		let registered = RegisteredProtocol::new(protocol_id, &versions[..]);
 		let (thread, network) = start_thread(
-			protocol_sender.clone(),
+			network_to_protocol_sender,
 			network_port,
 			params.network_config,
 			registered,
@@ -435,8 +435,8 @@ pub enum NetworkMsg<B: BlockT + 'static> {
 }
 
 /// Starts the background thread that handles the networking.
-fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>(
-	protocol_sender: Sender<ProtocolMsg<B, S>>,
+fn start_thread<B: BlockT + 'static>(
+	protocol_sender: Sender<FromNetworkMsg<B>>,
 	network_port: NetworkPort<B>,
 	config: NetworkConfiguration,
 	registered: RegisteredProtocol<Message<B>>,
@@ -477,8 +477,8 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>(
 }
 
 /// Runs the background thread that handles the networking.
-fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>(
-	protocol_sender: Sender<ProtocolMsg<B, S>>,
+fn run_thread<B: BlockT + 'static>(
+	protocol_sender: Sender<FromNetworkMsg<B>>,
 	network_service: Arc<Mutex<NetworkService<Message<B>>>>,
 	network_port: NetworkPort<B>,
 	protocol_id: ProtocolId,
@@ -543,25 +543,25 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>(
 				if !protocols.is_empty() {
 					debug_assert_eq!(protocols, &[protocol_id]);
 					let _ = protocol_sender.send(
-						ProtocolMsg::PeerDisconnected(node_index, debug_info));
+						FromNetworkMsg::PeerDisconnected(node_index, debug_info));
 				}
 			}
 			NetworkServiceEvent::OpenedCustomProtocol { node_index, version, debug_info, .. } => {
 				debug_assert_eq!(version, protocol::CURRENT_VERSION as u8);
-				let _ = protocol_sender.send(ProtocolMsg::PeerConnected(node_index, debug_info));
+				let _ = protocol_sender.send(FromNetworkMsg::PeerConnected(node_index, debug_info));
 			}
 			NetworkServiceEvent::ClosedCustomProtocol { node_index, debug_info, .. } => {
-				let _ = protocol_sender.send(ProtocolMsg::PeerDisconnected(node_index, debug_info));
+				let _ = protocol_sender.send(FromNetworkMsg::PeerDisconnected(node_index, debug_info));
 			}
 			NetworkServiceEvent::CustomMessage { node_index, message, .. } => {
-				let _ = protocol_sender.send(ProtocolMsg::CustomMessage(node_index, message));
+				let _ = protocol_sender.send(FromNetworkMsg::CustomMessage(node_index, message));
 				return Ok(())
 			}
 			NetworkServiceEvent::Clogged { node_index, messages, .. } => {
 				debug!(target: "sync", "{} clogging messages:", messages.len());
 				for msg in messages.into_iter().take(5) {
 					debug!(target: "sync", "{:?}", msg);
-					let _ = protocol_sender.send(ProtocolMsg::PeerClogged(node_index, Some(msg)));
+					let _ = protocol_sender.send(FromNetworkMsg::PeerClogged(node_index, Some(msg)));
 				}
 			}
 		};
diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs
index 7245d1f1ab6..ae51b9b0da6 100644
--- a/substrate/core/network/src/test/mod.rs
+++ b/substrate/core/network/src/test/mod.rs
@@ -45,7 +45,7 @@ use network_libp2p::{NodeIndex, ProtocolId};
 use parity_codec::Encode;
 use parking_lot::Mutex;
 use primitives::{H256, Ed25519AuthorityId};
-use crate::protocol::{Context, Protocol, ProtocolMsg};
+use crate::protocol::{Context, FromNetworkMsg, Protocol, ProtocolMsg};
 use runtime_primitives::generic::BlockId;
 use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor};
 use runtime_primitives::Justification;
@@ -121,6 +121,7 @@ pub struct Peer<D> {
 	pub is_offline: Arc<AtomicBool>,
 	pub is_major_syncing: Arc<AtomicBool>,
 	client: Arc<PeersClient>,
+	network_to_protocol_sender: Sender<FromNetworkMsg<Block>>,
 	pub protocol_sender: Sender<ProtocolMsg<Block, DummySpecialization>>,
 
 	network_port: Mutex<NetworkPort<Block>>,
@@ -137,6 +138,7 @@ impl<D> Peer<D> {
 		is_major_syncing: Arc<AtomicBool>,
 		client: Arc<PeersClient>,
 		import_queue: Box<ImportQueue<Block>>,
+		network_to_protocol_sender: Sender<FromNetworkMsg<Block>>,
 		protocol_sender: Sender<ProtocolMsg<Block, DummySpecialization>>,
 		network_sender: NetworkChan<Block>,
 		network_port: NetworkPort<Block>,
@@ -147,6 +149,7 @@ impl<D> Peer<D> {
 			is_offline,
 			is_major_syncing,
 			client,
+			network_to_protocol_sender,
 			protocol_sender,
 			import_queue,
 			network_sender,
@@ -198,21 +201,21 @@ impl<D> Peer<D> {
 
 	/// Called on connection to other indicated peer.
 	fn on_connect(&self, other: NodeIndex) {
-		let _ = self.protocol_sender.send(ProtocolMsg::PeerConnected(other, String::new()));
+		let _ = self.network_to_protocol_sender.send(FromNetworkMsg::PeerConnected(other, String::new()));
 	}
 
 	/// Called on disconnect from other indicated peer.
 	fn on_disconnect(&self, other: NodeIndex) {
 		let _ = self
-			.protocol_sender
-			.send(ProtocolMsg::PeerDisconnected(other, String::new()));
+			.network_to_protocol_sender
+			.send(FromNetworkMsg::PeerDisconnected(other, String::new()));
 	}
 
 	/// Receive a message from another peer. Return a set of peers to disconnect.
 	fn receive_message(&self, from: NodeIndex, msg: Message<Block>) {
 		let _ = self
-			.protocol_sender
-			.send(ProtocolMsg::CustomMessage(from, msg));
+			.network_to_protocol_sender
+			.send(FromNetworkMsg::CustomMessage(from, msg));
 	}
 
 	/// Produce the next pending message to send to another peer.
@@ -480,7 +483,7 @@ pub trait TestNetFactory: Sized {
 		let specialization = DummySpecialization {};
 		let is_offline = Arc::new(AtomicBool::new(true));
 		let is_major_syncing = Arc::new(AtomicBool::new(false));
-		let protocol_sender = Protocol::new(
+		let (protocol_sender, network_to_protocol_sender) = Protocol::new(
 			is_offline.clone(),
 			is_major_syncing.clone(),
 			network_sender.clone(),
@@ -497,6 +500,7 @@ pub trait TestNetFactory: Sized {
 			is_major_syncing,
 			client,
 			import_queue,
+			network_to_protocol_sender,
 			protocol_sender,
 			network_sender,
 			network_port,
-- 
GitLab