diff --git a/substrate/core/peerset/src/lib.rs b/substrate/core/peerset/src/lib.rs index ac076be4b51f33f3248ff0907fb4dea668ef1f8e..9b5155455a66ab70513d84b7e215051a8fa4ce40 100644 --- a/substrate/core/peerset/src/lib.rs +++ b/substrate/core/peerset/src/lib.rs @@ -25,6 +25,8 @@ use libp2p::PeerId; use log::{debug, error, trace}; use serde_json::json; +/// We don't accept nodes whose reputation is under this value. +const BANNED_THRESHOLD: i32 = 82 * (i32::min_value() / 100); /// Reputation change for a node when we get disconnected from it. const DISCONNECT_REPUTATION_CHANGE: i32 = -10; @@ -232,7 +234,13 @@ impl Peerset { self.update_time(); match self.data.peer(&peer_id) { - peersstate::Peer::Connected(mut peer) => peer.add_reputation(score_diff), + peersstate::Peer::Connected(mut peer) => { + peer.add_reputation(score_diff); + if peer.reputation() < BANNED_THRESHOLD { + peer.disconnect(); + self.message_queue.push_back(Message::Drop(peer_id)); + } + }, peersstate::Peer::NotConnected(mut peer) => peer.add_reputation(score_diff), peersstate::Peer::Unknown(peer) => peer.discover().add_reputation(score_diff), } @@ -283,22 +291,27 @@ impl Peerset { fn alloc_slots(&mut self) { self.update_time(); + // Try to grab the next node to attempt to connect to. + while let Some(next) = self.data.reserved_not_connected_peer() { + match next.try_outgoing() { + Ok(conn) => self.message_queue.push_back(Message::Connect(conn.into_peer_id())), + Err(_) => break, // No more slots available. + } + } + loop { + if self.reserved_only { + break + } + // Try to grab the next node to attempt to connect to. - let next = match self.data.reserved_not_connected_peer() { + let next = match self.data.highest_not_connected_peer() { Some(p) => p, - None => if self.reserved_only { - break // No known node to add. - } else { - match self.data.highest_not_connected_peer() { - Some(p) => p, - None => break, // No known node to add. - } - } + None => break, // No known node to add. }; // Don't connect to nodes with an abysmal reputation. - if next.reputation() == i32::min_value() { + if next.reputation() < BANNED_THRESHOLD { break; } @@ -321,6 +334,7 @@ impl Peerset { // `PeerId` before that message has been read by the user. In this situation we must not answer. pub fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) { trace!(target: "peerset", "Incoming {:?}", peer_id); + self.update_time(); let not_connected = match self.data.peer(&peer_id) { // If we're already connected, don't answer, as the docs mention. @@ -329,6 +343,11 @@ impl Peerset { peersstate::Peer::Unknown(entry) => entry.discover(), }; + if not_connected.reputation() < BANNED_THRESHOLD { + self.message_queue.push_back(Message::Reject(index)); + return + } + match not_connected.try_accept_incoming() { Ok(_) => self.message_queue.push_back(Message::Accept(index)), Err(_) => self.message_queue.push_back(Message::Reject(index)), @@ -430,7 +449,8 @@ impl Stream for Peerset { mod tests { use libp2p::PeerId; use futures::prelude::*; - use super::{PeersetConfig, Peerset, Message, IncomingIndex}; + use super::{PeersetConfig, Peerset, Message, IncomingIndex, BANNED_THRESHOLD}; + use std::{thread, time::Duration}; fn assert_messages(mut peerset: Peerset, messages: Vec<Message>) -> Peerset { for expected_message in messages { @@ -528,4 +548,45 @@ mod tests { Message::Connect(discovered), ]); } + + #[test] + fn test_peerset_banned() { + let (mut peerset, handle) = Peerset::from_config(PeersetConfig { + in_peers: 25, + out_peers: 25, + bootnodes: vec![], + reserved_only: false, + reserved_nodes: vec![], + }); + + // We ban a node by setting its reputation under the threshold. + let peer_id = PeerId::random(); + handle.report_peer(peer_id.clone(), BANNED_THRESHOLD - 1); + + let fut = futures::future::poll_fn(move || -> Result<_, ()> { + // We need one polling for the message to be processed. + assert_eq!(peerset.poll().unwrap(), Async::NotReady); + + // Check that an incoming connection from that node gets refused. + peerset.incoming(peer_id.clone(), IncomingIndex(1)); + if let Async::Ready(msg) = peerset.poll().unwrap() { + assert_eq!(msg.unwrap(), Message::Reject(IncomingIndex(1))); + } else { + panic!() + } + + // Wait a bit for the node's reputation to go above the threshold. + thread::sleep(Duration::from_millis(1500)); + + // Try again. This time the node should be accepted. + peerset.incoming(peer_id.clone(), IncomingIndex(2)); + while let Async::Ready(msg) = peerset.poll().unwrap() { + assert_eq!(msg.unwrap(), Message::Accept(IncomingIndex(2))); + } + + Ok(Async::Ready(())) + }); + + tokio::runtime::current_thread::Runtime::new().unwrap().block_on(fut).unwrap(); + } }