Skip to content
Snippets Groups Projects
Commit 7885068f authored by Arkadiy Paronyan's avatar Arkadiy Paronyan Committed by Gavin Wood
Browse files

Peer prioritization (#2717)

* Added priority groups

* Added a test

* Whitespace

* Added add/remove single peer API

* Added a warning

* Fixed removing reserved peer

* Fixed build

* Made some methods private and made get_priority_group return an Option
parent fff90e86
No related merge requests found
......@@ -19,7 +19,7 @@
mod peersstate;
use std::{collections::HashMap, collections::VecDeque, time::Instant};
use std::{collections::{HashSet, HashMap}, collections::VecDeque, time::Instant};
use futures::{prelude::*, sync::mpsc, try_ready};
use libp2p::PeerId;
use log::{debug, error, trace};
......@@ -29,6 +29,8 @@ use serde_json::json;
const BANNED_THRESHOLD: i32 = 82 * (i32::min_value() / 100);
/// Reputation change for a node when we get disconnected from it.
const DISCONNECT_REPUTATION_CHANGE: i32 = -10;
/// Reserved peers group ID
const RESERVED_NODES: &'static str = "reserved";
#[derive(Debug)]
enum Action {
......@@ -36,6 +38,9 @@ enum Action {
RemoveReservedPeer(PeerId),
SetReservedOnly(bool),
ReportPeer(PeerId, i32),
SetPriorityGroup(String, HashSet<PeerId>),
AddToPriorityGroup(String, PeerId),
RemoveFromPriorityGroup(String, PeerId),
}
/// Shared handle to the peer set manager (PSM). Distributed around the code.
......@@ -72,6 +77,21 @@ impl PeersetHandle {
pub fn report_peer(&self, peer_id: PeerId, score_diff: i32) {
let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff));
}
/// Modify a priority group.
pub fn set_priority_group(&self, group_id: String, peers: HashSet<PeerId>) {
let _ = self.tx.unbounded_send(Action::SetPriorityGroup(group_id, peers));
}
/// Add a peer to a priority group.
pub fn add_to_priority_group(&self, group_id: String, peer_id: PeerId) {
let _ = self.tx.unbounded_send(Action::AddToPriorityGroup(group_id, peer_id));
}
/// Remove a peer from a priority group.
pub fn remove_from_priority_group(&self, group_id: String, peer_id: PeerId) {
let _ = self.tx.unbounded_send(Action::RemoveFromPriorityGroup(group_id, peer_id));
}
}
/// Message that can be sent by the peer set manager (PSM).
......@@ -161,14 +181,7 @@ impl Peerset {
latest_time_update: Instant::now(),
};
for peer_id in config.reserved_nodes {
if let peersstate::Peer::Unknown(entry) = peerset.data.peer(&peer_id) {
entry.discover().set_reserved(true);
} else {
debug!(target: "peerset", "Duplicate reserved node in config: {:?}", peer_id);
}
}
peerset.data.set_priority_group(RESERVED_NODES, config.reserved_nodes.into_iter().collect());
for peer_id in config.bootnodes {
if let peersstate::Peer::Unknown(entry) = peerset.data.peer(&peer_id) {
entry.discover();
......@@ -182,32 +195,25 @@ impl Peerset {
}
fn on_add_reserved_peer(&mut self, peer_id: PeerId) {
let mut entry = match self.data.peer(&peer_id) {
peersstate::Peer::Connected(mut connected) => {
connected.set_reserved(true);
return
}
peersstate::Peer::NotConnected(entry) => entry,
peersstate::Peer::Unknown(entry) => entry.discover(),
};
// We reach this point if and only if we were not connected to the node.
entry.set_reserved(true);
entry.force_outgoing();
self.message_queue.push_back(Message::Connect(peer_id));
let mut reserved = self.data.get_priority_group(RESERVED_NODES).unwrap_or_default();
reserved.insert(peer_id);
self.data.set_priority_group(RESERVED_NODES, reserved);
self.alloc_slots();
}
fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
let mut reserved = self.data.get_priority_group(RESERVED_NODES).unwrap_or_default();
reserved.remove(&peer_id);
self.data.set_priority_group(RESERVED_NODES, reserved);
match self.data.peer(&peer_id) {
peersstate::Peer::Connected(mut peer) => {
peer.set_reserved(false);
peersstate::Peer::Connected(peer) => {
if self.reserved_only {
peer.disconnect();
self.message_queue.push_back(Message::Drop(peer_id));
}
}
peersstate::Peer::NotConnected(mut peer) => peer.set_reserved(false),
peersstate::Peer::Unknown(_) => {}
peersstate::Peer::NotConnected(_) => {},
peersstate::Peer::Unknown(_) => {},
}
}
......@@ -215,20 +221,35 @@ impl Peerset {
// Disconnect non-reserved nodes.
self.reserved_only = reserved_only;
if self.reserved_only {
let reserved = self.data.get_priority_group(RESERVED_NODES).unwrap_or_default();
for peer_id in self.data.connected_peers().cloned().collect::<Vec<_>>().into_iter() {
let peer = self.data.peer(&peer_id).into_connected()
.expect("We are enumerating connected peers, therefore the peer is connected; qed");
if !peer.is_reserved() {
if !reserved.contains(&peer_id) {
peer.disconnect();
self.message_queue.push_back(Message::Drop(peer_id));
}
}
} else {
self.alloc_slots();
}
}
fn on_set_priority_group(&mut self, group_id: &str, peers: HashSet<PeerId>) {
self.data.set_priority_group(group_id, peers);
self.alloc_slots();
}
fn on_add_to_priority_group(&mut self, group_id: &str, peer_id: PeerId) {
self.data.add_to_priority_group(group_id, peer_id);
self.alloc_slots();
}
fn on_remove_from_priority_group(&mut self, group_id: &str, peer_id: PeerId) {
self.data.remove_from_priority_group(group_id, &peer_id);
self.alloc_slots();
}
fn on_report_peer(&mut self, peer_id: PeerId, score_diff: i32) {
// We want reputations to be up-to-date before adjusting them.
self.update_time();
......@@ -292,7 +313,13 @@ impl Peerset {
self.update_time();
// Try to grab the next node to attempt to connect to.
while let Some(next) = self.data.reserved_not_connected_peer() {
while let Some(next) = {
if self.reserved_only {
self.data.priority_not_connected_peer_from_group(RESERVED_NODES)
} else {
self.data.priority_not_connected_peer()
}
} {
match next.try_outgoing() {
Ok(conn) => self.message_queue.push_back(Message::Connect(conn.into_peer_id())),
Err(_) => break, // No more slots available.
......@@ -421,6 +448,11 @@ impl Peerset {
"message_queue": self.message_queue.len(),
})
}
/// Returns priority group by id.
pub fn get_priority_group(&self, group_id: &str) -> Option<HashSet<PeerId>> {
self.data.get_priority_group(group_id)
}
}
impl Stream for Peerset {
......@@ -439,6 +471,9 @@ impl Stream for Peerset {
Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id),
Action::SetReservedOnly(reserved) => self.on_set_reserved_only(reserved),
Action::ReportPeer(peer_id, score_diff) => self.on_report_peer(peer_id, score_diff),
Action::SetPriorityGroup(group_id, peers) => self.on_set_priority_group(&group_id, peers),
Action::AddToPriorityGroup(group_id, peer_id) => self.on_add_to_priority_group(&group_id, peer_id),
Action::RemoveFromPriorityGroup(group_id, peer_id) => self.on_remove_from_priority_group(&group_id, peer_id),
}
}
}
......@@ -590,3 +625,4 @@ mod tests {
tokio::runtime::current_thread::Runtime::new().unwrap().block_on(fut).unwrap();
}
}
This diff is collapsed.
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment