diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index c7e5dd9d2068c231ea06390bcf960c887d962c01..57109195361219d5fbbaa865a02724d5441fa67a 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -82,14 +82,7 @@ pub struct ProtocolStatus<B: BlockT> { /// Peer information #[derive(Debug)] struct Peer<B: BlockT, H: ExHashT> { - /// Protocol version - protocol_version: u32, - /// Roles - roles: Roles, - /// Peer best block hash - best_hash: B::Hash, - /// Peer best block number - best_number: <B::Header as HeaderT>::Number, + info: PeerInfo<B>, /// Current block request, if any. block_request: Option<(time::Instant, message::BlockRequest<B>)>, /// Requests we are no longer insterested in. @@ -131,61 +124,28 @@ pub trait Context<B: BlockT> { } /// Protocol context. -pub(crate) struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { +struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { network_chan: &'a NetworkChan<B>, context_data: &'a mut ContextData<B, H>, } impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> { - pub(crate) fn new( - context_data: &'a mut ContextData<B, H>, - network_chan: &'a NetworkChan<B>, - ) -> Self { - ProtocolContext { - network_chan, - context_data, - } - } - - /// Send a message to a peer. - pub fn send_message(&mut self, who: NodeIndex, message: Message<B>) { - send_message( - &mut self.context_data.peers, - &self.network_chan, - who, - message, - ) - } - - /// Point out that a peer has been malign or irresponsible or appeared lazy. - pub fn report_peer(&mut self, who: NodeIndex, reason: Severity) { - let _ = self - .network_chan - .send(NetworkMsg::ReportPeer(who, reason)); - } - - /// Get peer info. - pub fn peer_info(&self, peer: NodeIndex) -> Option<PeerInfo<B>> { - self.context_data.peers.get(&peer).map(|p| PeerInfo { - roles: p.roles, - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, - }) + fn new(context_data: &'a mut ContextData<B, H>, network_chan: &'a NetworkChan<B>) -> Self { + ProtocolContext { network_chan, context_data } } } impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B, H> { fn send_message(&mut self, who: NodeIndex, message: Message<B>) { - ProtocolContext::send_message(self, who, message); + send_message(&mut self.context_data.peers, &self.network_chan, who, message) } fn report_peer(&mut self, who: NodeIndex, reason: Severity) { - ProtocolContext::report_peer(self, who, reason); + self.network_chan.send(NetworkMsg::ReportPeer(who, reason)) } fn peer_info(&self, who: NodeIndex) -> Option<PeerInfo<B>> { - ProtocolContext::peer_info(self, who) + self.context_data.peers.get(&who).map(|p| p.info.clone()) } fn client(&self) -> &Client<B> { @@ -194,7 +154,7 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B, } /// Data necessary to create a context. -pub(crate) struct ContextData<B: BlockT, H: ExHashT> { +struct ContextData<B: BlockT, H: ExHashT> { // All connected peers peers: HashMap<NodeIndex, Peer<B, H>>, pub chain: Arc<Client<B>>, @@ -345,17 +305,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { fn handle_msg(&mut self, msg: ProtocolMsg<B, S>) -> bool { match msg { ProtocolMsg::Peers(sender) => { - let peers = self.context_data.peers.iter().map(|(idx, p)| { - ( - *idx, - PeerInfo { - roles: p.roles, - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, - } - ) - }).collect(); + let peers = self.context_data.peers.iter().map(|(idx, p)| (*idx, p.info.clone())).collect(); let _ = sender.send(peers); }, ProtocolMsg::PeerDisconnected(who, debug_info) => self.on_peer_disconnected(who, debug_info), @@ -430,9 +380,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { return request.map(|(_, r)| r) } trace!(target: "sync", "Unexpected response packet from {} ({})", who, response.id,); - let _ = self - .network_chan - .send(NetworkMsg::ReportPeer(who, Severity::Bad("Unexpected response packet received from peer".to_string()))); + let severity = Severity::Bad("Unexpected response packet received from peer".to_string()); + self.network_chan.send(NetworkMsg::ReportPeer(who, severity)) } None } @@ -532,17 +481,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { /// Called as a back-pressure mechanism if the networking detects that the peer cannot process /// our messaging rate fast enough. - pub fn on_clogged_peer( - &self, - who: NodeIndex, - _message: Option<Message<B>>, - ) { + pub fn on_clogged_peer(&self, who: NodeIndex, _msg: Option<Message<B>>) { // We don't do anything but print some diagnostics for now. if let Some(peer) = self.context_data.peers.get(&who) { debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \ known_extrinsics: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})", - who, peer.protocol_version, peer.roles, peer.known_extrinsics, peer.known_blocks, - peer.best_hash, peer.best_number); + who, peer.info.protocol_version, peer.info.roles, peer.known_extrinsics, peer.known_blocks, + peer.info.best_hash, peer.info.best_number); } else { debug!(target: "sync", "Peer clogged before being properly connected"); } @@ -573,7 +518,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { let number = header.number().clone(); let hash = header.hash(); let parent_hash = header.parent_hash().clone(); - let justification = if get_justification { self.context_data.chain.justification(&BlockId::Hash(hash)).unwrap_or(None) } else { None }; + let justification = if get_justification { + self.context_data.chain.justification(&BlockId::Hash(hash)).unwrap_or(None) + } else { + None + }; let block_data = message::generic::BlockData { hash: hash, header: if get_header { Some(header) } else { None }, @@ -677,11 +626,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } } - self.specialization - .maintain_peers(&mut ProtocolContext::new( - &mut self.context_data, - &self.network_chan, - )); + self.specialization.maintain_peers(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan)); for p in aborting { let _ = self .network_chan @@ -689,16 +634,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } } - #[allow(dead_code)] - fn peer_info(&mut self, peer: NodeIndex) -> Option<PeerInfo<B>> { - self.context_data.peers.get(&peer).map(|p| PeerInfo { - roles: p.roles, - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, - }) - } - /// Called by peer to report status fn on_status_message(&mut self, who: NodeIndex, status: message::Status<B>) { trace!(target: "sync", "New peer {} {:?}", who, status); @@ -751,10 +686,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } let peer = Peer { - protocol_version: status.version, - roles: status.roles, - best_hash: status.best_hash, - best_number: status.best_number, + info: PeerInfo { + protocol_version: status.version, + roles: status.roles, + best_hash: status.best_hash, + best_number: status.best_number + }, block_request: None, known_extrinsics: HashSet::new(), known_blocks: HashSet::new(), @@ -805,8 +742,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } let extrinsics = self.transaction_pool.transactions(); - // FIXME: find a way to remove this vec. https://github.com/paritytech/substrate/issues/1698 - let mut will_send = vec![]; let mut propagated_to = HashMap::new(); for (who, ref mut peer) in self.context_data.peers.iter_mut() { let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics @@ -830,12 +765,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); - will_send.push((who.clone(), to_send)); + self.network_chan.send(NetworkMsg::Outgoing(*who, GenericMessage::Transactions(to_send))) } } - for (who, to_send) in will_send { - self.send_message(who, GenericMessage::Transactions(to_send)); - } self.transaction_pool.on_broadcasted(propagated_to); } @@ -857,17 +789,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { }; let hash = header.hash(); - // FIXME: find a way to remove this vec. https://github.com/paritytech/substrate/issues/1698 - let mut to_send = vec![]; + let message = GenericMessage::BlockAnnounce(message::BlockAnnounce { header: header.clone() }); + for (who, ref mut peer) in self.context_data.peers.iter_mut() { trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who); peer.known_blocks.insert(hash); - to_send.push(who.clone()); - } - for who in to_send { - self.send_message(who, GenericMessage::BlockAnnounce(message::BlockAnnounce { - header: header.clone() - })); + self.network_chan.send(NetworkMsg::Outgoing(*who, message.clone())) } } @@ -922,7 +849,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) { - self.sync.update_chain_info(&header); + self.sync.update_chain_info(header); self.specialization.on_block_imported( &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), hash.clone(), @@ -936,22 +863,14 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { // send out block announcements - // FIXME: find a way to remove this vec. https://github.com/paritytech/substrate/issues/1698 - let mut to_send = vec![]; + let message = GenericMessage::BlockAnnounce(message::BlockAnnounce { header: header.clone() }); + for (who, ref mut peer) in self.context_data.peers.iter_mut() { if peer.known_blocks.insert(hash.clone()) { trace!(target: "sync", "Announcing block {:?} to {}", hash, who); - to_send.push(who.clone()); + self.network_chan.send(NetworkMsg::Outgoing(*who, message.clone())) } } - for who in to_send { - self.send_message( - who, - GenericMessage::BlockAnnounce(message::BlockAnnounce { - header: header.clone(), - }), - ); - } } fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { @@ -1111,19 +1030,16 @@ fn send_message<B: BlockT, H: ExHashT>( who: NodeIndex, mut message: Message<B>, ) { - match message { - GenericMessage::BlockRequest(ref mut r) => { - if let Some(ref mut peer) = peers.get_mut(&who) { - r.id = peer.next_request_id; - peer.next_request_id = peer.next_request_id + 1; - if let Some((timestamp, request)) = peer.block_request.take() { - trace!(target: "sync", "Request {} for {} is now obsolete.", request.id, who); - peer.obsolete_requests.insert(request.id, timestamp); - } - peer.block_request = Some((time::Instant::now(), r.clone())); + if let GenericMessage::BlockRequest(ref mut r) = message { + if let Some(ref mut peer) = peers.get_mut(&who) { + r.id = peer.next_request_id; + peer.next_request_id = peer.next_request_id + 1; + if let Some((timestamp, request)) = peer.block_request.take() { + trace!(target: "sync", "Request {} for {} is now obsolete.", request.id, who); + peer.obsolete_requests.insert(request.id, timestamp); } + peer.block_request = Some((time::Instant::now(), r.clone())); } - _ => (), } network_chan.send(NetworkMsg::Outgoing(who, message)); }