Skip to content
Snippets Groups Projects
Commit 24c08a78 authored by Pierre Krieger's avatar Pierre Krieger Committed by Gavin Wood
Browse files

Make peerset ban nodes under a reputation (#2667)

parent ff479c4e
No related merge requests found
......@@ -25,6 +25,8 @@ use libp2p::PeerId;
use log::{debug, error, trace};
use serde_json::json;
/// We don't accept nodes whose reputation is under this value.
const BANNED_THRESHOLD: i32 = 82 * (i32::min_value() / 100);
/// Reputation change for a node when we get disconnected from it.
const DISCONNECT_REPUTATION_CHANGE: i32 = -10;
......@@ -232,7 +234,13 @@ impl Peerset {
self.update_time();
match self.data.peer(&peer_id) {
peersstate::Peer::Connected(mut peer) => peer.add_reputation(score_diff),
peersstate::Peer::Connected(mut peer) => {
peer.add_reputation(score_diff);
if peer.reputation() < BANNED_THRESHOLD {
peer.disconnect();
self.message_queue.push_back(Message::Drop(peer_id));
}
},
peersstate::Peer::NotConnected(mut peer) => peer.add_reputation(score_diff),
peersstate::Peer::Unknown(peer) => peer.discover().add_reputation(score_diff),
}
......@@ -283,22 +291,27 @@ impl Peerset {
fn alloc_slots(&mut self) {
self.update_time();
// Try to grab the next node to attempt to connect to.
while let Some(next) = self.data.reserved_not_connected_peer() {
match next.try_outgoing() {
Ok(conn) => self.message_queue.push_back(Message::Connect(conn.into_peer_id())),
Err(_) => break, // No more slots available.
}
}
loop {
if self.reserved_only {
break
}
// Try to grab the next node to attempt to connect to.
let next = match self.data.reserved_not_connected_peer() {
let next = match self.data.highest_not_connected_peer() {
Some(p) => p,
None => if self.reserved_only {
break // No known node to add.
} else {
match self.data.highest_not_connected_peer() {
Some(p) => p,
None => break, // No known node to add.
}
}
None => break, // No known node to add.
};
// Don't connect to nodes with an abysmal reputation.
if next.reputation() == i32::min_value() {
if next.reputation() < BANNED_THRESHOLD {
break;
}
......@@ -321,6 +334,7 @@ impl Peerset {
// `PeerId` before that message has been read by the user. In this situation we must not answer.
pub fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) {
trace!(target: "peerset", "Incoming {:?}", peer_id);
self.update_time();
let not_connected = match self.data.peer(&peer_id) {
// If we're already connected, don't answer, as the docs mention.
......@@ -329,6 +343,11 @@ impl Peerset {
peersstate::Peer::Unknown(entry) => entry.discover(),
};
if not_connected.reputation() < BANNED_THRESHOLD {
self.message_queue.push_back(Message::Reject(index));
return
}
match not_connected.try_accept_incoming() {
Ok(_) => self.message_queue.push_back(Message::Accept(index)),
Err(_) => self.message_queue.push_back(Message::Reject(index)),
......@@ -430,7 +449,8 @@ impl Stream for Peerset {
mod tests {
use libp2p::PeerId;
use futures::prelude::*;
use super::{PeersetConfig, Peerset, Message, IncomingIndex};
use super::{PeersetConfig, Peerset, Message, IncomingIndex, BANNED_THRESHOLD};
use std::{thread, time::Duration};
fn assert_messages(mut peerset: Peerset, messages: Vec<Message>) -> Peerset {
for expected_message in messages {
......@@ -528,4 +548,45 @@ mod tests {
Message::Connect(discovered),
]);
}
#[test]
fn test_peerset_banned() {
let (mut peerset, handle) = Peerset::from_config(PeersetConfig {
in_peers: 25,
out_peers: 25,
bootnodes: vec![],
reserved_only: false,
reserved_nodes: vec![],
});
// We ban a node by setting its reputation under the threshold.
let peer_id = PeerId::random();
handle.report_peer(peer_id.clone(), BANNED_THRESHOLD - 1);
let fut = futures::future::poll_fn(move || -> Result<_, ()> {
// We need one polling for the message to be processed.
assert_eq!(peerset.poll().unwrap(), Async::NotReady);
// Check that an incoming connection from that node gets refused.
peerset.incoming(peer_id.clone(), IncomingIndex(1));
if let Async::Ready(msg) = peerset.poll().unwrap() {
assert_eq!(msg.unwrap(), Message::Reject(IncomingIndex(1)));
} else {
panic!()
}
// Wait a bit for the node's reputation to go above the threshold.
thread::sleep(Duration::from_millis(1500));
// Try again. This time the node should be accepted.
peerset.incoming(peer_id.clone(), IncomingIndex(2));
while let Async::Ready(msg) = peerset.poll().unwrap() {
assert_eq!(msg.unwrap(), Message::Accept(IncomingIndex(2)));
}
Ok(Async::Ready(()))
});
tokio::runtime::current_thread::Runtime::new().unwrap().block_on(fut).unwrap();
}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment