Commit f008e069 authored by Aaro Altonen, committed by GitHub

Accept only `--in-peers` many inbound full nodes in `SyncingEngine` (#14603)

* Accept only `--in-peers` many inbound full nodes in `SyncingEngine`

Because full and light nodes are stored in the same set, it is possible for
`SyncingEngine` to accept more than `--in-peers` inbound full nodes, which
leaves some of its outbound slots unoccupied.

`ProtocolController` still tries to occupy these slots by opening outbound
substreams. As the remote peer accepts these substreams, each connection is
relayed to `SyncingEngine`, which rejects the peer because the engine is
already full. This in turn leaves the substream inactive, and the peer gets
evicted.

Fixing this properly would require relocating the light peer slot allocation
away from `ProtocolController`, or alternatively moving the entire substream
validation there; both are epic refactorings and not necessarily in line with
other goals. As a temporary measure, verify in `SyncingEngine` that it does
not accept more than the specified number of inbound full peers (see the
sketch below).

* Fix tests

* Apply review comments
parent 5b268d44
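
The temporary measure described above amounts to simple slot accounting. The following is a minimal, self-contained sketch of that accounting, not the actual `SyncingEngine` code: the `InboundSlots` type and its method names are hypothetical, the real logic lives in `SyncingEngine::new`, `on_sync_peer_connected` and `on_sync_peer_disconnected` (shown in the diff below), and the numeric values in the example are illustrative only. The inbound full-node budget is derived as the total number of full-node slots minus the outbound slots, an inbound full peer is rejected once that budget is exhausted, and the counter is released when a counted peer disconnects.

// Minimal sketch of the inbound-slot accounting added in this commit.
// `InboundSlots` and its methods are hypothetical names used for illustration;
// in the actual change the same bookkeeping is done directly by `SyncingEngine`
// via its `max_in_peers`/`num_in_peers` fields.
struct InboundSlots {
    /// Maximum number of inbound full nodes (`--in-peers`): the total number of
    /// full-node slots minus the outbound slots.
    max_in_peers: usize,
    /// Number of inbound full nodes accepted so far.
    num_in_peers: usize,
}

impl InboundSlots {
    fn new(max_full_peers: usize, max_out_peers: usize) -> Self {
        Self { max_in_peers: max_full_peers - max_out_peers, num_in_peers: 0 }
    }

    /// Decide whether a newly connected peer may be accepted. Returns `false`
    /// when all inbound full-node slots are already consumed.
    fn try_accept(&mut self, inbound: bool, is_full: bool, no_slot_peer: bool) -> bool {
        if inbound && is_full && !no_slot_peer {
            if self.num_in_peers == self.max_in_peers {
                return false
            }
            self.num_in_peers += 1;
        }
        true
    }

    /// Free the slot of a previously counted inbound full node on disconnect.
    fn release(&mut self, inbound: bool, is_full: bool) {
        if inbound && is_full {
            self.num_in_peers = self.num_in_peers.saturating_sub(1);
        }
    }
}

fn main() {
    // Illustrative numbers only: 50 full-node slots in total, 25 of them
    // outbound, which leaves 25 inbound full-node slots.
    let mut slots = InboundSlots::new(50, 25);
    assert!(slots.try_accept(true, true, false)); // inbound full node: accepted, slot consumed
    slots.release(true, true); // slot freed again on disconnect
}

The diff below threads an `inbound` flag from `NotifsHandler` through `Notifications` and `Protocol` into the `SyncEvent::NotificationStreamOpened` event so that `SyncingEngine` can perform exactly this check.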
@@ -106,6 +106,8 @@ pub enum SyncEvent<B: BlockT> {
received_handshake: BlockAnnouncesHandshake<B>,
/// Notification sink.
sink: NotificationsSink,
/// Is the connection inbound.
inbound: bool,
/// Channel for reporting accept/reject of the substream.
tx: oneshot::Sender<bool>,
},
......
@@ -451,6 +451,7 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
received_handshake,
notifications_sink,
negotiated_fallback,
inbound,
} => {
// Set number 0 is hardcoded the default set of peers we sync from.
if set_id == HARDCODED_PEERSETS_SYNC {
@@ -470,6 +471,7 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
crate::SyncEvent::NotificationStreamOpened {
inbound,
remote: peer_id,
received_handshake: handshake,
sink: notifications_sink,
@@ -510,6 +512,7 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
crate::SyncEvent::NotificationStreamOpened {
inbound,
remote: peer_id,
received_handshake: handshake,
sink: notifications_sink,
......
@@ -319,6 +319,8 @@ pub enum NotificationsOut {
received_handshake: Vec<u8>,
/// Object that permits sending notifications to the peer.
notifications_sink: NotificationsSink,
/// Is the connection inbound.
inbound: bool,
},
/// The [`NotificationsSink`] object used to send notifications with the given peer must be
@@ -1810,6 +1812,7 @@ impl NetworkBehaviour for Notifications {
negotiated_fallback,
received_handshake,
notifications_sink,
inbound,
..
} => {
let set_id = crate::peerset::SetId::from(protocol_index);
@@ -1834,6 +1837,7 @@ impl NetworkBehaviour for Notifications {
let event = NotificationsOut::CustomProtocolOpen {
peer_id,
set_id,
inbound,
negotiated_fallback,
received_handshake,
notifications_sink: notifications_sink.clone(),
......
@@ -203,6 +203,8 @@ enum State {
Opening {
/// Substream opened by the remote. If `Some`, has been accepted.
in_substream: Option<NotificationsInSubstream<NegotiatedSubstream>>,
/// Is the connection inbound.
inbound: bool,
},
/// Protocol is in the "Open" state.
@@ -276,6 +278,8 @@ pub enum NotifsHandlerOut {
received_handshake: Vec<u8>,
/// How notifications can be sent to this node.
notifications_sink: NotificationsSink,
/// Is the connection inbound.
inbound: bool,
},
/// Acknowledges a [`NotifsHandlerIn::Open`]. The remote has refused the attempt to open
@@ -518,7 +522,7 @@ impl ConnectionHandler for NotifsHandler {
error!(target: "sub-libp2p", "☎️ State mismatch in notifications handler");
debug_assert!(false);
},
State::Opening { ref mut in_substream } => {
State::Opening { ref mut in_substream, inbound } => {
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
let notifications_sink = NotificationsSink {
@@ -543,6 +547,7 @@ impl ConnectionHandler for NotifsHandler {
endpoint: self.endpoint.clone(),
received_handshake: new_open.handshake,
notifications_sink,
inbound,
},
));
},
@@ -597,7 +602,7 @@ impl ConnectionHandler for NotifsHandler {
);
}
protocol_info.state = State::Opening { in_substream: None };
protocol_info.state = State::Opening { in_substream: None, inbound: false };
},
State::OpenDesiredByRemote { pending_opening, in_substream } => {
let handshake_message = protocol_info.config.handshake.read().clone();
@@ -623,12 +628,13 @@ impl ConnectionHandler for NotifsHandler {
// The state change is done in two steps because of borrowing issues.
let in_substream = match mem::replace(
&mut protocol_info.state,
State::Opening { in_substream: None },
State::Opening { in_substream: None, inbound: false },
) {
State::OpenDesiredByRemote { in_substream, .. } => in_substream,
_ => unreachable!(),
};
protocol_info.state = State::Opening { in_substream: Some(in_substream) };
protocol_info.state =
State::Opening { in_substream: Some(in_substream), inbound: true };
},
State::Opening { .. } | State::Open { .. } => {
// As documented, it is forbidden to send an `Open` while there is already
@@ -772,7 +778,7 @@ impl ConnectionHandler for NotifsHandler {
match &mut self.protocols[protocol_index].state {
State::Closed { .. } |
State::Open { in_substream: None, .. } |
State::Opening { in_substream: None } => {},
State::Opening { in_substream: None, .. } => {},
State::Open { in_substream: in_substream @ Some(_), .. } =>
match Stream::poll_next(Pin::new(in_substream.as_mut().unwrap()), cx) {
@@ -893,6 +899,7 @@ pub mod tests {
endpoint,
received_handshake,
notifications_sink,
inbound: false,
}
}
@@ -1131,7 +1138,7 @@ pub mod tests {
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_) }
State::Opening { in_substream: Some(_), .. }
));
// remote now tries to open another substream, verify that it is rejected and closed
@@ -1168,7 +1175,7 @@ pub mod tests {
.await;
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_) }
State::Opening { in_substream: Some(_), .. }
));
}
@@ -1204,7 +1211,7 @@ pub mod tests {
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_) }
State::Opening { in_substream: Some(_), .. }
));
// accept the substream and move its state to `Open`
@@ -1295,7 +1302,7 @@ pub mod tests {
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_) }
State::Opening { in_substream: Some(_), .. }
));
handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
@@ -1355,7 +1362,7 @@ pub mod tests {
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_) }
State::Opening { in_substream: Some(_), .. }
));
handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
@@ -1438,7 +1445,7 @@ pub mod tests {
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_) }
State::Opening { in_substream: Some(_), .. }
));
handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
@@ -1487,7 +1494,7 @@ pub mod tests {
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_) }
State::Opening { in_substream: Some(_), .. }
));
handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
......
@@ -178,6 +178,8 @@ pub struct Peer<B: BlockT> {
last_notification_sent: Instant,
/// Instant when the last notification was received from peer.
last_notification_received: Instant,
/// Is the peer inbound.
inbound: bool,
}
pub struct SyncingEngine<B: BlockT, Client> {
@@ -238,6 +240,12 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Number of slots to allocate to light nodes.
default_peers_set_num_light: usize,
/// Maximum number of inbound peers.
max_in_peers: usize,
/// Number of inbound peers accepted so far.
num_in_peers: usize,
/// A cache for the data that was associated to a block announcement.
block_announce_data_cache: LruMap<B::Hash, Vec<u8>>,
@@ -370,6 +378,12 @@ where
.flatten()
.expect("Genesis block exists; qed");
// `default_peers_set.in_peers` contains an unspecified amount of light peers so the number
// of full inbound peers must be calculated from the total full peer count
let max_full_peers = net_config.network_config.default_peers_set_num_full;
let max_out_peers = net_config.network_config.default_peers_set.out_peers;
let max_in_peers = (max_full_peers - max_out_peers) as usize;
Ok((
Self {
roles,
@@ -391,6 +405,8 @@ where
default_peers_set_no_slot_peers,
default_peers_set_num_full,
default_peers_set_num_light,
num_in_peers: 0usize,
max_in_peers,
event_streams: Vec::new(),
tick_timeout: Delay::new(TICK_TIMEOUT),
syncing_started: None,
@@ -718,8 +734,9 @@ where
remote,
received_handshake,
sink,
inbound,
tx,
} => match self.on_sync_peer_connected(remote, &received_handshake, sink) {
} => match self.on_sync_peer_connected(remote, &received_handshake, sink, inbound) {
Ok(()) => {
let _ = tx.send(true);
},
@@ -788,15 +805,31 @@ where
///
/// Returns a result if the handshake of this peer was indeed accepted.
pub fn on_sync_peer_disconnected(&mut self, peer: PeerId) -> Result<(), ()> {
if self.peers.remove(&peer).is_some() {
if let Some(info) = self.peers.remove(&peer) {
if self.important_peers.contains(&peer) {
log::warn!(target: "sync", "Reserved peer {} disconnected", peer);
} else {
log::debug!(target: "sync", "{} disconnected", peer);
}
if !self.default_peers_set_no_slot_connected_peers.remove(&peer) &&
info.inbound && info.info.roles.is_full()
{
match self.num_in_peers.checked_sub(1) {
Some(value) => {
self.num_in_peers = value;
},
None => {
log::error!(
target: "sync",
"trying to disconnect an inbound node which is not counted as inbound"
);
debug_assert!(false);
},
}
}
self.chain_sync.peer_disconnected(&peer);
self.default_peers_set_no_slot_connected_peers.remove(&peer);
self.event_streams
.retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer)).is_ok());
Ok(())
@@ -815,6 +848,7 @@ where
who: PeerId,
status: &BlockAnnouncesHandshake<B>,
sink: NotificationsSink,
inbound: bool,
) -> Result<(), ()> {
log::trace!(target: "sync", "New peer {} {:?}", who, status);
@@ -857,6 +891,15 @@ where
let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&who);
let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 };
// make sure to accept no more than `--in-peers` many full nodes
if !no_slot_peer &&
status.roles.is_full() &&
inbound && self.num_in_peers == self.max_in_peers
{
log::debug!(target: "sync", "All inbound slots have been consumed, rejecting {who}");
return Err(())
}
if status.roles.is_full() &&
self.chain_sync.num_peers() >=
self.default_peers_set_num_full +
@@ -887,6 +930,7 @@ where
sink,
last_notification_sent: Instant::now(),
last_notification_received: Instant::now(),
inbound,
};
let req = if peer.info.roles.is_full() {
@@ -904,8 +948,11 @@ where
log::debug!(target: "sync", "Connected {}", who);
self.peers.insert(who, peer);
if no_slot_peer {
self.default_peers_set_no_slot_connected_peers.insert(who);
} else if inbound && status.roles.is_full() {
self.num_in_peers += 1;
}
if let Some(req) = req {
......