From 062b734571239d38439e5c9c412e22803735b6bc Mon Sep 17 00:00:00 2001 From: Pierre Krieger <pierre.krieger1708@gmail.com> Date: Tue, 21 May 2019 14:07:01 +0200 Subject: [PATCH] Remove the NetworkChan (#2577) * Remove the NetworkChan from the API * Remove the NetworkChan altogether * Address review * Fix line widths * More line width fixes * Remove pub visibility from entire world * Fix tests --- substrate/Cargo.lock | 1 - substrate/core/consensus/aura/src/lib.rs | 4 + substrate/core/consensus/babe/src/lib.rs | 4 + substrate/core/finality-grandpa/src/tests.rs | 4 + substrate/core/network/Cargo.toml | 1 - substrate/core/network/src/protocol.rs | 265 +++++++++++-------- substrate/core/network/src/service.rs | 181 +++++-------- substrate/core/network/src/test/mod.rs | 197 +++++++++----- 8 files changed, 360 insertions(+), 297 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index c9149bd9d72..106be37cdc8 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -4271,7 +4271,6 @@ name = "substrate-network" version = "2.0.0" dependencies = [ "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "fork-tree 2.0.0", diff --git a/substrate/core/consensus/aura/src/lib.rs b/substrate/core/consensus/aura/src/lib.rs index 2c545cec33f..429666585d4 100644 --- a/substrate/core/consensus/aura/src/lib.rs +++ b/substrate/core/consensus/aura/src/lib.rs @@ -964,6 +964,10 @@ mod tests { } } + fn uses_tokio(&self) -> bool { + true + } + fn peer(&self, i: usize) -> &Peer<Self::PeerData, DummySpecialization> { &self.peers[i] } diff --git a/substrate/core/consensus/babe/src/lib.rs b/substrate/core/consensus/babe/src/lib.rs index c621d510a4c..3c720db6279 100644 --- a/substrate/core/consensus/babe/src/lib.rs +++ b/substrate/core/consensus/babe/src/lib.rs @@ -973,6 +973,10 @@ mod tests { }) } + fn uses_tokio(&self) -> bool { + true + } + fn peer(&self, i: usize) -> &Peer<Self::PeerData, DummySpecialization> { trace!(target: "babe", "Retreiving a peer"); &self.peers[i] diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index 98e4b6d85c0..622eb7d4700 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -159,6 +159,10 @@ impl TestNetFactory for GrandpaTestNet { } } + fn uses_tokio(&self) -> bool { + true + } + fn peer(&self, i: usize) -> &GrandpaPeer { &self.peers[i] } diff --git a/substrate/core/network/Cargo.toml b/substrate/core/network/Cargo.toml index ca898eea078..c3184ab47dd 100644 --- a/substrate/core/network/Cargo.toml +++ b/substrate/core/network/Cargo.toml @@ -9,7 +9,6 @@ edition = "2018" [lib] [dependencies] -crossbeam-channel = "0.3.6" log = "0.4" parking_lot = "0.7.1" error-chain = "0.12" diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 685b447df6e..62c5f508dbd 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -30,7 +30,7 @@ use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessage use crate::on_demand::OnDemandService; use crate::specialization::NetworkSpecialization; use crate::sync::{ChainSync, Context as SyncContext, Status as SyncStatus, SyncState}; -use crate::service::{NetworkChan, NetworkMsg, TransactionPool, ExHashT}; +use crate::service::{TransactionPool, ExHashT}; use crate::config::{ProtocolConfig, Roles}; use parking_lot::RwLock; use rustc_hex::ToHex; @@ -76,7 +76,6 @@ const RPC_FAILED_REPUTATION_CHANGE: i32 = -(1 << 12); // Lock must always be taken in order declared here. pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> { - network_chan: NetworkChan<B>, /// Interval at which we call `tick`. tick_timeout: tokio::timer::Interval, /// Interval at which we call `propagate_extrinsics`. @@ -148,6 +147,20 @@ pub struct PeerInfo<B: BlockT> { pub best_number: <B::Header as HeaderT>::Number, } +/// Context passed as input to the methods of `protocol.rs` and that is used to communicate back +/// with the network. +pub trait NetworkOut<B: BlockT> { + /// Adjusts the reputation of the peer. Use this to point out that a peer has been malign or + /// irresponsible or appeared lazy. + fn report_peer(&mut self, who: PeerId, reputation: i32); + + /// Force disconnecting from a peer. + fn disconnect_peer(&mut self, who: PeerId); + + /// Send a message to a peer. + fn send_message(&mut self, who: PeerId, message: Message<B>); +} + /// Context for a network-specific handler. pub trait Context<B: BlockT> { /// Adjusts the reputation of the peer. Use this to point out that a peer has been malign or @@ -166,29 +179,29 @@ pub trait Context<B: BlockT> { /// Protocol context. struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { - network_chan: &'a NetworkChan<B>, + network_out: &'a mut dyn NetworkOut<B>, context_data: &'a mut ContextData<B, H>, } impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> { - fn new(context_data: &'a mut ContextData<B, H>, network_chan: &'a NetworkChan<B>) -> Self { - ProtocolContext { network_chan, context_data } + fn new(context_data: &'a mut ContextData<B, H>, network_out: &'a mut dyn NetworkOut<B>) -> Self { + ProtocolContext { network_out, context_data } } } impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B, H> { fn report_peer(&mut self, who: PeerId, reputation: i32) { - self.network_chan.send(NetworkMsg::ReportPeer(who, reputation)) + self.network_out.report_peer(who, reputation) } fn disconnect_peer(&mut self, who: PeerId) { - self.network_chan.send(NetworkMsg::DisconnectPeer(who)) + self.network_out.disconnect_peer(who) } fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage) { send_message( &mut self.context_data.peers, - &self.network_chan, + self.network_out, who, GenericMessage::Consensus(consensus) ) @@ -197,7 +210,7 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B, fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>) { send_message( &mut self.context_data.peers, - &self.network_chan, + self.network_out, who, GenericMessage::ChainSpecific(message) ) @@ -206,11 +219,11 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B, impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext<B> for ProtocolContext<'a, B, H> { fn report_peer(&mut self, who: PeerId, reputation: i32) { - self.network_chan.send(NetworkMsg::ReportPeer(who, reputation)) + self.network_out.report_peer(who, reputation) } fn disconnect_peer(&mut self, who: PeerId) { - self.network_chan.send(NetworkMsg::DisconnectPeer(who)) + self.network_out.disconnect_peer(who) } fn peer_info(&self, who: &PeerId) -> Option<PeerInfo<B>> { @@ -224,7 +237,7 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext<B> for ProtocolContext<'a, fn send_finality_proof_request(&mut self, who: PeerId, request: FinalityProofRequestMessage<B::Hash>) { send_message( &mut self.context_data.peers, - &self.network_chan, + self.network_out, who, GenericMessage::FinalityProofRequest(request) ) @@ -233,7 +246,7 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext<B> for ProtocolContext<'a, fn send_block_request(&mut self, who: PeerId, request: BlockRequestMessage<B>) { send_message( &mut self.context_data.peers, - &self.network_chan, + self.network_out, who, GenericMessage::BlockRequest(request) ) @@ -252,7 +265,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { /// Create a new instance. pub fn new( connected_peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>, - network_chan: NetworkChan<B>, config: ProtocolConfig, chain: Arc<Client<B>>, finality_proof_provider: Option<Arc<FinalityProofProvider<B>>>, @@ -263,7 +275,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { let info = chain.info()?; let sync = ChainSync::new(config.roles, &info); Ok(Protocol { - network_chan, tick_timeout: tokio::timer::Interval::new_interval(TICK_TIMEOUT), propagate_timeout: tokio::timer::Interval::new_interval(PROPAGATE_TIMEOUT), config: config, @@ -304,32 +315,26 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { pub fn is_offline(&self) -> bool { self.sync.status().is_offline() } -} - -impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Future for Protocol<B, S, H> { - type Item = (); - type Error = void::Void; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + pub fn poll(&mut self, network_out: &mut dyn NetworkOut<B>) -> Poll<void::Void, void::Void> { while let Ok(Async::Ready(_)) = self.tick_timeout.poll() { - self.tick(); + self.tick(network_out); } while let Ok(Async::Ready(_)) = self.propagate_timeout.poll() { - self.propagate_extrinsics(); + self.propagate_extrinsics(network_out); } Ok(Async::NotReady) } -} -impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { fn is_on_demand_response(&self, who: &PeerId, response_id: message::RequestId) -> bool { self.on_demand.as_ref().map_or(false, |od| od.is_on_demand_response(&who, response_id)) } fn handle_response( &mut self, + network_out: &mut dyn NetworkOut<B>, who: PeerId, response: &message::BlockResponse<B> ) -> Option<message::BlockRequest<B>> { @@ -344,8 +349,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { return request.map(|(_, r)| r) } trace!(target: "sync", "Unexpected response packet from {} ({})", who, response.id); - self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); - self.network_chan.send(NetworkMsg::DisconnectPeer(who)); + network_out.report_peer(who.clone(), i32::min_value()); + network_out.disconnect_peer(who); } None } @@ -364,48 +369,54 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } } - pub fn on_custom_message(&mut self, who: PeerId, message: Message<B>) -> CustomMessageOutcome<B> { + pub fn on_custom_message( + &mut self, + network_out: &mut dyn NetworkOut<B>, + who: PeerId, + message: Message<B> + ) -> CustomMessageOutcome<B> { match message { - GenericMessage::Status(s) => self.on_status_message(who, s), - GenericMessage::BlockRequest(r) => self.on_block_request(who, r), + GenericMessage::Status(s) => self.on_status_message(network_out, who, s), + GenericMessage::BlockRequest(r) => self.on_block_request(network_out, who, r), GenericMessage::BlockResponse(r) => { // Note, this is safe because only `ordinary bodies` and `remote bodies` are received in this matter. if self.is_on_demand_response(&who, r.id) { self.on_remote_body_response(who, r); } else { - if let Some(request) = self.handle_response(who.clone(), &r) { - let outcome = self.on_block_response(who.clone(), request, r); + if let Some(request) = self.handle_response(network_out, who.clone(), &r) { + let outcome = self.on_block_response(network_out, who.clone(), request, r); self.update_peer_info(&who); return outcome } } }, GenericMessage::BlockAnnounce(announce) => { - self.on_block_announce(who.clone(), announce); + self.on_block_announce(network_out, who.clone(), announce); self.update_peer_info(&who); }, - GenericMessage::Transactions(m) => self.on_extrinsics(who, m), - GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(who, request), + GenericMessage::Transactions(m) => self.on_extrinsics(network_out, who, m), + GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(network_out, who, request), GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(who, response), - GenericMessage::RemoteReadRequest(request) => self.on_remote_read_request(who, request), + GenericMessage::RemoteReadRequest(request) => self.on_remote_read_request(network_out, who, request), GenericMessage::RemoteReadResponse(response) => self.on_remote_read_response(who, response), - GenericMessage::RemoteHeaderRequest(request) => self.on_remote_header_request(who, request), + GenericMessage::RemoteHeaderRequest(request) => self.on_remote_header_request(network_out, who, request), GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(who, response), - GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(who, request), + GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(network_out, who, request), GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(who, response), - GenericMessage::FinalityProofRequest(request) => self.on_finality_proof_request(who, request), - GenericMessage::FinalityProofResponse(response) => return self.on_finality_proof_response(who, response), + GenericMessage::FinalityProofRequest(request) => self.on_finality_proof_request(network_out, who, request), + GenericMessage::FinalityProofResponse(response) => + return self.on_finality_proof_response(network_out, who, response), GenericMessage::Consensus(msg) => { if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 2) { self.consensus_gossip.on_incoming( - &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), + &mut ProtocolContext::new(&mut self.context_data, network_out), who, msg, ); } } other => self.specialization.on_message( - &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), + &mut ProtocolContext::new(&mut self.context_data, network_out), who, &mut Some(other), ), @@ -414,36 +425,43 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { CustomMessageOutcome::None } - fn send_message(&mut self, who: PeerId, message: Message<B>) { + fn send_message(&mut self, network_out: &mut dyn NetworkOut<B>, who: PeerId, message: Message<B>) { send_message::<B, H>( &mut self.context_data.peers, - &self.network_chan, + network_out, who, message, ); } /// Locks `self` and returns a context plus the `ConsensusGossip` struct. - pub fn consensus_gossip_lock<'a>(&'a mut self) -> (impl Context<B> + 'a, &'a mut ConsensusGossip<B>) { - let context = ProtocolContext::new(&mut self.context_data, &self.network_chan); + pub fn consensus_gossip_lock<'a>( + &'a mut self, + network_out: &'a mut dyn NetworkOut<B> + ) -> (impl Context<B> + 'a, &'a mut ConsensusGossip<B>) { + let context = ProtocolContext::new(&mut self.context_data, network_out); (context, &mut self.consensus_gossip) } /// Locks `self` and returns a context plus the network specialization. - pub fn specialization_lock<'a>(&'a mut self) -> (impl Context<B> + 'a, &'a mut S) { - let context = ProtocolContext::new(&mut self.context_data, &self.network_chan); + pub fn specialization_lock<'a>( + &'a mut self, + network_out: &'a mut dyn NetworkOut<B> + ) -> (impl Context<B> + 'a, &'a mut S) { + let context = ProtocolContext::new(&mut self.context_data, network_out); (context, &mut self.specialization) } /// Gossip a consensus message to the network. pub fn gossip_consensus_message( &mut self, + network_out: &mut dyn NetworkOut<B>, topic: B::Hash, engine_id: ConsensusEngineId, message: Vec<u8>, recipient: GossipMessageRecipient, ) { - let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); + let mut context = ProtocolContext::new(&mut self.context_data, network_out); let message = ConsensusMessage { data: message, engine_id }; match recipient { GossipMessageRecipient::BroadcastToAll => @@ -451,19 +469,19 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { GossipMessageRecipient::BroadcastNew => self.consensus_gossip.multicast(&mut context, topic, message, false), GossipMessageRecipient::Peer(who) => - self.send_message(who, GenericMessage::Consensus(message)), + self.send_message(network_out, who, GenericMessage::Consensus(message)), } } /// Called when a new peer is connected - pub fn on_peer_connected(&mut self, who: PeerId, debug_info: String) { + pub fn on_peer_connected(&mut self, network_out: &mut dyn NetworkOut<B>, who: PeerId, debug_info: String) { trace!(target: "sync", "Connecting {}: {}", who, debug_info); self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: time::Instant::now() }); - self.send_status(who); + self.send_status(network_out, who); } /// Called by peer when it is disconnecting - pub fn on_peer_disconnected(&mut self, peer: PeerId, debug_info: String) { + pub fn on_peer_disconnected(&mut self, network_out: &mut dyn NetworkOut<B>, peer: PeerId, debug_info: String) { trace!(target: "sync", "Disconnecting {}: {}", peer, debug_info); // lock all the the peer lists so that add/remove peer events are in order let removed = { @@ -472,7 +490,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { self.context_data.peers.remove(&peer) }; if let Some(peer_data) = removed { - let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); + let mut context = ProtocolContext::new(&mut self.context_data, network_out); if peer_data.info.protocol_version > 2 { self.consensus_gossip.peer_disconnected(&mut context, peer.clone()); } @@ -484,8 +502,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { /// Called as a back-pressure mechanism if the networking detects that the peer cannot process /// our messaging rate fast enough. - pub fn on_clogged_peer(&self, who: PeerId, _msg: Option<Message<B>>) { - self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), CLOGGED_PEER_REPUTATION_CHANGE)); + pub fn on_clogged_peer(&self, network_out: &mut dyn NetworkOut<B>, who: PeerId, _msg: Option<Message<B>>) { + network_out.report_peer(who.clone(), CLOGGED_PEER_REPUTATION_CHANGE); // Print some diagnostics. if let Some(peer) = self.context_data.peers.get(&who) { @@ -498,13 +516,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } } - /// Puts the `Synchronized` message on `network_chan`. - #[cfg(any(test, feature = "test-helpers"))] - pub fn synchronize(&self) { - self.network_chan.send(NetworkMsg::Synchronized); - } - - fn on_block_request(&mut self, peer: PeerId, request: message::BlockRequest<B>) { + fn on_block_request(&mut self, network_out: &mut dyn NetworkOut<B>, peer: PeerId, request: message::BlockRequest<B>) { trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, @@ -515,8 +527,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { // sending block requests to the node that is unable to serve it is considered a bad behavior if !self.config.roles.is_full() { trace!(target: "sync", "Peer {} is trying to sync from the light node", peer); - self.network_chan.send(NetworkMsg::DisconnectPeer(peer.clone())); - self.network_chan.send(NetworkMsg::ReportPeer(peer, i32::min_value())); + network_out.disconnect_peer(peer.clone()); + network_out.report_peer(peer, i32::min_value()); return; } @@ -574,11 +586,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { blocks: blocks, }; trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len()); - self.send_message(peer, GenericMessage::BlockResponse(response)) + self.send_message(network_out, peer, GenericMessage::BlockResponse(response)) } fn on_block_response( &mut self, + network_out: &mut dyn NetworkOut<B>, peer: PeerId, request: message::BlockRequest<B>, response: message::BlockResponse<B>, @@ -602,7 +615,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { // justifications are imported asynchronously (#1482) if request.fields == message::BlockAttributes::JUSTIFICATION { let outcome = self.sync.on_block_justification_data( - &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), + &mut ProtocolContext::new(&mut self.context_data, network_out), peer, request, response, @@ -616,7 +629,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } else { let outcome = self.sync.on_block_data( - &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), + &mut ProtocolContext::new(&mut self.context_data, network_out), peer, request, response @@ -632,16 +645,16 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { /// Perform time based maintenance. /// /// > **Note**: This method normally doesn't have to be called except for testing purposes. - pub fn tick(&mut self) { - self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan)); - self.maintain_peers(); - self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan)); + pub fn tick(&mut self, network_out: &mut dyn NetworkOut<B>) { + self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, network_out)); + self.maintain_peers(network_out); + self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, network_out)); self.on_demand .as_ref() .map(|s| s.maintain_peers()); } - fn maintain_peers(&mut self) { + fn maintain_peers(&mut self, network_out: &mut dyn NetworkOut<B>) { let tick = time::Instant::now(); let mut aborting = Vec::new(); { @@ -662,20 +675,20 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } } - self.specialization.maintain_peers(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan)); + self.specialization.maintain_peers(&mut ProtocolContext::new(&mut self.context_data, network_out)); for p in aborting { - let _ = self.network_chan.send(NetworkMsg::DisconnectPeer(p.clone())); - let _ = self.network_chan.send(NetworkMsg::ReportPeer(p, TIMEOUT_REPUTATION_CHANGE)); + network_out.disconnect_peer(p.clone()); + network_out.report_peer(p, TIMEOUT_REPUTATION_CHANGE); } } /// Called by peer to report status - fn on_status_message(&mut self, who: PeerId, status: message::Status<B>) { + fn on_status_message(&mut self, network_out: &mut dyn NetworkOut<B>, who: PeerId, status: message::Status<B>) { trace!(target: "sync", "New peer {} {:?}", who, status); let protocol_version = { if self.context_data.peers.contains_key(&who) { debug!("Unexpected status packet from {}", who); - self.network_chan.send(NetworkMsg::ReportPeer(who, UNEXPECTED_STATUS_REPUTATION_CHANGE)); + network_out.report_peer(who, UNEXPECTED_STATUS_REPUTATION_CHANGE); return; } if status.genesis_hash != self.genesis_hash { @@ -684,14 +697,14 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { "Peer is on different chain (our genesis: {} theirs: {})", self.genesis_hash, status.genesis_hash ); - self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); - self.network_chan.send(NetworkMsg::DisconnectPeer(who)); + network_out.report_peer(who.clone(), i32::min_value()); + network_out.disconnect_peer(who); return; } if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version { trace!(target: "protocol", "Peer {:?} using unsupported protocol version {}", who, status.version); - self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); - self.network_chan.send(NetworkMsg::DisconnectPeer(who)); + network_out.report_peer(who.clone(), i32::min_value()); + network_out.disconnect_peer(who); return; } if self.config.roles.is_light() { @@ -708,10 +721,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { .unwrap_or(0); if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE { debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who); - self.network_chan.send( - NetworkMsg::ReportPeer(who.clone(), PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE) - ); - self.network_chan.send(NetworkMsg::DisconnectPeer(who)); + network_out.report_peer(who.clone(), PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE); + network_out.disconnect_peer(who); return; } } @@ -751,7 +762,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { status.version }; - let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); + let mut context = ProtocolContext::new(&mut self.context_data, network_out); self.on_demand .as_ref() .map(|s| s.on_connect(who.clone(), status.roles, status.best_number)); @@ -763,7 +774,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } /// Called when peer sends us new extrinsics - fn on_extrinsics(&mut self, who: PeerId, extrinsics: message::Transactions<B::Extrinsic>) { + fn on_extrinsics( + &mut self, + network_out: &mut dyn NetworkOut<B>, + who: PeerId, + extrinsics: message::Transactions<B::Extrinsic> + ) { // Accept extrinsics only when fully synced if self.sync.status().state != SyncState::Idle { trace!(target: "sync", "{} Ignoring extrinsics while syncing", who); @@ -773,7 +789,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { for t in extrinsics { if let Some(hash) = self.transaction_pool.import(&t) { - self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), NEW_EXTRINSIC_REPUTATION_CHANGE)); + network_out.report_peer(who.clone(), NEW_EXTRINSIC_REPUTATION_CHANGE); peer.known_extrinsics.insert(hash); } else { trace!(target: "sync", "Extrinsic rejected"); @@ -783,7 +799,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } /// Call when we must propagate ready extrinsics to peers. - pub fn propagate_extrinsics(&mut self) { + pub fn propagate_extrinsics(&mut self, network_out: &mut dyn NetworkOut<B>) { debug!(target: "sync", "Propagating extrinsics"); // Accept transactions only when fully synced @@ -808,7 +824,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { .push(who.to_base58()); } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); - self.network_chan.send(NetworkMsg::Outgoing(who.clone(), GenericMessage::Transactions(to_send))) + network_out.send_message(who.clone(), GenericMessage::Transactions(to_send)) } } self.transaction_pool.on_broadcasted(propagated_to); @@ -818,7 +834,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { /// /// In chain-based consensus, we often need to make sure non-best forks are /// at least temporarily synced. - pub fn announce_block(&mut self, hash: B::Hash) { + pub fn announce_block(&mut self, network_out: &mut dyn NetworkOut<B>, hash: B::Hash) { let header = match self.context_data.chain.header(&BlockId::Hash(hash)) { Ok(Some(header)) => header, Ok(None) => { @@ -837,12 +853,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { for (who, ref mut peer) in self.context_data.peers.iter_mut() { trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who); peer.known_blocks.insert(hash); - self.network_chan.send(NetworkMsg::Outgoing(who.clone(), message.clone())) + network_out.send_message(who.clone(), message.clone()) } } /// Send Status message - fn send_status(&mut self, who: PeerId) { + fn send_status(&mut self, network_out: &mut dyn NetworkOut<B>, who: PeerId) { if let Ok(info) = self.context_data.chain.info() { let status = message::generic::Status { version: CURRENT_VERSION, @@ -853,11 +869,16 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { best_hash: info.chain.best_hash, chain_status: self.specialization.status(), }; - self.send_message(who, GenericMessage::Status(status)) + self.send_message(network_out, who, GenericMessage::Status(status)) } } - fn on_block_announce(&mut self, who: PeerId, announce: message::BlockAnnounce<B::Header>) { + fn on_block_announce( + &mut self, + network_out: &mut dyn NetworkOut<B>, + who: PeerId, + announce: message::BlockAnnounce<B::Header> + ) { let header = announce.header; let hash = header.hash(); { @@ -869,7 +890,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { .as_ref() .map(|s| s.on_block_announce(who.clone(), *header.number())); self.sync.on_block_announce( - &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), + &mut ProtocolContext::new(&mut self.context_data, network_out), who.clone(), hash, &header, @@ -878,10 +899,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { /// Call this when a block has been imported in the import queue and we should announce it on /// the network. - pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) { + pub fn on_block_imported(&mut self, network_out: &mut dyn NetworkOut<B>, hash: B::Hash, header: &B::Header) { self.sync.update_chain_info(header); self.specialization.on_block_imported( - &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), + &mut ProtocolContext::new(&mut self.context_data, network_out), hash.clone(), header, ); @@ -898,23 +919,24 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { for (who, ref mut peer) in self.context_data.peers.iter_mut() { if peer.known_blocks.insert(hash.clone()) { trace!(target: "sync", "Announcing block {:?} to {}", hash, who); - self.network_chan.send(NetworkMsg::Outgoing(who.clone(), message.clone())) + network_out.send_message(who.clone(), message.clone()) } } } /// Call this when a block has been finalized. The sync layer may have some additional /// requesting to perform. - pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { + pub fn on_block_finalized(&mut self, network_out: &mut dyn NetworkOut<B>, hash: B::Hash, header: &B::Header) { self.sync.on_block_finalized( &hash, *header.number(), - &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), + &mut ProtocolContext::new(&mut self.context_data, network_out), ); } fn on_remote_call_request( &mut self, + network_out: &mut dyn NetworkOut<B>, who: PeerId, request: message::RemoteCallRequest<B::Hash>, ) { @@ -938,12 +960,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { request.block, error ); - self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), RPC_FAILED_REPUTATION_CHANGE)); + network_out.report_peer(who.clone(), RPC_FAILED_REPUTATION_CHANGE); Default::default() } }; self.send_message( + network_out, who, GenericMessage::RemoteCallResponse(message::RemoteCallResponse { id: request.id, @@ -956,9 +979,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { /// /// Uses `protocol` to queue a new justification request and tries to dispatch all pending /// requests. - pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) { + pub fn request_justification(&mut self, network_out: &mut dyn NetworkOut<B>, hash: &B::Hash, number: NumberFor<B>) { let mut context = - ProtocolContext::new(&mut self.context_data, &self.network_chan); + ProtocolContext::new(&mut self.context_data, network_out); self.sync.request_justification(&hash, number, &mut context); } @@ -970,16 +993,21 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { /// A batch of blocks have been processed, with or without errors. /// Call this when a batch of blocks have been processed by the import queue, with or without /// errors. - pub fn blocks_processed(&mut self, processed_blocks: Vec<B::Hash>, has_error: bool) { + pub fn blocks_processed( + &mut self, + network_out: &mut dyn NetworkOut<B>, + processed_blocks: Vec<B::Hash>, + has_error: bool + ) { self.sync.blocks_processed(processed_blocks, has_error); let mut context = - ProtocolContext::new(&mut self.context_data, &self.network_chan); + ProtocolContext::new(&mut self.context_data, network_out); self.sync.maintain_sync(&mut context); } /// Restart the sync process. - pub fn restart(&mut self) { - let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); + pub fn restart(&mut self, network_out: &mut dyn NetworkOut<B>) { + let mut context = ProtocolContext::new(&mut self.context_data, network_out); self.sync.restart(&mut context); } @@ -1001,8 +1029,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { /// Request a finality proof for the given block. /// /// Queues a new finality proof request and tries to dispatch all pending requests. - pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) { - let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); + pub fn request_finality_proof(&mut self, network_out: &mut dyn NetworkOut<B>, hash: &B::Hash, number: NumberFor<B>) { + let mut context = ProtocolContext::new(&mut self.context_data, network_out); self.sync.request_finality_proof(&hash, number, &mut context); } @@ -1023,6 +1051,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { fn on_remote_read_request( &mut self, + network_out: &mut dyn NetworkOut<B>, who: PeerId, request: message::RemoteReadRequest<B::Hash>, ) { @@ -1042,6 +1071,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } }; self.send_message( + network_out, who, GenericMessage::RemoteReadResponse(message::RemoteReadResponse { id: request.id, @@ -1058,6 +1088,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { fn on_remote_header_request( &mut self, + network_out: &mut dyn NetworkOut<B>, who: PeerId, request: message::RemoteHeaderRequest<NumberFor<B>>, ) { @@ -1076,6 +1107,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } }; self.send_message( + network_out, who, GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse { id: request.id, @@ -1098,6 +1130,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { fn on_remote_changes_request( &mut self, + network_out: &mut dyn NetworkOut<B>, who: PeerId, request: message::RemoteChangesRequest<B::Hash>, ) { @@ -1135,6 +1168,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } }; self.send_message( + network_out, who, GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse { id: request.id, @@ -1163,6 +1197,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { fn on_finality_proof_request( &mut self, + network_out: &mut dyn NetworkOut<B>, who: PeerId, request: message::FinalityProofRequest<B::Hash>, ) { @@ -1184,6 +1219,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { }, }; self.send_message( + network_out, who, GenericMessage::FinalityProofResponse(message::FinalityProofResponse { id: 0, @@ -1195,12 +1231,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { fn on_finality_proof_response( &mut self, + network_out: &mut dyn NetworkOut<B>, who: PeerId, response: message::FinalityProofResponse<B::Hash>, ) -> CustomMessageOutcome<B> { trace!(target: "sync", "Finality proof response from {} for {}", who, response.block); let outcome = self.sync.on_block_finality_proof_data( - &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), + &mut ProtocolContext::new(&mut self.context_data, network_out), who, response, ); @@ -1230,7 +1267,7 @@ pub enum CustomMessageOutcome<B: BlockT> { fn send_message<B: BlockT, H: ExHashT>( peers: &mut HashMap<PeerId, Peer<B, H>>, - network_chan: &NetworkChan<B>, + network_out: &mut dyn NetworkOut<B>, who: PeerId, mut message: Message<B>, ) { @@ -1245,5 +1282,5 @@ fn send_message<B: BlockT, H: ExHashT>( peer.block_request = Some((time::Instant::now(), r.clone())); } } - network_chan.send(NetworkMsg::Outgoing(who, message)); + network_out.send_message(who, message); } diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index e63ce4037f8..1a966fc933c 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -31,13 +31,12 @@ use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use crate::message::Message; -use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer, ProtocolStatus, PeerInfo}; +use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer}; +use crate::protocol::{ProtocolStatus, PeerInfo, NetworkOut}; use crate::config::Params; use crate::error::Error; use crate::specialization::NetworkSpecialization; -use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError}; -use tokio::prelude::task::AtomicTask; use tokio::runtime::Builder as RuntimeBuilder; /// Interval at which we send status updates on the SyncProvider status stream. @@ -86,7 +85,7 @@ pub struct NetworkLink<B: BlockT, S: NetworkSpecialization<B>> { /// The protocol sender pub(crate) protocol_sender: mpsc::UnboundedSender<ProtocolMsg<B, S>>, /// The network sender - pub(crate) network_sender: NetworkChan<B>, + pub(crate) network_sender: mpsc::UnboundedSender<NetworkMsg<B>>, } impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> { @@ -102,8 +101,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> { let _ = self.protocol_sender.unbounded_send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success)); if !success { info!("Invalid justification provided by {} for #{}", who, hash); - let _ = self.network_sender.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); - let _ = self.network_sender.send(NetworkMsg::DisconnectPeer(who.clone())); + let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); + let _ = self.network_sender.unbounded_send(NetworkMsg::DisconnectPeer(who.clone())); } } @@ -135,13 +134,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> { )); if !success { info!("Invalid finality proof provided by {} for #{}", who, request_block.0); - let _ = self.network_sender.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); - let _ = self.network_sender.send(NetworkMsg::DisconnectPeer(who.clone())); + let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); + let _ = self.network_sender.unbounded_send(NetworkMsg::DisconnectPeer(who.clone())); } } fn report_peer(&self, who: PeerId, reputation_change: i32) { - self.network_sender.send(NetworkMsg::ReportPeer(who, reputation_change)); + let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who, reputation_change)); } fn restart(&self) { @@ -178,7 +177,7 @@ pub struct Service<B: BlockT + 'static, S: NetworkSpecialization<B>> { /// Peers whom we are connected with. peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>, /// Channel for networking messages processed by the background thread. - network_chan: NetworkChan<B>, + network_chan: mpsc::UnboundedSender<NetworkMsg<B>>, /// Network service network: Arc<Mutex<NetworkService<Message<B>>>>, /// Peerset manager (PSM); manages the reputation of nodes and indicates the network which @@ -199,7 +198,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> { protocol_id: ProtocolId, import_queue: Box<ImportQueue<B>>, ) -> Result<Arc<Service<B, S>>, Error> { - let (network_chan, network_port) = network_channel(); + let (network_chan, network_port) = mpsc::unbounded(); let (protocol_sender, protocol_rx) = mpsc::unbounded(); let status_sinks = Arc::new(Mutex::new(Vec::new())); // Start in off-line mode, since we're not connected to any nodes yet. @@ -208,7 +207,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> { let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>> = Arc::new(Default::default()); let protocol = Protocol::new( peers.clone(), - network_chan.clone(), params.config, params.chain, params.finality_proof_provider, @@ -322,14 +320,14 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> { /// /// This method is extremely poor in terms of API and should be eventually removed. pub fn disconnect_peer(&self, who: PeerId) { - let _ = self.network_chan.send(NetworkMsg::DisconnectPeer(who)); + let _ = self.network_chan.unbounded_send(NetworkMsg::DisconnectPeer(who)); } /// Send a message to the given peer. Has no effect if we're not connected to this peer. /// /// This method is extremely poor in terms of API and should be eventually removed. pub fn send_request(&self, who: PeerId, message: Message<B>) { - let _ = self.network_chan.send(NetworkMsg::Outgoing(who, message)); + let _ = self.network_chan.unbounded_send(NetworkMsg::Outgoing(who, message)); } /// Execute a closure with the chain-specific network specialization. @@ -433,81 +431,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> ManageNetwork for Service } } - -/// Create a NetworkPort/Chan pair. -pub fn network_channel<B: BlockT + 'static>() -> (NetworkChan<B>, NetworkPort<B>) { - let (network_sender, network_receiver) = channel::unbounded(); - let task_notify = Arc::new(AtomicTask::new()); - let network_port = NetworkPort::new(network_receiver, task_notify.clone()); - let network_chan = NetworkChan::new(network_sender, task_notify); - (network_chan, network_port) -} - - -/// A sender of NetworkMsg that notifies a task when a message has been sent. -#[derive(Clone)] -pub struct NetworkChan<B: BlockT + 'static> { - sender: Sender<NetworkMsg<B>>, - task_notify: Arc<AtomicTask>, -} - -impl<B: BlockT + 'static> NetworkChan<B> { - /// Create a new network chan. - pub fn new(sender: Sender<NetworkMsg<B>>, task_notify: Arc<AtomicTask>) -> Self { - NetworkChan { - sender, - task_notify, - } - } - - /// Send a messaging, to be handled on a stream. Notify the task handling the stream. - pub fn send(&self, msg: NetworkMsg<B>) { - let _ = self.sender.send(msg); - self.task_notify.notify(); - } -} - -impl<B: BlockT + 'static> Drop for NetworkChan<B> { - /// Notifying the task when a sender is dropped(when all are dropped, the stream is finished). - fn drop(&mut self) { - self.task_notify.notify(); - } -} - - -/// A receiver of NetworkMsg that makes the protocol-id available with each message. -pub struct NetworkPort<B: BlockT + 'static> { - receiver: Receiver<NetworkMsg<B>>, - task_notify: Arc<AtomicTask>, -} - -impl<B: BlockT + 'static> NetworkPort<B> { - /// Create a new network port for a given protocol-id. - pub fn new(receiver: Receiver<NetworkMsg<B>>, task_notify: Arc<AtomicTask>) -> Self { - Self { - receiver, - task_notify, - } - } - - /// Receive a message, if any is currently-enqueued. - /// Register the current tokio task for notification when a new message is available. - pub fn take_one_message(&self) -> Result<Option<NetworkMsg<B>>, ()> { - self.task_notify.register(); - match self.receiver.try_recv() { - Ok(msg) => Ok(Some(msg)), - Err(TryRecvError::Empty) => Ok(None), - Err(TryRecvError::Disconnected) => Err(()), - } - } - - /// Get a reference to the underlying crossbeam receiver. - #[cfg(any(test, feature = "test-helpers"))] - pub fn receiver(&self) -> &Receiver<NetworkMsg<B>> { - &self.receiver - } -} - /// Messages to be handled by NetworkService. #[derive(Debug)] pub enum NetworkMsg<B: BlockT + 'static> { @@ -592,7 +515,7 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>( is_major_syncing: Arc<AtomicBool>, protocol: Protocol<B, S, H>, import_queue: Box<ImportQueue<B>>, - network_port: NetworkPort<B>, + network_port: mpsc::UnboundedReceiver<NetworkMsg<B>>, protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>, status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>, config: NetworkConfiguration, @@ -645,11 +568,25 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>( mut protocol: Protocol<B, S, H>, network_service: Arc<Mutex<NetworkService<Message<B>>>>, import_queue: Box<ImportQueue<B>>, - network_port: NetworkPort<B>, + mut network_port: mpsc::UnboundedReceiver<NetworkMsg<B>>, mut protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>, status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>, peerset: PeersetHandle, ) -> impl Future<Item = (), Error = io::Error> { + // Implementation of `protocol::NetworkOut` using the available local variables. + struct Ctxt<'a, B: BlockT>(&'a mut NetworkService<Message<B>>, &'a PeersetHandle); + impl<'a, B: BlockT> NetworkOut<B> for Ctxt<'a, B> { + fn report_peer(&mut self, who: PeerId, reputation: i32) { + self.1.report_peer(who, reputation) + } + fn disconnect_peer(&mut self, who: PeerId) { + self.0.drop_node(&who) + } + fn send_message(&mut self, who: PeerId, message: Message<B>) { + self.0.send_custom_message(&who, message) + } + } + // Interval at which we send status updates on the `status_sinks`. let mut status_interval = tokio::timer::Interval::new_interval(STATUS_INTERVAL); @@ -659,25 +596,26 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>( status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok()); } - match protocol.poll() { - Ok(Async::Ready(())) => return Ok(Async::Ready(())), + match protocol.poll(&mut Ctxt(&mut network_service.lock(), &peerset)) { + Ok(Async::Ready(v)) => void::unreachable(v), Ok(Async::NotReady) => {} Err(err) => void::unreachable(err), } loop { - match network_port.take_one_message() { - Ok(None) => break, - Ok(Some(NetworkMsg::Outgoing(who, outgoing_message))) => + match network_port.poll() { + Ok(Async::NotReady) => break, + Ok(Async::Ready(Some(NetworkMsg::Outgoing(who, outgoing_message)))) => network_service.lock().send_custom_message(&who, outgoing_message), - Ok(Some(NetworkMsg::ReportPeer(who, reputation))) => + Ok(Async::Ready(Some(NetworkMsg::ReportPeer(who, reputation)))) => peerset.report_peer(who, reputation), - Ok(Some(NetworkMsg::DisconnectPeer(who))) => + Ok(Async::Ready(Some(NetworkMsg::DisconnectPeer(who)))) => network_service.lock().drop_node(&who), + #[cfg(any(test, feature = "test-helpers"))] - Ok(Some(NetworkMsg::Synchronized)) => {} + Ok(Async::Ready(Some(NetworkMsg::Synchronized))) => {} - Err(_) => return Ok(Async::Ready(())), + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), } } @@ -688,71 +626,78 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>( Ok(Async::NotReady) => break, }; + let mut network_service = network_service.lock(); + let mut network_out = Ctxt(&mut network_service, &peerset); + match msg { ProtocolMsg::BlockImported(hash, header) => - protocol.on_block_imported(hash, &header), + protocol.on_block_imported(&mut network_out, hash, &header), ProtocolMsg::BlockFinalized(hash, header) => - protocol.on_block_finalized(hash, &header), + protocol.on_block_finalized(&mut network_out, hash, &header), ProtocolMsg::ExecuteWithSpec(task) => { - let (mut context, spec) = protocol.specialization_lock(); + let (mut context, spec) = protocol.specialization_lock(&mut network_out); task.call_box(spec, &mut context); }, ProtocolMsg::ExecuteWithGossip(task) => { - let (mut context, gossip) = protocol.consensus_gossip_lock(); + let (mut context, gossip) = protocol.consensus_gossip_lock(&mut network_out); task.call_box(gossip, &mut context); } ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => - protocol.gossip_consensus_message(topic, engine_id, message, recipient), + protocol.gossip_consensus_message(&mut network_out, topic, engine_id, message, recipient), ProtocolMsg::BlocksProcessed(hashes, has_error) => - protocol.blocks_processed(hashes, has_error), + protocol.blocks_processed(&mut network_out, hashes, has_error), ProtocolMsg::RestartSync => - protocol.restart(), + protocol.restart(&mut network_out), ProtocolMsg::AnnounceBlock(hash) => - protocol.announce_block(hash), + protocol.announce_block(&mut network_out, hash), ProtocolMsg::BlockImportedSync(hash, number) => protocol.block_imported(&hash, number), ProtocolMsg::ClearJustificationRequests => protocol.clear_justification_requests(), ProtocolMsg::RequestJustification(hash, number) => - protocol.request_justification(&hash, number), + protocol.request_justification(&mut network_out, &hash, number), ProtocolMsg::JustificationImportResult(hash, number, success) => protocol.justification_import_result(hash, number, success), ProtocolMsg::SetFinalityProofRequestBuilder(builder) => protocol.set_finality_proof_request_builder(builder), ProtocolMsg::RequestFinalityProof(hash, number) => - protocol.request_finality_proof(&hash, number), + protocol.request_finality_proof(&mut network_out, &hash, number), ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) => protocol.finality_proof_import_result(requested_block, finalziation_result), - ProtocolMsg::PropagateExtrinsics => protocol.propagate_extrinsics(), + ProtocolMsg::PropagateExtrinsics => protocol.propagate_extrinsics(&mut network_out), #[cfg(any(test, feature = "test-helpers"))] - ProtocolMsg::Tick => protocol.tick(), + ProtocolMsg::Tick => protocol.tick(&mut network_out), #[cfg(any(test, feature = "test-helpers"))] - ProtocolMsg::Synchronize => protocol.synchronize(), + ProtocolMsg::Synchronize => {}, } } loop { - let outcome = match network_service.lock().poll() { + let mut network_service = network_service.lock(); + let poll_value = network_service.poll(); + let mut network_out = Ctxt(&mut network_service, &peerset); + + let outcome = match poll_value { Ok(Async::NotReady) => break, Ok(Async::Ready(Some(NetworkServiceEvent::OpenedCustomProtocol { peer_id, version, debug_info, .. }))) => { debug_assert!( version <= protocol::CURRENT_VERSION as u8 && version >= protocol::MIN_VERSION as u8 ); - protocol.on_peer_connected(peer_id, debug_info); + protocol.on_peer_connected(&mut network_out, peer_id, debug_info); CustomMessageOutcome::None } Ok(Async::Ready(Some(NetworkServiceEvent::ClosedCustomProtocol { peer_id, debug_info, .. }))) => { - protocol.on_peer_disconnected(peer_id, debug_info); + protocol.on_peer_disconnected(&mut network_out, peer_id, debug_info); CustomMessageOutcome::None }, Ok(Async::Ready(Some(NetworkServiceEvent::CustomMessage { peer_id, message, .. }))) => - protocol.on_custom_message(peer_id, message), + protocol.on_custom_message(&mut network_out, peer_id, message), Ok(Async::Ready(Some(NetworkServiceEvent::Clogged { peer_id, messages, .. }))) => { debug!(target: "sync", "{} clogging messages:", messages.len()); for msg in messages.into_iter().take(5) { debug!(target: "sync", "{:?}", msg); - protocol.on_clogged_peer(peer_id.clone(), Some(msg)); + protocol.on_clogged_peer(&mut network_out, peer_id.clone(), Some(msg)); } CustomMessageOutcome::None } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 44e0eafd912..82d54d40b0c 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -26,7 +26,8 @@ use std::sync::Arc; use log::trace; use crate::chain::FinalityProofProvider; -use client::{self, ClientInfo, BlockchainEvents, FinalityNotifications, in_mem::Backend as InMemoryBackend, error::Result as ClientResult}; +use client::{self, ClientInfo, BlockchainEvents, FinalityNotifications}; +use client::{in_mem::Backend as InMemoryBackend, error::Result as ClientResult}; use client::block_builder::BlockBuilder; use client::backend::AuxStore; use crate::config::{ProtocolConfig, Roles}; @@ -38,17 +39,16 @@ use consensus::import_queue::{ use consensus::{Error as ConsensusError, ErrorKind as ConsensusErrorKind}; use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport}; use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient, TopicNotification}; -use crossbeam_channel::RecvError; use futures::{prelude::*, sync::{mpsc, oneshot}}; use crate::message::Message; use network_libp2p::PeerId; use parking_lot::{Mutex, RwLock}; use primitives::{H256, sr25519::Public as AuthorityId, Blake2Hasher}; -use crate::protocol::{ConnectedPeer, Context, Protocol, ProtocolStatus, CustomMessageOutcome}; +use crate::protocol::{ConnectedPeer, Context, Protocol, ProtocolStatus, CustomMessageOutcome, NetworkOut}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor}; use runtime_primitives::{Justification, ConsensusEngineId}; -use crate::service::{network_channel, NetworkChan, NetworkLink, NetworkMsg, NetworkPort, ProtocolMsg, TransactionPool}; +use crate::service::{NetworkLink, NetworkMsg, ProtocolMsg, TransactionPool}; use crate::specialization::NetworkSpecialization; use test_client::{self, AccountKeyring}; @@ -115,8 +115,10 @@ impl NetworkSpecialization<Block> for DummySpecialization { } } -pub type PeersFullClient = client::Client<test_client::Backend, test_client::Executor, Block, test_client::runtime::RuntimeApi>; -pub type PeersLightClient = client::Client<test_client::LightBackend, test_client::LightExecutor, Block, test_client::runtime::RuntimeApi>; +pub type PeersFullClient = + client::Client<test_client::Backend, test_client::Executor, Block, test_client::runtime::RuntimeApi>; +pub type PeersLightClient = + client::Client<test_client::LightBackend, test_client::LightExecutor, Block, test_client::runtime::RuntimeApi>; #[derive(Clone)] pub enum PeersClient { @@ -181,7 +183,12 @@ impl PeersClient { } } - pub fn finalize_block(&self, id: BlockId<Block>, justification: Option<Justification>, notify: bool) -> ClientResult<()> { + pub fn finalize_block( + &self, + id: BlockId<Block>, + justification: Option<Justification>, + notify: bool + ) -> ClientResult<()> { match *self { PeersClient::Full(ref client) => client.finalize_block(id, justification, notify), PeersClient::Light(ref client) => client.finalize_block(id, justification, notify), @@ -201,7 +208,7 @@ impl<S: NetworkSpecialization<Block>> TestLink<S> { fn new( protocol_sender: mpsc::UnboundedSender<ProtocolMsg<Block, S>>, _network_to_protocol_sender: mpsc::UnboundedSender<FromNetworkMsg<Block>>, - network_sender: NetworkChan<Block> + network_sender: mpsc::UnboundedSender<NetworkMsg<Block>> ) -> TestLink<S> { TestLink { #[cfg(any(test, feature = "test-helpers"))] @@ -267,12 +274,12 @@ impl<S: NetworkSpecialization<Block>> Link<Block> for TestLink<S> { } pub struct Peer<D, S: NetworkSpecialization<Block>> { - pub peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>>, - pub peer_id: PeerId, + peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>>, + peer_id: PeerId, client: PeersClient, net_proto_channel: ProtocolChannel<S>, protocol_status: Arc<RwLock<ProtocolStatus<Block>>>, - pub import_queue: Box<BasicQueue<Block>>, + import_queue: Box<BasicQueue<Block>>, pub data: D, best_hash: Mutex<Option<H256>>, finalized_hash: Mutex<Option<H256>>, @@ -292,24 +299,28 @@ pub enum FromNetworkMsg<B: BlockT> { } struct ProtocolChannel<S: NetworkSpecialization<Block>> { + /// If true, we expect a tokio executor to be available. If false, we spawn our own. + use_tokio: bool, buffered_messages: Mutex<VecDeque<NetworkMsg<Block>>>, network_to_protocol_sender: mpsc::UnboundedSender<FromNetworkMsg<Block>>, client_to_protocol_sender: mpsc::UnboundedSender<ProtocolMsg<Block, S>>, - protocol_to_network_receiver: NetworkPort<Block>, + protocol_to_network_receiver: Mutex<mpsc::UnboundedReceiver<NetworkMsg<Block>>>, } impl<S: NetworkSpecialization<Block>> ProtocolChannel<S> { /// Create new buffered network port. pub fn new( + use_tokio: bool, network_to_protocol_sender: mpsc::UnboundedSender<FromNetworkMsg<Block>>, client_to_protocol_sender: mpsc::UnboundedSender<ProtocolMsg<Block, S>>, - protocol_to_network_receiver: NetworkPort<Block>, + protocol_to_network_receiver: mpsc::UnboundedReceiver<NetworkMsg<Block>>, ) -> Self { ProtocolChannel { + use_tokio, buffered_messages: Mutex::new(VecDeque::new()), network_to_protocol_sender, client_to_protocol_sender, - protocol_to_network_receiver, + protocol_to_network_receiver: Mutex::new(protocol_to_network_receiver), } } @@ -330,13 +341,23 @@ impl<S: NetworkSpecialization<Block>> ProtocolChannel<S> { } /// Wait until synchronization response is generated by the protocol. - pub fn wait_sync(&self) -> Result<(), RecvError> { - loop { - match self.protocol_to_network_receiver.receiver().recv() { - Ok(NetworkMsg::Synchronized) => return Ok(()), - Err(error) => return Err(error), - Ok(msg) => self.buffered_messages.lock().push_back(msg), + pub fn wait_sync(&self) -> Result<(), ()> { + let fut = futures::future::poll_fn(|| { + loop { + let mut protocol_to_network_receiver = self.protocol_to_network_receiver.lock(); + match protocol_to_network_receiver.poll() { + Ok(Async::Ready(Some(NetworkMsg::Synchronized))) => return Ok(Async::Ready(())), + Ok(Async::Ready(None)) | Err(_) => return Err(()), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(Some(msg))) => self.buffered_messages.lock().push_back(msg), + } } + }); + + if self.use_tokio { + fut.wait() + } else { + tokio::runtime::current_thread::block_on_all(fut) } } @@ -359,8 +380,13 @@ impl<S: NetworkSpecialization<Block>> ProtocolChannel<S> { /// Whether this peer is done syncing (has no messages to send). fn is_done(&self) -> bool { - self.buffered_messages.lock().is_empty() - && self.protocol_to_network_receiver.receiver().is_empty() + let mut buffered_messages = self.buffered_messages.lock(); + if let Some(msg) = self.channel_message() { + buffered_messages.push_back(msg); + false + } else { + buffered_messages.is_empty() + } } /// Return oldest buffered message if it exists. @@ -377,7 +403,20 @@ impl<S: NetworkSpecialization<Block>> ProtocolChannel<S> { /// Receive message from the channel. fn channel_message(&self) -> Option<NetworkMsg<Block>> { - self.protocol_to_network_receiver.receiver().try_recv().ok() + let fut = futures::future::poll_fn(|| -> Result<_, ()> { + Ok(Async::Ready(match self.protocol_to_network_receiver.lock().poll() { + Ok(Async::Ready(Some(m))) => Some(m), + Ok(Async::NotReady) => None, + Err(_) => None, + Ok(Async::Ready(None)) => None, + })) + }); + + if self.use_tokio { + fut.wait() + } else { + tokio::runtime::current_thread::block_on_all(fut) + }.ok().and_then(|a| a) } } @@ -387,13 +426,15 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>>, client: PeersClient, import_queue: Box<BasicQueue<Block>>, + use_tokio: bool, network_to_protocol_sender: mpsc::UnboundedSender<FromNetworkMsg<Block>>, protocol_sender: mpsc::UnboundedSender<ProtocolMsg<Block, S>>, - network_sender: NetworkChan<Block>, - network_port: NetworkPort<Block>, + network_sender: mpsc::UnboundedSender<NetworkMsg<Block>>, + network_port: mpsc::UnboundedReceiver<NetworkMsg<Block>>, data: D, ) -> Self { let net_proto_channel = ProtocolChannel::new( + use_tokio, network_to_protocol_sender.clone(), protocol_sender.clone(), network_port, @@ -417,7 +458,7 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { } } /// Called after blockchain has been populated to updated current state. - pub fn start(&self) { + fn start(&self) { // Update the sync state to the latest chain state. let info = self.client.info().expect("In-mem client does not fail"); let header = self @@ -428,7 +469,7 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { self.net_proto_channel.send_from_client(ProtocolMsg::BlockImported(info.chain.best_hash, header)); } - pub fn on_block_imported( + fn on_block_imported( &self, hash: <Block as BlockT>::Hash, header: &<Block as BlockT>::Header, @@ -438,18 +479,18 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { /// SyncOracle: are we connected to any peer? #[cfg(test)] - pub fn is_offline(&self) -> bool { + fn is_offline(&self) -> bool { self.protocol_status.read().sync.is_offline() } /// SyncOracle: are we in the process of catching-up with the chain? #[cfg(test)] - pub fn is_major_syncing(&self) -> bool { + fn is_major_syncing(&self) -> bool { self.protocol_status.read().sync.is_major_syncing() } /// Get protocol status. - pub fn protocol_status(&self) -> ProtocolStatus<Block> { + fn protocol_status(&self) -> ProtocolStatus<Block> { self.protocol_status.read().clone() } @@ -507,7 +548,7 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { } /// Send block finalization notifications. - pub fn send_finality_notifications(&self) { + fn send_finality_notifications(&self) { let info = self.client.info().expect("In-mem client does not fail"); let mut finalized_hash = self.finalized_hash.lock(); @@ -543,10 +584,6 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { ); } - pub fn consensus_gossip_collect_garbage_for_topic(&self, _topic: <Block as BlockT>::Hash) { - self.with_gossip(move |gossip, _| gossip.collect_garbage()) - } - /// access the underlying consensus gossip handler pub fn consensus_gossip_messages_for( &self, @@ -569,7 +606,7 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { } /// Announce a block to peers. - pub fn announce_block(&self, block: Hash) { + fn announce_block(&self, block: Hash) { self.net_proto_channel.send_from_client(ProtocolMsg::AnnounceBlock(block)); } @@ -589,9 +626,13 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { /// Add blocks to the peer -- edit the block before adding. The chain will /// start at the given block iD. - pub fn generate_blocks_at<F>(&self, at: BlockId<Block>, count: usize, origin: BlockOrigin, mut edit_block: F) -> H256 - where F: FnMut(BlockBuilder<Block, PeersFullClient>) -> Block - { + fn generate_blocks_at<F>( + &self, + at: BlockId<Block>, + count: usize, + origin: BlockOrigin, + mut edit_block: F + ) -> H256 where F: FnMut(BlockBuilder<Block, PeersFullClient>) -> Block { let full_client = self.client.as_full().expect("blocks could only be generated by full clients"); let mut at = full_client.header(&at).unwrap().unwrap().hash(); for _ in 0..count { @@ -633,7 +674,7 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { /// Push blocks to the peer (simplified: with or without a TX) starting from /// given hash. - pub fn push_blocks_at(&self, at: BlockId<Block>, count: usize, with_tx: bool) -> H256 { + fn push_blocks_at(&self, at: BlockId<Block>, count: usize, with_tx: bool) -> H256 { let mut nonce = 0; if with_tx { self.generate_blocks_at(at, count, BlockOrigin::File, |mut builder| { @@ -728,6 +769,11 @@ pub trait TestNetFactory: Sized { ProtocolConfig::default() } + /// Must return true if the testnet is going to be used from within a tokio context. + fn uses_tokio(&self) -> bool { + false + } + /// Create new test network with this many peers. fn new(n: usize) -> Self { trace!(target: "test_network", "Creating test network"); @@ -747,26 +793,41 @@ pub trait TestNetFactory: Sized { protocol_status: Arc<RwLock<ProtocolStatus<Block>>>, import_queue: Box<BasicQueue<Block>>, mut protocol: Protocol<Block, Self::Specialization, Hash>, + network_sender: mpsc::UnboundedSender<NetworkMsg<Block>>, mut network_to_protocol_rx: mpsc::UnboundedReceiver<FromNetworkMsg<Block>>, mut protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<Block, Self::Specialization>>, peer: Arc<Peer<Self::PeerData, Self::Specialization>>, ) { std::thread::spawn(move || { + // Implementation of `protocol::NetworkOut` using the available local variables. + struct Ctxt<'a, B: BlockT>(&'a mpsc::UnboundedSender<NetworkMsg<B>>); + impl<'a, B: BlockT> NetworkOut<B> for Ctxt<'a, B> { + fn report_peer(&mut self, who: PeerId, reputation: i32) { + let _ = self.0.unbounded_send(NetworkMsg::ReportPeer(who, reputation)); + } + fn disconnect_peer(&mut self, who: PeerId) { + let _ = self.0.unbounded_send(NetworkMsg::DisconnectPeer(who)); + } + fn send_message(&mut self, who: PeerId, message: Message<B>) { + let _ = self.0.unbounded_send(NetworkMsg::Outgoing(who, message)); + } + } + tokio::runtime::current_thread::run(futures::future::poll_fn(move || { while let Async::Ready(msg) = network_to_protocol_rx.poll().unwrap() { let outcome = match msg { Some(FromNetworkMsg::PeerConnected(peer_id, debug_msg)) => { - protocol.on_peer_connected(peer_id, debug_msg); + protocol.on_peer_connected(&mut Ctxt(&network_sender), peer_id, debug_msg); CustomMessageOutcome::None }, Some(FromNetworkMsg::PeerDisconnected(peer_id, debug_msg)) => { - protocol.on_peer_disconnected(peer_id, debug_msg); + protocol.on_peer_disconnected(&mut Ctxt(&network_sender), peer_id, debug_msg); CustomMessageOutcome::None }, Some(FromNetworkMsg::CustomMessage(peer_id, message)) => - protocol.on_custom_message(peer_id, message), + protocol.on_custom_message(&mut Ctxt(&network_sender), peer_id, message), Some(FromNetworkMsg::Synchronize) => { - protocol.synchronize(); + let _ = network_sender.unbounded_send(NetworkMsg::Synchronized); CustomMessageOutcome::None }, None => return Ok(Async::Ready(())), @@ -792,51 +853,59 @@ pub trait TestNetFactory: Sized { match msg { ProtocolMsg::BlockImported(hash, header) => - protocol.on_block_imported(hash, &header), + protocol.on_block_imported(&mut Ctxt(&network_sender), hash, &header), ProtocolMsg::BlockFinalized(hash, header) => - protocol.on_block_finalized(hash, &header), + protocol.on_block_finalized(&mut Ctxt(&network_sender), hash, &header), ProtocolMsg::ExecuteWithSpec(task) => { - let (mut context, spec) = protocol.specialization_lock(); + let mut ctxt = Ctxt(&network_sender); + let (mut context, spec) = protocol.specialization_lock(&mut ctxt); task.call_box(spec, &mut context); }, ProtocolMsg::ExecuteWithGossip(task) => { - let (mut context, gossip) = protocol.consensus_gossip_lock(); + let mut ctxt = Ctxt(&network_sender); + let (mut context, gossip) = protocol.consensus_gossip_lock(&mut ctxt); task.call_box(gossip, &mut context); } ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => - protocol.gossip_consensus_message(topic, engine_id, message, recipient), + protocol.gossip_consensus_message( + &mut Ctxt(&network_sender), + topic, + engine_id, + message, + recipient + ), ProtocolMsg::BlocksProcessed(hashes, has_error) => - protocol.blocks_processed(hashes, has_error), + protocol.blocks_processed(&mut Ctxt(&network_sender), hashes, has_error), ProtocolMsg::RestartSync => - protocol.restart(), + protocol.restart(&mut Ctxt(&network_sender)), ProtocolMsg::AnnounceBlock(hash) => - protocol.announce_block(hash), + protocol.announce_block(&mut Ctxt(&network_sender), hash), ProtocolMsg::BlockImportedSync(hash, number) => protocol.block_imported(&hash, number), ProtocolMsg::ClearJustificationRequests => protocol.clear_justification_requests(), ProtocolMsg::RequestJustification(hash, number) => - protocol.request_justification(&hash, number), + protocol.request_justification(&mut Ctxt(&network_sender), &hash, number), ProtocolMsg::JustificationImportResult(hash, number, success) => protocol.justification_import_result(hash, number, success), ProtocolMsg::SetFinalityProofRequestBuilder(builder) => protocol.set_finality_proof_request_builder(builder), ProtocolMsg::RequestFinalityProof(hash, number) => - protocol.request_finality_proof(&hash, number), + protocol.request_finality_proof(&mut Ctxt(&network_sender), &hash, number), ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) => protocol.finality_proof_import_result(requested_block, finalziation_result), - ProtocolMsg::PropagateExtrinsics => protocol.propagate_extrinsics(), + ProtocolMsg::PropagateExtrinsics => protocol.propagate_extrinsics(&mut Ctxt(&network_sender)), #[cfg(any(test, feature = "test-helpers"))] - ProtocolMsg::Tick => protocol.tick(), + ProtocolMsg::Tick => protocol.tick(&mut Ctxt(&network_sender)), #[cfg(any(test, feature = "test-helpers"))] ProtocolMsg::Synchronize => { trace!(target: "sync", "handle_client_msg: received Synchronize msg"); - protocol.synchronize(); + let _ = network_sender.unbounded_send(NetworkMsg::Synchronized); } } } - if let Async::Ready(_) = protocol.poll().unwrap() { + if let Async::Ready(_) = protocol.poll(&mut Ctxt(&network_sender)).unwrap() { return Ok(Async::Ready(())) } @@ -865,7 +934,7 @@ pub trait TestNetFactory: Sized { let verifier = self.make_verifier(PeersClient::Full(client.clone()), config); let (block_import, justification_import, finality_proof_import, finality_proof_request_builder, data) = self.make_block_import(PeersClient::Full(client.clone())); - let (network_sender, network_port) = network_channel(); + let (network_sender, network_port) = mpsc::unbounded(); let import_queue = Box::new(BasicQueue::new( verifier, @@ -882,7 +951,6 @@ pub trait TestNetFactory: Sized { let protocol = Protocol::new( peers.clone(), - network_sender.clone(), config.clone(), client.clone(), self.make_finality_proof_provider(PeersClient::Full(client.clone())), @@ -896,6 +964,7 @@ pub trait TestNetFactory: Sized { protocol_status.clone(), import_queue.clone(), protocol, + network_sender.clone(), network_to_protocol_rx, protocol_rx, Arc::new(Peer::new( @@ -903,6 +972,7 @@ pub trait TestNetFactory: Sized { peers, PeersClient::Full(client), import_queue, + self.uses_tokio(), network_to_protocol_sender, protocol_sender, network_sender, @@ -922,7 +992,7 @@ pub trait TestNetFactory: Sized { let verifier = self.make_verifier(PeersClient::Light(client.clone()), &config); let (block_import, justification_import, finality_proof_import, finality_proof_request_builder, data) = self.make_block_import(PeersClient::Light(client.clone())); - let (network_sender, network_port) = network_channel(); + let (network_sender, network_port) = mpsc::unbounded(); let import_queue = Box::new(BasicQueue::new( verifier, @@ -939,7 +1009,6 @@ pub trait TestNetFactory: Sized { let protocol = Protocol::new( peers.clone(), - network_sender.clone(), config, client.clone(), self.make_finality_proof_provider(PeersClient::Light(client.clone())), @@ -953,6 +1022,7 @@ pub trait TestNetFactory: Sized { protocol_status.clone(), import_queue.clone(), protocol, + network_sender.clone(), network_to_protocol_rx, protocol_rx, Arc::new(Peer::new( @@ -960,6 +1030,7 @@ pub trait TestNetFactory: Sized { peers, PeersClient::Light(client), import_queue, + self.uses_tokio(), network_to_protocol_sender, protocol_sender, network_sender, @@ -1187,7 +1258,7 @@ impl TestNetFactory for JustificationTestNet { self.0.peers() } - fn mut_peers<F: FnOnce(&mut Vec<Arc<Peer<Self::PeerData, Self::Specialization>>>)>(&mut self, closure: F ) { + fn mut_peers<F: FnOnce(&mut Vec<Arc<Peer<Self::PeerData, Self::Specialization>>>)>(&mut self, closure: F) { self.0.mut_peers(closure) } -- GitLab