diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index cb09f57caf39bda973fed5b73fc88549764ad3d8..89f3a563b5e807562af20ff5df6aa02d6ebb01d8 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -263,7 +263,7 @@ impl<B: BlockT> BlockImporter<B> { worker_sender: Sender<BlockImportWorkerMsg<B>>, justification_import: Option<SharedJustificationImport<B>>, ) -> Sender<BlockImportMsg<B>> { - let (sender, port) = channel::unbounded(); + let (sender, port) = channel::bounded(4); let _ = thread::Builder::new() .name("ImportQueue".into()) .spawn(move || { @@ -454,7 +454,7 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> { verifier: Arc<V>, block_import: SharedBlockImport<B>, ) -> Sender<BlockImportWorkerMsg<B>> { - let (sender, port) = channel::unbounded(); + let (sender, port) = channel::bounded(4); let _ = thread::Builder::new() .name("ImportQueueWorker".into()) .spawn(move || { diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index a85b2e0cec80b779cb223d150699d004d21ab53d..82e763bdd39c20111315a6b0c7ea4a00754f26da 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -58,6 +58,7 @@ const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192; pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> { network_chan: NetworkChan<B>, port: Receiver<ProtocolMsg<B, S>>, + from_network_port: Receiver<FromNetworkMsg<B>>, config: ProtocolConfig, on_demand: Option<Arc<OnDemandService<B>>>, genesis_hash: B::Hash, @@ -69,6 +70,7 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> { handshaking_peers: HashMap<NodeIndex, time::Instant>, transaction_pool: Arc<TransactionPool<H, B>>, } + /// Syncing status and statistics #[derive(Clone)] pub struct ProtocolStatus<B: BlockT> { @@ -183,44 +185,36 @@ impl<B: BlockT, F: FnOnce(&mut ConsensusGossip<B>, &mut Context<B>)> GossipTask< } } -/// Messages sent to Protocol. -pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>,> { - /// A peer connected, with debug info. - PeerConnected(NodeIndex, String), - /// A peer disconnected, with debug info. - PeerDisconnected(NodeIndex, String), - /// A custom message from another peer. - CustomMessage(NodeIndex, Message<B>), - /// Ask the protocol for its status. - Status(Sender<ProtocolStatus<B>>), - /// Tell protocol to propagate extrinsics. - PropagateExtrinsics, - /// Execute a closure with the chain-specific network specialization. - ExecuteWithSpec(Box<SpecTask<B, S> + Send + 'static>), - /// Execute a closure with the consensus gossip. - ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>), - /// Incoming gossip consensus message. - GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>), - /// Return a list of peers currently known to protocol. - Peers(Sender<Vec<(NodeIndex, PeerInfo<B>)>>), - /// Let protocol know a peer is currenlty clogged. - PeerClogged(NodeIndex, Option<Message<B>>), +/// Messages sent to Protocol from elsewhere inside the system. +pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> { /// Tell protocol to maintain sync. MaintainSync, /// Tell protocol to restart sync. RestartSync, - /// Propagate a block to peers. - AnnounceBlock(B::Hash), + /// Ask the protocol for its status. + Status(Sender<ProtocolStatus<B>>), + /// Tell protocol to propagate extrinsics. + PropagateExtrinsics, /// Tell protocol that a block was imported (sent by the import-queue). BlockImportedSync(B::Hash, NumberFor<B>), /// Tell protocol to request justification for a block. RequestJustification(B::Hash, NumberFor<B>), /// Inform protocol whether a justification was successfully imported. JustificationImportResult(B::Hash, NumberFor<B>, bool), + /// Propagate a block to peers. + AnnounceBlock(B::Hash), /// A block has been imported (sent by the client). BlockImported(B::Hash, B::Header), /// A block has been finalized (sent by the client). BlockFinalized(B::Hash, B::Header), + /// Execute a closure with the chain-specific network specialization. + ExecuteWithSpec(Box<SpecTask<B, S> + Send + 'static>), + /// Execute a closure with the consensus gossip. + ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>), + /// Incoming gossip consensus message. + GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>), + /// Return a list of peers currently known to protocol. + Peers(Sender<Vec<(NodeIndex, PeerInfo<B>)>>), /// Tell protocol to abort sync (does not stop protocol). /// Only used in tests. #[cfg(any(test, feature = "test-helpers"))] @@ -231,6 +225,23 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>,> { Tick, } +/// Messages sent to Protocol from Network-libp2p. +pub enum FromNetworkMsg<B: BlockT> { + /// A peer connected, with debug info. + PeerConnected(NodeIndex, String), + /// A peer disconnected, with debug info. + PeerDisconnected(NodeIndex, String), + /// A custom message from another peer. + CustomMessage(NodeIndex, Message<B>), + /// Let protocol know a peer is currenlty clogged. + PeerClogged(NodeIndex, Option<Message<B>>), +} + +enum Incoming<B: BlockT, S: NetworkSpecialization<B>> { + FromNetwork(FromNetworkMsg<B>), + FromClient(ProtocolMsg<B, S>) +} + impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { /// Create a new instance. pub fn new( @@ -243,8 +254,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { on_demand: Option<Arc<OnDemandService<B>>>, transaction_pool: Arc<TransactionPool<H, B>>, specialization: S, - ) -> error::Result<Sender<ProtocolMsg<B, S>>> { - let (sender, port) = channel::unbounded(); + ) -> error::Result<(Sender<ProtocolMsg<B, S>>, Sender<FromNetworkMsg<B>>)> { + let (protocol_sender, port) = channel::unbounded(); + let (from_network_sender, from_network_port) = channel::bounded(4); let info = chain.info()?; let sync = ChainSync::new(is_offline, is_major_syncing, config.roles, &info, import_queue); let _ = thread::Builder::new() @@ -252,6 +264,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { .spawn(move || { let mut protocol = Protocol { network_chan, + from_network_port, port, config: config, context_data: ContextData { @@ -273,7 +286,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } }) .expect("Protocol thread spawning failed"); - Ok(sender) + Ok((protocol_sender, from_network_sender)) } fn run( @@ -284,35 +297,45 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { let msg = select! { recv(self.port) -> event => { match event { - Ok(msg) => msg, + Ok(msg) => Incoming::FromClient(msg), + // Our sender has been dropped, quit. + Err(_) => { + Incoming::FromClient(ProtocolMsg::Stop) + }, + } + }, + recv(self.from_network_port) -> event => { + match event { + Ok(msg) => Incoming::FromNetwork(msg), // Our sender has been dropped, quit. Err(_) => { - ProtocolMsg::Stop + Incoming::FromClient(ProtocolMsg::Stop) }, } }, recv(tick_timeout) -> _ => { - ProtocolMsg::Tick + Incoming::FromClient(ProtocolMsg::Tick) }, recv(propagate_timeout) -> _ => { - ProtocolMsg::PropagateExtrinsics + Incoming::FromClient(ProtocolMsg::PropagateExtrinsics) }, }; self.handle_msg(msg) } - fn handle_msg(&mut self, msg: ProtocolMsg<B, S>) -> bool { + fn handle_msg(&mut self, msg: Incoming<B, S>) -> bool { + match msg { + Incoming::FromNetwork(msg) => self.handle_network_msg(msg), + Incoming::FromClient(msg) => self.handle_client_msg(msg), + } + } + + fn handle_client_msg(&mut self, msg: ProtocolMsg<B, S>) -> bool { match msg { ProtocolMsg::Peers(sender) => { let peers = self.context_data.peers.iter().map(|(idx, p)| (*idx, p.info.clone())).collect(); let _ = sender.send(peers); }, - ProtocolMsg::PeerDisconnected(who, debug_info) => self.on_peer_disconnected(who, debug_info), - ProtocolMsg::PeerConnected(who, debug_info) => self.on_peer_connected(who, debug_info), - ProtocolMsg::PeerClogged(who, message) => self.on_clogged_peer(who, message), - ProtocolMsg::CustomMessage(who, message) => { - self.on_custom_message(who, message) - }, ProtocolMsg::Status(sender) => self.status(sender), ProtocolMsg::BlockImported(hash, header) => self.on_block_imported(hash, &header), ProtocolMsg::BlockFinalized(hash, header) => self.on_block_finalized(hash, &header), @@ -359,6 +382,18 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { true } + fn handle_network_msg(&mut self, msg: FromNetworkMsg<B>) -> bool { + match msg { + FromNetworkMsg::PeerDisconnected(who, debug_info) => self.on_peer_disconnected(who, debug_info), + FromNetworkMsg::PeerConnected(who, debug_info) => self.on_peer_connected(who, debug_info), + FromNetworkMsg::PeerClogged(who, message) => self.on_clogged_peer(who, message), + FromNetworkMsg::CustomMessage(who, message) => { + self.on_custom_message(who, message) + }, + } + true + } + fn handle_response(&mut self, who: NodeIndex, response: &message::BlockResponse<B>) -> Option<message::BlockRequest<B>> { if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { if let Some(_) = peer.obsolete_requests.remove(&response.id) { diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index cc5f6ffeea80fdf0c4ff8ff90f1a5cd9392358f0..bcdc636d4cfa795b2b912c19ffa4b958c6b200d7 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -27,7 +27,7 @@ use network_libp2p::{Protocol as Libp2pProtocol, RegisteredProtocol}; use consensus::import_queue::{ImportQueue, Link}; use crate::consensus_gossip::ConsensusGossip; use crate::message::{Message, ConsensusEngineId}; -use crate::protocol::{self, Context, Protocol, ProtocolMsg, ProtocolStatus, PeerInfo}; +use crate::protocol::{self, Context, FromNetworkMsg, Protocol, ProtocolMsg, ProtocolStatus, PeerInfo}; use crate::config::Params; use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError}; use crate::error::Error; @@ -143,7 +143,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> { // Start in off-line mode, since we're not connected to any nodes yet. let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); - let protocol_sender = Protocol::new( + let (protocol_sender, network_to_protocol_sender) = Protocol::new( is_offline.clone(), is_major_syncing.clone(), network_chan.clone(), @@ -157,7 +157,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> { let versions = [(protocol::CURRENT_VERSION as u8)]; let registered = RegisteredProtocol::new(protocol_id, &versions[..]); let (thread, network) = start_thread( - protocol_sender.clone(), + network_to_protocol_sender, network_port, params.network_config, registered, @@ -435,8 +435,8 @@ pub enum NetworkMsg<B: BlockT + 'static> { } /// Starts the background thread that handles the networking. -fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>( - protocol_sender: Sender<ProtocolMsg<B, S>>, +fn start_thread<B: BlockT + 'static>( + protocol_sender: Sender<FromNetworkMsg<B>>, network_port: NetworkPort<B>, config: NetworkConfiguration, registered: RegisteredProtocol<Message<B>>, @@ -477,8 +477,8 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>( } /// Runs the background thread that handles the networking. -fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>( - protocol_sender: Sender<ProtocolMsg<B, S>>, +fn run_thread<B: BlockT + 'static>( + protocol_sender: Sender<FromNetworkMsg<B>>, network_service: Arc<Mutex<NetworkService<Message<B>>>>, network_port: NetworkPort<B>, protocol_id: ProtocolId, @@ -543,25 +543,25 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>>( if !protocols.is_empty() { debug_assert_eq!(protocols, &[protocol_id]); let _ = protocol_sender.send( - ProtocolMsg::PeerDisconnected(node_index, debug_info)); + FromNetworkMsg::PeerDisconnected(node_index, debug_info)); } } NetworkServiceEvent::OpenedCustomProtocol { node_index, version, debug_info, .. } => { debug_assert_eq!(version, protocol::CURRENT_VERSION as u8); - let _ = protocol_sender.send(ProtocolMsg::PeerConnected(node_index, debug_info)); + let _ = protocol_sender.send(FromNetworkMsg::PeerConnected(node_index, debug_info)); } NetworkServiceEvent::ClosedCustomProtocol { node_index, debug_info, .. } => { - let _ = protocol_sender.send(ProtocolMsg::PeerDisconnected(node_index, debug_info)); + let _ = protocol_sender.send(FromNetworkMsg::PeerDisconnected(node_index, debug_info)); } NetworkServiceEvent::CustomMessage { node_index, message, .. } => { - let _ = protocol_sender.send(ProtocolMsg::CustomMessage(node_index, message)); + let _ = protocol_sender.send(FromNetworkMsg::CustomMessage(node_index, message)); return Ok(()) } NetworkServiceEvent::Clogged { node_index, messages, .. } => { debug!(target: "sync", "{} clogging messages:", messages.len()); for msg in messages.into_iter().take(5) { debug!(target: "sync", "{:?}", msg); - let _ = protocol_sender.send(ProtocolMsg::PeerClogged(node_index, Some(msg))); + let _ = protocol_sender.send(FromNetworkMsg::PeerClogged(node_index, Some(msg))); } } }; diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 7245d1f1ab6c4865c80d67ae1f9f1690ecedf5f2..ae51b9b0da67cfbf2e9082ee4d0c1dae1e5bfb33 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -45,7 +45,7 @@ use network_libp2p::{NodeIndex, ProtocolId}; use parity_codec::Encode; use parking_lot::Mutex; use primitives::{H256, Ed25519AuthorityId}; -use crate::protocol::{Context, Protocol, ProtocolMsg}; +use crate::protocol::{Context, FromNetworkMsg, Protocol, ProtocolMsg}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor}; use runtime_primitives::Justification; @@ -121,6 +121,7 @@ pub struct Peer<D> { pub is_offline: Arc<AtomicBool>, pub is_major_syncing: Arc<AtomicBool>, client: Arc<PeersClient>, + network_to_protocol_sender: Sender<FromNetworkMsg<Block>>, pub protocol_sender: Sender<ProtocolMsg<Block, DummySpecialization>>, network_port: Mutex<NetworkPort<Block>>, @@ -137,6 +138,7 @@ impl<D> Peer<D> { is_major_syncing: Arc<AtomicBool>, client: Arc<PeersClient>, import_queue: Box<ImportQueue<Block>>, + network_to_protocol_sender: Sender<FromNetworkMsg<Block>>, protocol_sender: Sender<ProtocolMsg<Block, DummySpecialization>>, network_sender: NetworkChan<Block>, network_port: NetworkPort<Block>, @@ -147,6 +149,7 @@ impl<D> Peer<D> { is_offline, is_major_syncing, client, + network_to_protocol_sender, protocol_sender, import_queue, network_sender, @@ -198,21 +201,21 @@ impl<D> Peer<D> { /// Called on connection to other indicated peer. fn on_connect(&self, other: NodeIndex) { - let _ = self.protocol_sender.send(ProtocolMsg::PeerConnected(other, String::new())); + let _ = self.network_to_protocol_sender.send(FromNetworkMsg::PeerConnected(other, String::new())); } /// Called on disconnect from other indicated peer. fn on_disconnect(&self, other: NodeIndex) { let _ = self - .protocol_sender - .send(ProtocolMsg::PeerDisconnected(other, String::new())); + .network_to_protocol_sender + .send(FromNetworkMsg::PeerDisconnected(other, String::new())); } /// Receive a message from another peer. Return a set of peers to disconnect. fn receive_message(&self, from: NodeIndex, msg: Message<Block>) { let _ = self - .protocol_sender - .send(ProtocolMsg::CustomMessage(from, msg)); + .network_to_protocol_sender + .send(FromNetworkMsg::CustomMessage(from, msg)); } /// Produce the next pending message to send to another peer. @@ -480,7 +483,7 @@ pub trait TestNetFactory: Sized { let specialization = DummySpecialization {}; let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); - let protocol_sender = Protocol::new( + let (protocol_sender, network_to_protocol_sender) = Protocol::new( is_offline.clone(), is_major_syncing.clone(), network_sender.clone(), @@ -497,6 +500,7 @@ pub trait TestNetFactory: Sized { is_major_syncing, client, import_queue, + network_to_protocol_sender, protocol_sender, network_sender, network_port,