From 7885068facd0feec08e8e146567d7eee7f98819f Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan <arkady.paronyan@gmail.com> Date: Mon, 3 Jun 2019 16:45:12 +0200 Subject: [PATCH] Peer prioritization (#2717) * Added priority groups * Added a test * Whitespace * Added add/remove single peer API * Added a warning * Fixed removing reserved peer * Fixed build * Made some methods private and made get_priority_group return an Option --- substrate/core/peerset/src/lib.rs | 94 +++-- substrate/core/peerset/src/peersstate.rs | 505 ++++++++++++----------- 2 files changed, 330 insertions(+), 269 deletions(-) diff --git a/substrate/core/peerset/src/lib.rs b/substrate/core/peerset/src/lib.rs index 6956cd16e11..aa3ce02076d 100644 --- a/substrate/core/peerset/src/lib.rs +++ b/substrate/core/peerset/src/lib.rs @@ -19,7 +19,7 @@ mod peersstate; -use std::{collections::HashMap, collections::VecDeque, time::Instant}; +use std::{collections::{HashSet, HashMap}, collections::VecDeque, time::Instant}; use futures::{prelude::*, sync::mpsc, try_ready}; use libp2p::PeerId; use log::{debug, error, trace}; @@ -29,6 +29,8 @@ use serde_json::json; const BANNED_THRESHOLD: i32 = 82 * (i32::min_value() / 100); /// Reputation change for a node when we get disconnected from it. const DISCONNECT_REPUTATION_CHANGE: i32 = -10; +/// Reserved peers group ID +const RESERVED_NODES: &'static str = "reserved"; #[derive(Debug)] enum Action { @@ -36,6 +38,9 @@ enum Action { RemoveReservedPeer(PeerId), SetReservedOnly(bool), ReportPeer(PeerId, i32), + SetPriorityGroup(String, HashSet<PeerId>), + AddToPriorityGroup(String, PeerId), + RemoveFromPriorityGroup(String, PeerId), } /// Shared handle to the peer set manager (PSM). Distributed around the code. @@ -72,6 +77,21 @@ impl PeersetHandle { pub fn report_peer(&self, peer_id: PeerId, score_diff: i32) { let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff)); } + + /// Modify a priority group. + pub fn set_priority_group(&self, group_id: String, peers: HashSet<PeerId>) { + let _ = self.tx.unbounded_send(Action::SetPriorityGroup(group_id, peers)); + } + + /// Add a peer to a priority group. + pub fn add_to_priority_group(&self, group_id: String, peer_id: PeerId) { + let _ = self.tx.unbounded_send(Action::AddToPriorityGroup(group_id, peer_id)); + } + + /// Remove a peer from a priority group. + pub fn remove_from_priority_group(&self, group_id: String, peer_id: PeerId) { + let _ = self.tx.unbounded_send(Action::RemoveFromPriorityGroup(group_id, peer_id)); + } } /// Message that can be sent by the peer set manager (PSM). @@ -161,14 +181,7 @@ impl Peerset { latest_time_update: Instant::now(), }; - for peer_id in config.reserved_nodes { - if let peersstate::Peer::Unknown(entry) = peerset.data.peer(&peer_id) { - entry.discover().set_reserved(true); - } else { - debug!(target: "peerset", "Duplicate reserved node in config: {:?}", peer_id); - } - } - + peerset.data.set_priority_group(RESERVED_NODES, config.reserved_nodes.into_iter().collect()); for peer_id in config.bootnodes { if let peersstate::Peer::Unknown(entry) = peerset.data.peer(&peer_id) { entry.discover(); @@ -182,32 +195,25 @@ impl Peerset { } fn on_add_reserved_peer(&mut self, peer_id: PeerId) { - let mut entry = match self.data.peer(&peer_id) { - peersstate::Peer::Connected(mut connected) => { - connected.set_reserved(true); - return - } - peersstate::Peer::NotConnected(entry) => entry, - peersstate::Peer::Unknown(entry) => entry.discover(), - }; - - // We reach this point if and only if we were not connected to the node. - entry.set_reserved(true); - entry.force_outgoing(); - self.message_queue.push_back(Message::Connect(peer_id)); + let mut reserved = self.data.get_priority_group(RESERVED_NODES).unwrap_or_default(); + reserved.insert(peer_id); + self.data.set_priority_group(RESERVED_NODES, reserved); + self.alloc_slots(); } fn on_remove_reserved_peer(&mut self, peer_id: PeerId) { + let mut reserved = self.data.get_priority_group(RESERVED_NODES).unwrap_or_default(); + reserved.remove(&peer_id); + self.data.set_priority_group(RESERVED_NODES, reserved); match self.data.peer(&peer_id) { - peersstate::Peer::Connected(mut peer) => { - peer.set_reserved(false); + peersstate::Peer::Connected(peer) => { if self.reserved_only { peer.disconnect(); self.message_queue.push_back(Message::Drop(peer_id)); } } - peersstate::Peer::NotConnected(mut peer) => peer.set_reserved(false), - peersstate::Peer::Unknown(_) => {} + peersstate::Peer::NotConnected(_) => {}, + peersstate::Peer::Unknown(_) => {}, } } @@ -215,20 +221,35 @@ impl Peerset { // Disconnect non-reserved nodes. self.reserved_only = reserved_only; if self.reserved_only { + let reserved = self.data.get_priority_group(RESERVED_NODES).unwrap_or_default(); for peer_id in self.data.connected_peers().cloned().collect::<Vec<_>>().into_iter() { let peer = self.data.peer(&peer_id).into_connected() .expect("We are enumerating connected peers, therefore the peer is connected; qed"); - if !peer.is_reserved() { + if !reserved.contains(&peer_id) { peer.disconnect(); self.message_queue.push_back(Message::Drop(peer_id)); } } - } else { self.alloc_slots(); } } + fn on_set_priority_group(&mut self, group_id: &str, peers: HashSet<PeerId>) { + self.data.set_priority_group(group_id, peers); + self.alloc_slots(); + } + + fn on_add_to_priority_group(&mut self, group_id: &str, peer_id: PeerId) { + self.data.add_to_priority_group(group_id, peer_id); + self.alloc_slots(); + } + + fn on_remove_from_priority_group(&mut self, group_id: &str, peer_id: PeerId) { + self.data.remove_from_priority_group(group_id, &peer_id); + self.alloc_slots(); + } + fn on_report_peer(&mut self, peer_id: PeerId, score_diff: i32) { // We want reputations to be up-to-date before adjusting them. self.update_time(); @@ -292,7 +313,13 @@ impl Peerset { self.update_time(); // Try to grab the next node to attempt to connect to. - while let Some(next) = self.data.reserved_not_connected_peer() { + while let Some(next) = { + if self.reserved_only { + self.data.priority_not_connected_peer_from_group(RESERVED_NODES) + } else { + self.data.priority_not_connected_peer() + } + } { match next.try_outgoing() { Ok(conn) => self.message_queue.push_back(Message::Connect(conn.into_peer_id())), Err(_) => break, // No more slots available. @@ -421,6 +448,11 @@ impl Peerset { "message_queue": self.message_queue.len(), }) } + + /// Returns priority group by id. + pub fn get_priority_group(&self, group_id: &str) -> Option<HashSet<PeerId>> { + self.data.get_priority_group(group_id) + } } impl Stream for Peerset { @@ -439,6 +471,9 @@ impl Stream for Peerset { Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id), Action::SetReservedOnly(reserved) => self.on_set_reserved_only(reserved), Action::ReportPeer(peer_id, score_diff) => self.on_report_peer(peer_id, score_diff), + Action::SetPriorityGroup(group_id, peers) => self.on_set_priority_group(&group_id, peers), + Action::AddToPriorityGroup(group_id, peer_id) => self.on_add_to_priority_group(&group_id, peer_id), + Action::RemoveFromPriorityGroup(group_id, peer_id) => self.on_remove_from_priority_group(&group_id, peer_id), } } } @@ -590,3 +625,4 @@ mod tests { tokio::runtime::current_thread::Runtime::new().unwrap().block_on(fut).unwrap(); } } + diff --git a/substrate/core/peerset/src/peersstate.rs b/substrate/core/peerset/src/peersstate.rs index a6a375b3694..e02d6304046 100644 --- a/substrate/core/peerset/src/peersstate.rs +++ b/substrate/core/peerset/src/peersstate.rs @@ -17,7 +17,8 @@ //! Contains the state storage behind the peerset. use libp2p::PeerId; -use std::{borrow::Cow, collections::HashMap}; +use std::{borrow::Cow, collections::{HashSet, HashMap}}; +use log::warn; /// State storage behind the peerset. /// @@ -35,17 +36,20 @@ pub struct PeersState { /// sort, to make the logic easier. nodes: HashMap<PeerId, Node>, - /// Number of non-reserved nodes for which the `ConnectionState` is `In`. + /// Number of non-priority nodes for which the `ConnectionState` is `In`. num_in: u32, - /// Number of non-reserved nodes for which the `ConnectionState` is `In`. + /// Number of non-priority nodes for which the `ConnectionState` is `In`. num_out: u32, - /// Maximum allowed number of non-reserved nodes for which the `ConnectionState` is `In`. + /// Maximum allowed number of non-priority nodes for which the `ConnectionState` is `In`. max_in: u32, - /// Maximum allowed number of non-reserved nodes for which the `ConnectionState` is `Out`. + /// Maximum allowed number of non-priority nodes for which the `ConnectionState` is `Out`. max_out: u32, + + /// Priority groups. Each group is identified by a string ID and contains a set of peer IDs. + priority_nodes: HashMap<String, HashSet<PeerId>>, } /// State of a single node that we know about. @@ -54,14 +58,20 @@ struct Node { /// Whether we are connected to this node. connection_state: ConnectionState, - /// If true, this node is reserved and should always be connected to. - reserved: bool, - /// Reputation value of the node, between `i32::min_value` (we hate that node) and /// `i32::max_value` (we love that node). reputation: i32, } +impl Default for Node { + fn default() -> Node { + Node { + connection_state: ConnectionState::NotConnected, + reputation: 0, + } + } +} + /// Whether we are connected to a node. #[derive(Debug, Copy, Clone, PartialEq, Eq)] enum ConnectionState { @@ -93,42 +103,30 @@ impl PeersState { num_out: 0, max_in: in_peers, max_out: out_peers, + priority_nodes: HashMap::new(), } } /// Returns an object that grants access to the state of a peer. pub fn peer<'a>(&'a mut self, peer_id: &'a PeerId) -> Peer<'a> { - // Note: the Rust borrow checker still has some issues. In particular, we can't put this - // block as an `else` below (as the obvious solution would be here), or it will complain - // that we borrow `self` while it is already borrowed. - if !self.nodes.contains_key(peer_id) { - return Peer::Unknown(UnknownPeer { + match self.nodes.get_mut(peer_id) { + None => return Peer::Unknown(UnknownPeer { parent: self, peer_id: Cow::Borrowed(peer_id), - }); - } - - let state = self.nodes.get_mut(peer_id) - .expect("We check that the value is present right above; QED"); - - if state.connection_state.is_connected() { - Peer::Connected(ConnectedPeer { - state, - peer_id: Cow::Borrowed(peer_id), - num_in: &mut self.num_in, - num_out: &mut self.num_out, - max_in: self.max_in, - max_out: self.max_out, - }) - } else { - Peer::NotConnected(NotConnectedPeer { - state, - peer_id: Cow::Borrowed(peer_id), - num_in: &mut self.num_in, - num_out: &mut self.num_out, - max_in: self.max_in, - max_out: self.max_out, - }) + }), + Some(peer) => { + if peer.connection_state.is_connected() { + Peer::Connected(ConnectedPeer { + state: self, + peer_id: Cow::Borrowed(peer_id), + }) + } else { + Peer::NotConnected(NotConnectedPeer { + state: self, + peer_id: Cow::Borrowed(peer_id), + }) + } + } } } @@ -148,28 +146,32 @@ impl PeersState { .map(|(p, _)| p) } - /// Returns the first reserved peer that we are not connected to. + /// Returns the first priority peer that we are not connected to. /// - /// If multiple nodes are reserved, which one is returned is unspecified. - pub fn reserved_not_connected_peer(&mut self) -> Option<NotConnectedPeer> { - let outcome = self.nodes.iter_mut() - .find(|(_, &mut Node { connection_state, reserved, .. })| { - reserved && !connection_state.is_connected() - }) - .map(|(peer_id, node)| (peer_id.clone(), node)); - - if let Some((peer_id, state)) = outcome { - Some(NotConnectedPeer { - state, - peer_id: Cow::Owned(peer_id), - num_in: &mut self.num_in, - num_out: &mut self.num_out, - max_in: self.max_in, - max_out: self.max_out, - }) - } else { - None - } + /// If multiple nodes are prioritized, which one is returned is unspecified. + pub fn priority_not_connected_peer(&mut self) -> Option<NotConnectedPeer> { + let id = self.priority_nodes.values() + .flatten() + .find(|id| self.nodes.get(id).map_or(false, |node| !node.connection_state.is_connected())) + .cloned(); + id.map(move |id| NotConnectedPeer { + state: self, + peer_id: Cow::Owned(id), + }) + } + + /// Returns the first priority peer that we are not connected to. + /// + /// If multiple nodes are prioritized, which one is returned is unspecified. + pub fn priority_not_connected_peer_from_group(&mut self, group_id: &str) -> Option<NotConnectedPeer> { + let id = self.priority_nodes.get(group_id) + .and_then(|group| group.iter() + .find(|id| self.nodes.get(id).map_or(false, |node| !node.connection_state.is_connected())) + .cloned()); + id.map(move |id| NotConnectedPeer { + state: self, + peer_id: Cow::Owned(id), + }) } /// Returns the peer with the highest reputation and that we are not connected to. @@ -187,21 +189,160 @@ impl PeersState { } Some(to_try) }) - .map(|(peer_id, state)| (peer_id.clone(), state)); + .map(|(peer_id, _)| peer_id.clone()); - if let Some((peer_id, state)) = outcome { + if let Some(peer_id) = outcome { Some(NotConnectedPeer { - state, + state: self, peer_id: Cow::Owned(peer_id), - num_in: &mut self.num_in, - num_out: &mut self.num_out, - max_in: self.max_in, - max_out: self.max_out, }) } else { None } } + + fn disconnect(&mut self, peer_id: &PeerId) { + let is_priority = self.is_priority(peer_id); + if let Some(mut node) = self.nodes.get_mut(peer_id) { + if !is_priority { + match node.connection_state { + ConnectionState::In => self.num_in -= 1, + ConnectionState::Out => self.num_out -= 1, + ConnectionState::NotConnected => + debug_assert!(false, "State inconsistency: disconnecting a disconnected node") + } + } + node.connection_state = ConnectionState::NotConnected; + } else { + warn!(target: "peerset", "Attempting to disconnect unknown peer {}", peer_id); + } + } + + /// Sets the peer as connected with an outgoing connection. + fn try_outgoing(&mut self, peer_id: &PeerId) -> bool { + // Note that it is possible for num_out to be strictly superior to the max, in case we were + // connected to reserved node then marked them as not reserved. + let is_priority = self.is_priority(peer_id); + if self.num_out >= self.max_out && !is_priority { + return false; + } + + if let Some(mut peer) = self.nodes.get_mut(peer_id) { + peer.connection_state = ConnectionState::Out; + if !is_priority { + self.num_out += 1; + } + return true; + } + false + } + + /// Tries to accept the peer as an incoming connection. + /// + /// If there are enough slots available, switches the node to "connected" and returns `Ok`. If + /// the slots are full, the node stays "not connected" and we return `Err`. + /// + /// Note that reserved nodes don't count towards the number of slots. + fn try_accept_incoming(&mut self, peer_id: &PeerId) -> bool { + let is_priority = self.is_priority(peer_id); + // Note that it is possible for num_in to be strictly superior to the max, in case we were + // connected to reserved node then marked them as not reserved. + if self.num_in >= self.max_in && !is_priority { + return false; + } + if let Some(mut peer) = self.nodes.get_mut(peer_id) { + peer.connection_state = ConnectionState::In; + if !is_priority { + self.num_in += 1; + } + return true; + } + false + } + + /// Sets priority group + pub fn set_priority_group(&mut self, group_id: &str, peers: HashSet<PeerId>) { + // update slot counters + let all_other_groups: HashSet<_> = self.priority_nodes + .iter() + .filter(|(g, _)| *g != group_id) + .flat_map(|(_, id)| id.clone()) + .collect(); + let existing_group = self.priority_nodes.remove(group_id).unwrap_or_default(); + for id in existing_group { + // update slots for nodes that are no longer priority + if !all_other_groups.contains(&id) { + if let Some(peer) = self.nodes.get_mut(&id) { + match peer.connection_state { + ConnectionState::In => self.num_in += 1, + ConnectionState::Out => self.num_out += 1, + ConnectionState::NotConnected => {}, + } + } + } + } + + for id in &peers { + // update slots for nodes that become priority + if !all_other_groups.contains(&id) { + let peer = self.nodes.entry(id.clone()).or_default(); + match peer.connection_state { + ConnectionState::In => self.num_in -= 1, + ConnectionState::Out => self.num_out -= 1, + ConnectionState::NotConnected => {}, + } + } + } + self.priority_nodes.insert(group_id.into(), peers); + } + + /// Add a peer to a priority group. + pub fn add_to_priority_group(&mut self, group_id: &str, peer_id: PeerId) { + let mut peers = self.priority_nodes.get(group_id).cloned().unwrap_or_default(); + peers.insert(peer_id); + self.set_priority_group(group_id, peers); + } + + /// Remove a peer from a priority group. + pub fn remove_from_priority_group(&mut self, group_id: &str, peer_id: &PeerId) { + let mut peers = self.priority_nodes.get(group_id).cloned().unwrap_or_default(); + peers.remove(&peer_id); + self.set_priority_group(group_id, peers); + } + + /// Get priority group content. + pub fn get_priority_group(&self, group_id: &str) -> Option<HashSet<PeerId>> { + self.priority_nodes.get(group_id).cloned() + } + + /// Check that node is any priority group. + fn is_priority(&self, peer_id: &PeerId) -> bool { + self.priority_nodes.iter().any(|(_, group)| group.contains(peer_id)) + } + + /// Returns the reputation value of the node. + fn reputation(&self, peer_id: &PeerId) -> i32 { + self.nodes.get(peer_id).map_or(0, |p| p.reputation) + } + + /// Sets the reputation of the peer. + fn set_reputation(&mut self, peer_id: &PeerId, value: i32) { + let node = self.nodes + .entry(peer_id.clone()) + .or_default(); + node.reputation = value; + } + + /// Performs an arithmetic addition on the reputation score of that peer. + /// + /// In case of overflow, the value will be capped. + /// If the peer is unknown to us, we insert it and consider that it has a reputation of 0. + fn add_reputation(&mut self, peer_id: &PeerId, modifier: i32) { + let node = self.nodes + .entry(peer_id.clone()) + .or_default(); + node.reputation = node.reputation.saturating_add(modifier); + } } /// Grants access to the state of a peer in the `PeersState`. @@ -250,12 +391,8 @@ impl<'a> Peer<'a> { /// A peer that is connected to us. pub struct ConnectedPeer<'a> { - state: &'a mut Node, + state: &'a mut PeersState, peer_id: Cow<'a, PeerId>, - num_in: &'a mut u32, - num_out: &'a mut u32, - max_in: u32, - max_out: u32, } impl<'a> ConnectedPeer<'a> { @@ -266,87 +403,36 @@ impl<'a> ConnectedPeer<'a> { /// Switches the peer to "not connected". pub fn disconnect(self) -> NotConnectedPeer<'a> { - let connec_state = &mut self.state.connection_state; - - if !self.state.reserved { - match *connec_state { - ConnectionState::In => *self.num_in -= 1, - ConnectionState::Out => *self.num_out -= 1, - ConnectionState::NotConnected => - debug_assert!(false, "State inconsistency: disconnecting a disconnected node") - } - } - - *connec_state = ConnectionState::NotConnected; - + self.state.disconnect(&self.peer_id); NotConnectedPeer { state: self.state, peer_id: self.peer_id, - num_in: self.num_in, - num_out: self.num_out, - max_in: self.max_in, - max_out: self.max_out, - } - } - - /// Sets whether or not the node is reserved. - pub fn set_reserved(&mut self, reserved: bool) { - if reserved == self.state.reserved { - return; } - - if reserved { - self.state.reserved = true; - match self.state.connection_state { - ConnectionState::In => *self.num_in -= 1, - ConnectionState::Out => *self.num_out -= 1, - ConnectionState::NotConnected => debug_assert!(false, "State inconsistency: \ - connected node is in fact not connected"), - } - - } else { - self.state.reserved = false; - match self.state.connection_state { - ConnectionState::In => *self.num_in += 1, - ConnectionState::Out => *self.num_out += 1, - ConnectionState::NotConnected => debug_assert!(false, "State inconsistency: \ - connected node is in fact not connected"), - } - } - } - - /// Returns whether or not the node is reserved. - pub fn is_reserved(&self) -> bool { - self.state.reserved } /// Returns the reputation value of the node. pub fn reputation(&self) -> i32 { - self.state.reputation + self.state.reputation(&self.peer_id) } /// Sets the reputation of the peer. pub fn set_reputation(&mut self, value: i32) { - self.state.reputation = value; + self.state.set_reputation(&self.peer_id, value) } /// Performs an arithmetic addition on the reputation score of that peer. /// /// In case of overflow, the value will be capped. pub fn add_reputation(&mut self, modifier: i32) { - let reputation = &mut self.state.reputation; - *reputation = reputation.saturating_add(modifier); + self.state.add_reputation(&self.peer_id, modifier) } } /// A peer that is not connected to us. +#[derive(Debug)] pub struct NotConnectedPeer<'a> { - state: &'a mut Node, + state: &'a mut PeersState, peer_id: Cow<'a, PeerId>, - num_in: &'a mut u32, - num_out: &'a mut u32, - max_in: u32, - max_out: u32, } impl<'a> NotConnectedPeer<'a> { @@ -360,41 +446,16 @@ impl<'a> NotConnectedPeer<'a> { /// /// If there are enough slots available, switches the node to "connected" and returns `Ok`. If /// the slots are full, the node stays "not connected" and we return `Err`. - /// If the node is reserved, this method always succeeds. /// - /// Note that reserved nodes don't count towards the number of slots. + /// Note that priority nodes don't count towards the number of slots. pub fn try_outgoing(self) -> Result<ConnectedPeer<'a>, NotConnectedPeer<'a>> { - if self.is_reserved() { - return Ok(self.force_outgoing()) - } - - // Note that it is possible for num_out to be strictly superior to the max, in case we were - // connected to reserved node then marked them as not reserved, or if the user used - // `force_outgoing`. - if *self.num_out >= self.max_out { - return Err(self); - } - - Ok(self.force_outgoing()) - } - - /// Sets the peer as connected as an outgoing connection. - pub fn force_outgoing(self) -> ConnectedPeer<'a> { - let connec_state = &mut self.state.connection_state; - debug_assert!(!connec_state.is_connected()); - *connec_state = ConnectionState::Out; - - if !self.state.reserved { - *self.num_out += 1; - } - - ConnectedPeer { - state: self.state, - peer_id: self.peer_id, - num_in: self.num_in, - num_out: self.num_out, - max_in: self.max_in, - max_out: self.max_out, + if self.state.try_outgoing(&self.peer_id) { + Ok(ConnectedPeer { + state: self.state, + peer_id: self.peer_id, + }) + } else { + Err(self) } } @@ -403,59 +464,26 @@ impl<'a> NotConnectedPeer<'a> { /// If there are enough slots available, switches the node to "connected" and returns `Ok`. If /// the slots are full, the node stays "not connected" and we return `Err`. /// - /// Note that reserved nodes don't count towards the number of slots. + /// Note that priority nodes don't count towards the number of slots. pub fn try_accept_incoming(self) -> Result<ConnectedPeer<'a>, NotConnectedPeer<'a>> { - if self.is_reserved() { - return Ok(self.force_ingoing()) - } - - // Note that it is possible for num_in to be strictly superior to the max, in case we were - // connected to reserved node then marked them as not reserved. - if *self.num_in >= self.max_in { - return Err(self); - } - - Ok(self.force_ingoing()) - } - - /// Sets the peer as connected as an ingoing connection. - pub fn force_ingoing(self) -> ConnectedPeer<'a> { - let connec_state = &mut self.state.connection_state; - debug_assert!(!connec_state.is_connected()); - *connec_state = ConnectionState::In; - - if !self.state.reserved { - *self.num_in += 1; - } - - ConnectedPeer { - state: self.state, - peer_id: self.peer_id, - num_in: self.num_in, - num_out: self.num_out, - max_in: self.max_in, - max_out: self.max_out, + if self.state.try_accept_incoming(&self.peer_id) { + Ok(ConnectedPeer { + state: self.state, + peer_id: self.peer_id, + }) + } else { + Err(self) } } - /// Sets whether or not the node is reserved. - pub fn set_reserved(&mut self, reserved: bool) { - self.state.reserved = reserved; - } - - /// Returns true if the the node is reserved. - pub fn is_reserved(&self) -> bool { - self.state.reserved - } - /// Returns the reputation value of the node. pub fn reputation(&self) -> i32 { - self.state.reputation + self.state.reputation(&self.peer_id) } /// Sets the reputation of the peer. pub fn set_reputation(&mut self, value: i32) { - self.state.reputation = value; + self.state.set_reputation(&self.peer_id, value) } /// Performs an arithmetic addition on the reputation score of that peer. @@ -463,8 +491,7 @@ impl<'a> NotConnectedPeer<'a> { /// In case of overflow, the value will be capped. /// If the peer is unknown to us, we insert it and consider that it has a reputation of 0. pub fn add_reputation(&mut self, modifier: i32) { - let reputation = &mut self.state.reputation; - *reputation = reputation.saturating_add(modifier); + self.state.add_reputation(&self.peer_id, modifier) } } @@ -477,25 +504,18 @@ pub struct UnknownPeer<'a> { impl<'a> UnknownPeer<'a> { /// Inserts the peer identity in our list. /// - /// The node is not reserved and starts with a reputation of 0. You can adjust these default + /// The node starts with a reputation of 0. You can adjust these default /// values using the `NotConnectedPeer` that this method returns. pub fn discover(self) -> NotConnectedPeer<'a> { self.parent.nodes.insert(self.peer_id.clone().into_owned(), Node { connection_state: ConnectionState::NotConnected, reputation: 0, - reserved: false, }); - let state = self.parent.nodes.get_mut(&self.peer_id) - .expect("We insert that key into the HashMap right above; QED"); - + let state = self.parent; NotConnectedPeer { state, peer_id: self.peer_id, - num_in: &mut self.parent.num_in, - num_out: &mut self.parent.num_out, - max_in: self.parent.max_in, - max_out: self.parent.max_out, } } } @@ -521,14 +541,13 @@ mod tests { } #[test] - fn reserved_node_doesnt_use_slot() { + fn priority_node_doesnt_use_slot() { let mut peers_state = PeersState::new(1, 1); let id1 = PeerId::random(); let id2 = PeerId::random(); - if let Peer::Unknown(e) = peers_state.peer(&id1) { - let mut p = e.discover(); - p.set_reserved(true); + peers_state.set_priority_group("test", vec![id1.clone()].into_iter().collect()); + if let Peer::NotConnected(p) = peers_state.peer(&id1) { assert!(p.try_accept_incoming().is_ok()); } else { panic!() } @@ -550,23 +569,22 @@ mod tests { } #[test] - fn reserved_not_connected_peer() { + fn priority_not_connected_peer() { let mut peers_state = PeersState::new(25, 25); let id1 = PeerId::random(); let id2 = PeerId::random(); - assert!(peers_state.reserved_not_connected_peer().is_none()); + assert!(peers_state.priority_not_connected_peer().is_none()); peers_state.peer(&id1).into_unknown().unwrap().discover(); peers_state.peer(&id2).into_unknown().unwrap().discover(); - assert!(peers_state.reserved_not_connected_peer().is_none()); - peers_state.peer(&id1).into_not_connected().unwrap().set_reserved(true); - assert!(peers_state.reserved_not_connected_peer().is_some()); - peers_state.peer(&id2).into_not_connected().unwrap().set_reserved(true); - peers_state.peer(&id1).into_not_connected().unwrap().set_reserved(false); - assert!(peers_state.reserved_not_connected_peer().is_some()); - peers_state.peer(&id2).into_not_connected().unwrap().set_reserved(false); - assert!(peers_state.reserved_not_connected_peer().is_none()); + assert!(peers_state.priority_not_connected_peer().is_none()); + peers_state.set_priority_group("test", vec![id1.clone()].into_iter().collect()); + assert!(peers_state.priority_not_connected_peer().is_some()); + peers_state.set_priority_group("test", vec![id2.clone(), id2.clone()].into_iter().collect()); + assert!(peers_state.priority_not_connected_peer().is_some()); + peers_state.set_priority_group("test", vec![].into_iter().collect()); + assert!(peers_state.priority_not_connected_peer().is_none()); } #[test] @@ -581,7 +599,7 @@ mod tests { assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id1.clone())); peers_state.peer(&id2).into_not_connected().unwrap().set_reputation(75); assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id2.clone())); - peers_state.peer(&id2).into_not_connected().unwrap().force_ingoing(); + peers_state.peer(&id2).into_not_connected().unwrap().try_accept_incoming().unwrap(); assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id1.clone())); peers_state.peer(&id1).into_not_connected().unwrap().set_reputation(100); peers_state.peer(&id2).into_connected().unwrap().disconnect(); @@ -591,24 +609,31 @@ mod tests { } #[test] - fn disconnect_reserved_doesnt_panic() { + fn disconnect_priority_doesnt_panic() { let mut peers_state = PeersState::new(1, 1); let id = PeerId::random(); - let mut peer = peers_state.peer(&id).into_unknown().unwrap().discover() - .force_outgoing(); - peer.set_reserved(true); + peers_state.set_priority_group("test", vec![id.clone()].into_iter().collect()); + let peer = peers_state.peer(&id).into_not_connected().unwrap().try_outgoing().unwrap(); peer.disconnect(); } #[test] - fn multiple_set_reserved_calls_doesnt_panic() { + fn multiple_priority_groups_slot_count() { let mut peers_state = PeersState::new(1, 1); let id = PeerId::random(); - let mut peer = peers_state.peer(&id) - .into_unknown().unwrap().discover() - .force_outgoing(); - peer.set_reserved(true); - peer.set_reserved(true); - peer.disconnect(); + + if let Peer::Unknown(p) = peers_state.peer(&id) { + assert!(p.discover().try_accept_incoming().is_ok()); + } else { panic!() } + + assert_eq!(peers_state.num_in, 1); + peers_state.set_priority_group("test1", vec![id.clone()].into_iter().collect()); + assert_eq!(peers_state.num_in, 0); + peers_state.set_priority_group("test2", vec![id.clone()].into_iter().collect()); + assert_eq!(peers_state.num_in, 0); + peers_state.set_priority_group("test1", vec![].into_iter().collect()); + assert_eq!(peers_state.num_in, 0); + peers_state.set_priority_group("test2", vec![].into_iter().collect()); + assert_eq!(peers_state.num_in, 1); } } -- GitLab