Skip to content
Snippets Groups Projects
Commit 3ba7442d authored by Pierre Krieger's avatar Pierre Krieger Committed by Arkadiy Paronyan
Browse files

Properly distinguish in/out connections in the PSM (#2102)

parent 418edcfb
Branches
No related merge requests found
......@@ -31,14 +31,18 @@ pub struct Peerset {
struct Inner {
/// List of nodes that we know exist but we are not connected to.
/// Elements in this list must never be in `out_slots` or `in_slots`.
discovered: Vec<PeerId>,
/// List of reserved nodes.
reserved: HashSet<PeerId>,
/// If true, we only accept reserved nodes.
reserved_only: bool,
/// Node slots. Each slot contains either `None` if the node is free, or `Some` if it is
/// assigned to a peer.
slots: Vec<Option<PeerId>>,
/// Node slots for outgoing connections. Each slot contains either `None` if the node is free,
/// or `Some` if it is assigned to a peer.
out_slots: Vec<Option<PeerId>>,
/// Node slots for incoming connections. Each slot contains either `None` if the node is free,
/// or `Some` if it is assigned to a peer.
in_slots: Vec<Option<PeerId>>,
}
/// Message that can be sent by the peer set manager (PSM).
......@@ -111,7 +115,8 @@ impl Peerset {
discovered: config.bootnodes.into_iter().collect(),
reserved: Default::default(),
reserved_only: config.reserved_only,
slots: (0 .. (config.in_peers + config.out_peers)).map(|_| None).collect(),
out_slots: (0 .. config.out_peers).map(|_| None).collect(),
in_slots: (0 .. config.in_peers).map(|_| None).collect(),
};
alloc_slots(&mut inner, &tx);
......@@ -147,10 +152,15 @@ impl Peerset {
return;
}
// Nothing more to do if we're already connected.
if inner.out_slots.iter().chain(inner.in_slots.iter()).any(|s| s.as_ref() == Some(&peer_id)) {
return;
}
// Assign a slot for this reserved peer.
if let Some(pos) = inner.slots.iter().position(|s| s.as_ref().map(|n| !inner.reserved.contains(n)).unwrap_or(true)) {
if let Some(pos) = inner.out_slots.iter().position(|s| s.as_ref().map(|n| !inner.reserved.contains(n)).unwrap_or(true)) {
let _ = self.tx.unbounded_send(Message::Connect(peer_id.clone()));
inner.slots[pos] = Some(peer_id);
inner.out_slots[pos] = Some(peer_id);
} else {
// All slots are filled with reserved peers.
......@@ -176,7 +186,7 @@ impl Peerset {
// Disconnect non-reserved nodes.
if reserved_only {
for slot in &mut inner.slots {
for slot in inner.out_slots.iter_mut().chain(inner.in_slots.iter_mut()) {
if let Some(peer) = slot.as_ref() {
if inner.reserved.contains(peer) {
continue;
......@@ -201,7 +211,7 @@ fn alloc_slots(inner: &mut Inner, tx: &mpsc::UnboundedSender<Message>) {
return;
}
for slot in inner.slots.iter_mut() {
for slot in inner.out_slots.iter_mut() {
if slot.is_some() {
continue;
}
......@@ -226,12 +236,12 @@ impl PeersetMut {
/// peerset is already connected to, in which case it must not answer.
pub fn incoming(&self, peer_id: PeerId, index: IncomingIndex) {
let mut inner = self.parent.inner.lock();
if inner.slots.iter().any(|s| s.as_ref() == Some(&peer_id)) {
if inner.out_slots.iter().chain(inner.in_slots.iter()).any(|s| s.as_ref() == Some(&peer_id)) {
return
}
if let Some(pos) = inner.slots.iter().position(|s| s.is_none()) {
inner.slots[pos] = Some(peer_id);
if let Some(pos) = inner.in_slots.iter().position(|s| s.is_none()) {
inner.in_slots[pos] = Some(peer_id);
let _ = self.parent.tx.unbounded_send(Message::Accept(index));
} else {
if inner.discovered.iter().all(|p| *p != peer_id) {
......@@ -247,6 +257,7 @@ impl PeersetMut {
/// `PeerId`, or accepted an incoming connection with this `PeerId`.
pub fn dropped(&self, peer_id: &PeerId) {
let mut inner = self.parent.inner.lock();
let inner = &mut *inner; // Fixes a borrowing issue.
// Automatically connect back if reserved.
if inner.reserved.contains(peer_id) {
......@@ -255,9 +266,10 @@ impl PeersetMut {
}
// Otherwise, free the slot.
for slot in inner.slots.iter_mut() {
for slot in inner.out_slots.iter_mut().chain(inner.in_slots.iter_mut()) {
if slot.as_ref() == Some(peer_id) {
*slot = None;
break;
}
}
......@@ -266,7 +278,7 @@ impl PeersetMut {
if inner.discovered.iter().all(|p| p != peer_id) {
inner.discovered.push(peer_id.clone());
}
alloc_slots(&mut inner, &self.parent.tx);
alloc_slots(inner, &self.parent.tx);
}
/// Adds a discovered peer id to the PSM.
......@@ -276,7 +288,7 @@ impl PeersetMut {
pub fn discovered(&self, peer_id: PeerId) {
let mut inner = self.parent.inner.lock();
if inner.slots.iter().any(|p| p.as_ref() == Some(&peer_id)) {
if inner.out_slots.iter().chain(inner.in_slots.iter()).any(|p| p.as_ref() == Some(&peer_id)) {
return;
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment